diff --git a/src/Pure/Concurrent/consumer_thread.scala b/src/Pure/Concurrent/consumer_thread.scala --- a/src/Pure/Concurrent/consumer_thread.scala +++ b/src/Pure/Concurrent/consumer_thread.scala @@ -1,124 +1,124 @@ /* Title: Pure/Concurrent/consumer_thread.scala Author: Makarius Consumer thread with unbounded queueing of requests, and optional acknowledgment. */ package isabelle import scala.annotation.tailrec object Consumer_Thread { def fork_bulk[A](name: String = "", daemon: Boolean = false)( bulk: A => Boolean, consume: List[A] => (List[Exn.Result[Unit]], Boolean), finish: () => Unit = () => ()): Consumer_Thread[A] = new Consumer_Thread[A](name, daemon, bulk, consume, finish) def fork[A](name: String = "", daemon: Boolean = false)( consume: A => Boolean, finish: () => Unit = () => ()): Consumer_Thread[A] = { def consume_single(args: List[A]): (List[Exn.Result[Unit]], Boolean) = { assert(args.length == 1) Exn.capture { consume(args.head) } match { case Exn.Res(continue) => (List(Exn.Res(())), continue) case Exn.Exn(exn) => (List(Exn.Exn(exn)), true) } } fork_bulk(name = name, daemon = daemon)(_ => false, consume_single, finish = finish) } } final class Consumer_Thread[A] private( name: String, daemon: Boolean, bulk: A => Boolean, consume: List[A] => (List[Exn.Result[Unit]], Boolean), finish: () => Unit) { /* thread */ private var active = true private val mailbox = Mailbox[Option[Request]] private val thread = Isabelle_Thread.fork(name = name, daemon = daemon) { main_loop(Nil) } - def is_active: Boolean = active && thread.isAlive + def is_active(): Boolean = active && thread.isAlive def check_thread: Boolean = Thread.currentThread == thread private def failure(exn: Throwable): Unit = Output.error_message( "Consumer thread failure: " + quote(thread.getName) + "\n" + Exn.message(exn)) private def robust_finish(): Unit = try { finish() } catch { case exn: Throwable => failure(exn) } /* requests */ private class Request(val arg: A, acknowledge: Boolean = false) { val ack: Option[Synchronized[Option[Exn.Result[Unit]]]] = if (acknowledge) Some(Synchronized(None)) else None def await(): Unit = { for (a <- ack) { Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) })) } } } private def request(req: Request): Unit = { synchronized { - if (is_active) mailbox.send(Some(req)) + if (is_active()) mailbox.send(Some(req)) else error("Consumer thread not active: " + quote(thread.getName)) } req.await() } @tailrec private def main_loop(msgs: List[Option[Request]]): Unit = msgs match { case Nil => main_loop(mailbox.receive()) case None :: _ => robust_finish() case _ => val reqs = proper_list(msgs.takeWhile(msg => msg.isDefined && bulk(msg.get.arg))) .getOrElse(msgs.take(1)) .map(_.get) val (results, continue) = consume(reqs.map(_.arg)) for { (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None) } { (req.ack, res) match { case (Some(a), _) => a.change(_ => Some(res)) case (None, Exn.Res(_)) => case (None, Exn.Exn(exn)) => failure(exn) } } if (continue) main_loop(msgs.drop(reqs.length)) else robust_finish() } /* main methods */ - assert(is_active) + assert(is_active()) def send(arg: A): Unit = request(new Request(arg)) def send_wait(arg: A): Unit = request(new Request(arg, acknowledge = true)) def shutdown(): Unit = { - synchronized { if (is_active) { active = false; mailbox.send(None) } } + synchronized { if (is_active()) { active = false; mailbox.send(None) } } thread.join() } } diff --git a/src/Pure/PIDE/prover.scala b/src/Pure/PIDE/prover.scala --- a/src/Pure/PIDE/prover.scala +++ b/src/Pure/PIDE/prover.scala @@ -1,323 +1,323 @@ /* Title: Pure/PIDE/prover.scala Author: Makarius Options: :folding=explicit: Prover process wrapping. */ package isabelle import java.io.{InputStream, OutputStream, BufferedOutputStream, IOException} object Prover { /* messages */ sealed abstract class Message type Receiver = Message => Unit class Input(val name: String, val args: List[String]) extends Message { override def toString: String = XML.Elem(Markup(Markup.PROVER_COMMAND, List((Markup.NAME, name))), args.flatMap(s => List(XML.newline, XML.elem(Markup.PROVER_ARG, YXML.parse_body(s))))).toString } class Output(val message: XML.Elem) extends Message { def kind: String = message.markup.name def properties: Properties.T = message.markup.properties def body: XML.Body = message.body def is_init: Boolean = kind == Markup.INIT def is_exit: Boolean = kind == Markup.EXIT def is_stdout: Boolean = kind == Markup.STDOUT def is_stderr: Boolean = kind == Markup.STDERR def is_system: Boolean = kind == Markup.SYSTEM def is_status: Boolean = kind == Markup.STATUS def is_report: Boolean = kind == Markup.REPORT def is_syslog: Boolean = is_init || is_exit || is_system || is_stderr override def toString: String = { val res = if (is_status || is_report) message.body.map(_.toString).mkString else Pretty.string_of(message.body, metric = Symbol.Metric) if (properties.isEmpty) kind + " [[" + res + "]]" else kind + " " + (properties.map(Properties.Eq.apply)).mkString("{", ",", "}") + " [[" + res + "]]" } } class Malformed(msg: String) extends Exn.User_Error("Malformed prover message: " + msg) def bad_header(print: String): Nothing = throw new Malformed("bad message header\n" + print) def bad_chunks(): Nothing = throw new Malformed("bad message chunks") def the_chunk(chunks: List[Bytes], print: => String): Bytes = chunks match { case List(chunk) => chunk case _ => throw new Malformed("single chunk expected: " + print) } class Protocol_Output(props: Properties.T, val chunks: List[Bytes]) extends Output(XML.Elem(Markup(Markup.PROTOCOL, props), Nil)) { def chunk: Bytes = the_chunk(chunks, toString) lazy val text: String = chunk.text } } class Prover( receiver: Prover.Receiver, cache: XML.Cache, channel: System_Channel, process: Bash.Process) extends Protocol { /** receiver output **/ private def system_output(text: String): Unit = { receiver(new Prover.Output(XML.Elem(Markup(Markup.SYSTEM, Nil), List(XML.Text(text))))) } private def protocol_output(props: Properties.T, chunks: List[Bytes]): Unit = { receiver(new Prover.Protocol_Output(props, chunks)) } private def output(kind: String, props: Properties.T, body: XML.Body): Unit = { val main = XML.Elem(Markup(kind, props), Protocol_Message.clean_reports(body)) val reports = Protocol_Message.reports(props, body) for (msg <- main :: reports) receiver(new Prover.Output(cache.elem(msg))) } private def exit_message(result: Process_Result): Unit = { output(Markup.EXIT, Markup.Process_Result(result), List(XML.Text(result.print_return_code))) } /** process manager **/ private val process_result: Future[Process_Result] = Future.thread("process_result") { val rc = process.join() val timing = process.get_timing Process_Result(rc, timing = timing) } private def terminate_process(): Unit = { try { process.terminate() } catch { case exn @ ERROR(_) => system_output("Failed to terminate prover process: " + exn.getMessage) } } private val process_manager = Isabelle_Thread.fork(name = "process_manager") { val stdout = physical_output(false) val (startup_failed, startup_errors) = { var finished: Option[Boolean] = None val result = new StringBuilder(100) while (finished.isEmpty && (process.stderr.ready || !process_result.is_finished)) { while (finished.isEmpty && process.stderr.ready) { try { val c = process.stderr.read if (c == 2) finished = Some(true) else result += c.toChar } catch { case _: IOException => finished = Some(false) } } Time.seconds(0.05).sleep() } (finished.isEmpty || !finished.get, result.toString.trim) } if (startup_errors != "") system_output(startup_errors) if (startup_failed) { terminate_process() process_result.join stdout.join exit_message(Process_Result(127)) } else { val (command_stream, message_stream) = channel.rendezvous() command_input_init(command_stream) val stderr = physical_output(true) val message = message_output(message_stream) val result = process_result.join system_output("process terminated") command_input_close() for (thread <- List(stdout, stderr, message)) thread.join() system_output("process_manager terminated") exit_message(result) } channel.shutdown() } /* management methods */ def join(): Unit = process_manager.join() def terminate(): Unit = { system_output("Terminating prover process") command_input_close() var count = 10 while (!process_result.is_finished && count > 0) { Time.seconds(0.1).sleep() count -= 1 } if (!process_result.is_finished) terminate_process() } /** process streams **/ /* command input */ private var command_input: Option[Consumer_Thread[List[Bytes]]] = None private def command_input_close(): Unit = command_input.foreach(_.shutdown()) private def command_input_init(raw_stream: OutputStream): Unit = { val name = "command_input" val stream = new BufferedOutputStream(raw_stream) command_input = Some( Consumer_Thread.fork(name)( consume = { case chunks => try { Bytes(chunks.map(_.length).mkString("", ",", "\n")).write_stream(stream) chunks.foreach(_.write_stream(stream)) stream.flush true } catch { case e: IOException => system_output(name + ": " + e.getMessage); false } }, finish = { case () => stream.close(); system_output(name + " terminated") } ) ) } /* physical output */ private def physical_output(err: Boolean): Thread = { val (name, reader, markup) = if (err) ("standard_error", process.stderr, Markup.STDERR) else ("standard_output", process.stdout, Markup.STDOUT) Isabelle_Thread.fork(name = name) { try { var result = new StringBuilder(100) var finished = false while (!finished) { //{{{ var c = -1 var done = false while (!done && (result.isEmpty || reader.ready)) { c = reader.read if (c >= 0) result.append(c.asInstanceOf[Char]) else done = true } if (result.nonEmpty) { output(markup, Nil, List(XML.Text(Symbol.decode(result.toString)))) result.clear() } else { reader.close() finished = true } //}}} } } catch { case e: IOException => system_output(name + ": " + e.getMessage) } system_output(name + " terminated") } } /* message output */ private def message_output(stream: InputStream): Thread = { def decode_chunk(chunk: Bytes): XML.Body = Symbol.decode_yxml_failsafe(chunk.text, cache = cache) val thread_name = "message_output" Isabelle_Thread.fork(name = thread_name) { try { var finished = false while (!finished) { Byte_Message.read_message(stream) match { case None => finished = true case Some(header :: chunks) => decode_chunk(header) match { case List(XML.Elem(Markup(kind, props), Nil)) => if (kind == Markup.PROTOCOL) protocol_output(props, chunks) else output(kind, props, decode_chunk(Prover.the_chunk(chunks, kind))) case _ => Prover.bad_header(header.toString) } case Some(_) => Prover.bad_chunks() } } } catch { case e: IOException => system_output("Cannot read message:\n" + e.getMessage) case e: Prover.Malformed => system_output(e.getMessage) } stream.close() system_output(thread_name + " terminated") } } /** protocol commands **/ var trace: Boolean = false def protocol_command_raw(name: String, args: List[Bytes]): Unit = command_input match { - case Some(thread) if thread.is_active => + case Some(thread) if thread.is_active() => if (trace) { val payload = args.foldLeft(0) { case (n, b) => n + b.length } Output.writeln( "protocol_command " + name + ", args = " + args.length + ", payload = " + payload) } thread.send(Bytes(name) :: args) case _ => error("Inactive prover input thread for command " + quote(name)) } def protocol_command_args(name: String, args: List[String]): Unit = { receiver(new Prover.Input(name, args)) protocol_command_raw(name, args.map(Bytes(_))) } def protocol_command(name: String, args: String*): Unit = protocol_command_args(name, args.toList) } diff --git a/src/Pure/PIDE/session.scala b/src/Pure/PIDE/session.scala --- a/src/Pure/PIDE/session.scala +++ b/src/Pure/PIDE/session.scala @@ -1,784 +1,784 @@ /* Title: Pure/PIDE/session.scala Author: Makarius Options: :folding=explicit: PIDE editor session, potentially with running prover process. */ package isabelle import scala.collection.immutable.Queue import scala.collection.mutable import scala.annotation.tailrec object Session { /* outlets */ object Consumer { def apply[A](name: String)(consume: A => Unit): Consumer[A] = new Consumer[A](name, consume) } final class Consumer[-A] private(val name: String, val consume: A => Unit) class Outlet[A](dispatcher: Consumer_Thread[() => Unit]) { private val consumers = Synchronized[List[Consumer[A]]](Nil) def += (c: Consumer[A]): Unit = consumers.change(Library.update(c)) def -= (c: Consumer[A]): Unit = consumers.change(Library.remove(c)) def post(a: A): Unit = { for (c <- consumers.value.iterator) { dispatcher.send(() => try { c.consume(a) } catch { case exn: Throwable => Output.error_message("Consumer failed: " + quote(c.name) + "\n" + Exn.message(exn)) }) } } } /* change */ sealed case class Change( previous: Document.Version, syntax_changed: List[Document.Node.Name], deps_changed: Boolean, doc_edits: List[Document.Edit_Command], consolidate: List[Document.Node.Name], version: Document.Version) case object Change_Flush /* events */ //{{{ case class Command_Timing(props: Properties.T) case class Theory_Timing(props: Properties.T) case class Runtime_Statistics(props: Properties.T) case class Task_Statistics(props: Properties.T) case class Global_Options(options: Options) case object Caret_Focus case class Raw_Edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text]) case class Dialog_Result(id: Document_ID.Generic, serial: Long, result: String) case class Build_Theories(id: String, master_dir: Path, theories: List[(Options, List[Path])]) case class Commands_Changed( assignment: Boolean, nodes: Set[Document.Node.Name], commands: Set[Command]) sealed abstract class Phase { def print: String = this match { case Terminated(result) => if (result.ok) "finished" else "failed" case _ => Word.lowercase(this.toString) } } case object Inactive extends Phase // stable case object Startup extends Phase // transient case object Ready extends Phase // metastable case object Shutdown extends Phase // transient case class Terminated(result: Process_Result) extends Phase // stable //}}} /* syslog */ private[Session] class Syslog(limit: Int) { private var queue = Queue.empty[XML.Elem] private var length = 0 def += (msg: XML.Elem): Unit = synchronized { queue = queue.enqueue(msg) length += 1 if (length > limit) queue = queue.dequeue._2 } def content: String = synchronized { cat_lines(queue.iterator.map(XML.content)) + (if (length > limit) "\n(A total of " + length + " messages...)" else "") } } /* protocol handlers */ type Protocol_Function = Prover.Protocol_Output => Boolean abstract class Protocol_Handler extends Isabelle_System.Service { def init(session: Session): Unit = {} def exit(): Unit = {} def functions: List[(String, Protocol_Function)] = Nil def prover_options(options: Options): Options = options } } class Session(_session_options: => Options, val resources: Resources) extends Document.Session { session => val cache: XML.Cache = XML.Cache.make() def build_blobs_info(name: Document.Node.Name): Command.Blobs_Info = Command.Blobs_Info.none /* global flags */ @volatile var timing: Boolean = false @volatile var verbose: Boolean = false /* dynamic session options */ def session_options: Options = _session_options def output_delay: Time = session_options.seconds("editor_output_delay") def consolidate_delay: Time = session_options.seconds("editor_consolidate_delay") def prune_delay: Time = session_options.seconds("editor_prune_delay") def prune_size: Int = session_options.int("editor_prune_size") def syslog_limit: Int = session_options.int("editor_syslog_limit") def reparse_limit: Int = session_options.int("editor_reparse_limit") /* dispatcher */ private val dispatcher = Consumer_Thread.fork[() => Unit]("Session.dispatcher", daemon = true) { case e => e(); true } def assert_dispatcher[A](body: => A): A = { assert(dispatcher.check_thread) body } def require_dispatcher[A](body: => A): A = { require(dispatcher.check_thread, "not on dispatcher thread") body } def send_dispatcher(body: => Unit): Unit = { if (dispatcher.check_thread) body else dispatcher.send(() => body) } def send_wait_dispatcher(body: => Unit): Unit = { if (dispatcher.check_thread) body else dispatcher.send_wait(() => body) } /* outlets */ val finished_theories = new Session.Outlet[Document.Snapshot](dispatcher) val command_timings = new Session.Outlet[Session.Command_Timing](dispatcher) val theory_timings = new Session.Outlet[Session.Theory_Timing](dispatcher) val runtime_statistics = new Session.Outlet[Session.Runtime_Statistics](dispatcher) val task_statistics = new Session.Outlet[Session.Task_Statistics](dispatcher) val global_options = new Session.Outlet[Session.Global_Options](dispatcher) val caret_focus = new Session.Outlet[Session.Caret_Focus.type](dispatcher) val raw_edits = new Session.Outlet[Session.Raw_Edits](dispatcher) val commands_changed = new Session.Outlet[Session.Commands_Changed](dispatcher) val phase_changed = new Session.Outlet[Session.Phase](dispatcher) val syslog_messages = new Session.Outlet[Prover.Output](dispatcher) val raw_output_messages = new Session.Outlet[Prover.Output](dispatcher) val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher) val debugger_updates = new Session.Outlet[Debugger.Update.type](dispatcher) val all_messages = new Session.Outlet[Prover.Message](dispatcher) // potential bottle-neck! /** main protocol manager **/ /* internal messages */ private case class Start(start_prover: Prover.Receiver => Prover) private case object Stop private case class Get_State(promise: Promise[Document.State]) private case class Cancel_Exec(exec_id: Document_ID.Exec) private case class Protocol_Command_Raw(name: String, args: List[Bytes]) private case class Protocol_Command_Args(name: String, args: List[String]) private case class Update_Options(options: Options) private case object Consolidate_Execution private case object Prune_History /* phase */ private def post_phase(new_phase: Session.Phase): Session.Phase = { phase_changed.post(new_phase) new_phase } private val _phase = Synchronized[Session.Phase](Session.Inactive) private def phase_=(new_phase: Session.Phase): Unit = _phase.change(_ => post_phase(new_phase)) def phase: Session.Phase = _phase.value def is_ready: Boolean = phase == Session.Ready /* syslog */ private val syslog = new Session.Syslog(syslog_limit) def syslog_content(): String = syslog.content /* pipelined change parsing */ private case class Text_Edits( previous: Future[Document.Version], doc_blobs: Document.Blobs, text_edits: List[Document.Edit_Text], consolidate: List[Document.Node.Name], version_result: Promise[Document.Version]) private val change_parser = Consumer_Thread.fork[Text_Edits]("change_parser", daemon = true) { case Text_Edits(previous, doc_blobs, text_edits, consolidate, version_result) => val prev = previous.get_finished val change = Timing.timeit("parse_change", timing) { resources.parse_change(reparse_limit, prev, doc_blobs, text_edits, consolidate) } version_result.fulfill(change.version) manager.send(change) true } /* buffered changes */ private object change_buffer { private var assignment: Boolean = false private var nodes: Set[Document.Node.Name] = Set.empty private var commands: Set[Command] = Set.empty def flush(): Unit = synchronized { if (assignment || nodes.nonEmpty || commands.nonEmpty) commands_changed.post(Session.Commands_Changed(assignment, nodes, commands)) if (nodes.nonEmpty) consolidation.update(nodes) assignment = false nodes = Set.empty commands = Set.empty } private val delay_flush = Delay.first(output_delay) { flush() } def invoke(assign: Boolean, edited_nodes: List[Document.Node.Name], cmds: List[Command]): Unit = synchronized { assignment |= assign for (node <- edited_nodes) { nodes += node } for (command <- cmds) { nodes += command.node_name command.blobs_names.foreach(nodes += _) commands += command } delay_flush.invoke() } def shutdown(): Unit = { delay_flush.revoke() flush() } } /* postponed changes */ private object postponed_changes { private var postponed: List[Session.Change] = Nil def store(change: Session.Change): Unit = synchronized { postponed ::= change } def flush(state: Document.State): List[Session.Change] = synchronized { val (assigned, unassigned) = postponed.partition(change => state.is_assigned(change.previous)) postponed = unassigned assigned.reverse } } /* node consolidation */ private object consolidation { private val delay = Delay.first(consolidate_delay) { manager.send(Consolidate_Execution) } private val init_state: Option[Set[Document.Node.Name]] = Some(Set.empty) private val state = Synchronized(init_state) def exit(): Unit = { delay.revoke() state.change(_ => None) } def update(new_nodes: Set[Document.Node.Name] = Set.empty): Unit = { val active = state.change_result(st => (st.isDefined, st.map(nodes => if (nodes.isEmpty) new_nodes else nodes ++ new_nodes))) if (active) delay.invoke() } def flush(): Set[Document.Node.Name] = state.change_result(st => if (st.isDefined) (st.get, init_state) else (Set.empty, None)) } /* prover process */ private object prover { private val variable = Synchronized[Option[Prover]](None) def defined: Boolean = variable.value.isDefined def get: Prover = variable.value.get def set(p: Prover): Unit = variable.change(_ => Some(p)) def reset: Unit = variable.change(_ => None) def await_reset(): Unit = variable.guarded_access({ case None => Some((), None) case _ => None }) } /* file formats and protocol handlers */ private lazy val file_formats: File_Format.Session = File_Format.registry.start_session(session) private val protocol_handlers = Protocol_Handlers.init(session) def get_protocol_handler[C <: Session.Protocol_Handler](c: Class[C]): Option[C] = protocol_handlers.get(c.getName) match { case Some(h) if Library.is_subclass(h.getClass, c) => Some(h.asInstanceOf[C]) case _ => None } def init_protocol_handler(handler: Session.Protocol_Handler): Unit = protocol_handlers.init(handler) def prover_options(options: Options): Options = protocol_handlers.prover_options(file_formats.prover_options(options)) /* debugger */ private val debugger_handler = new Debugger.Handler(this) init_protocol_handler(debugger_handler) def debugger: Debugger = debugger_handler.debugger /* manager thread */ private val delay_prune = Delay.first(prune_delay) { manager.send(Prune_History) } private val manager: Consumer_Thread[Any] = { /* global state */ val global_state = Synchronized(Document.State.init) /* raw edits */ def handle_raw_edits( doc_blobs: Document.Blobs = Document.Blobs.empty, edits: List[Document.Edit_Text] = Nil, consolidate: List[Document.Node.Name] = Nil): Unit = //{{{ { require(prover.defined, "prover process not defined (handle_raw_edits)") if (edits.nonEmpty) prover.get.discontinue_execution() val previous = global_state.value.history.tip.version val version = Future.promise[Document.Version] global_state.change(_.continue_history(previous, edits, version)) raw_edits.post(Session.Raw_Edits(doc_blobs, edits)) change_parser.send(Text_Edits(previous, doc_blobs, edits, consolidate, version)) } //}}} /* resulting changes */ def handle_change(change: Session.Change): Unit = //{{{ { require(prover.defined, "prover process not defined (handle_change)") // define commands { val id_commands = new mutable.ListBuffer[Command] def id_command(command: Command): Unit = { for { (name, digest) <- command.blobs_defined if !global_state.value.defined_blob(digest) } { change.version.nodes(name).get_blob match { case Some(blob) => global_state.change(_.define_blob(digest)) prover.get.define_blob(digest, blob.bytes) case None => Output.error_message("Missing blob " + quote(name.toString)) } } if (!global_state.value.defined_command(command.id)) { global_state.change(_.define_command(command)) id_commands += command } } for { (_, edit) <- change.doc_edits } { edit.foreach({ case (c1, c2) => c1.foreach(id_command); c2.foreach(id_command) }) } if (id_commands.nonEmpty) { prover.get.define_commands_bulk(resources, id_commands.toList) } } val assignment = global_state.value.the_assignment(change.previous).check_finished global_state.change(_.define_version(change.version, assignment)) prover.get.update(change.previous.id, change.version.id, change.doc_edits, change.consolidate) resources.commit(change) } //}}} /* prover output */ def handle_output(output: Prover.Output): Unit = //{{{ { def bad_output(): Unit = { if (verbose) Output.warning("Ignoring bad prover output: " + output.message.toString) } def change_command(f: Document.State => (Command.State, Document.State)): Unit = { try { val st = global_state.change_result(f) if (!st.command.span.is_theory) { change_buffer.invoke(false, Nil, List(st.command)) } } catch { case _: Document.State.Fail => bad_output() } } output match { case msg: Prover.Protocol_Output => val handled = protocol_handlers.invoke(msg) if (!handled) { msg.properties match { case Protocol.Command_Timing(props, state_id, timing) if prover.defined => command_timings.post(Session.Command_Timing(props)) val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil))) change_command(_.accumulate(state_id, cache.elem(message), cache)) case Markup.Theory_Timing(props) => theory_timings.post(Session.Theory_Timing(props)) case Markup.Task_Statistics(props) => task_statistics.post(Session.Task_Statistics(props)) case Protocol.Export(args) if args.id.isDefined && Value.Long.unapply(args.id.get).isDefined => val id = Value.Long.unapply(args.id.get).get val export = Export.make_entry("", args, msg.chunk, cache) change_command(_.add_export(id, (args.serial, export))) case Protocol.Loading_Theory(node_name, id) => val blobs_info = build_blobs_info(node_name) try { global_state.change(_.begin_theory(node_name, id, msg.text, blobs_info)) } catch { case _: Document.State.Fail => bad_output() } case List(Markup.Commands_Accepted.PROPERTY) => msg.text match { case Protocol.Commands_Accepted(ids) => ids.foreach(id => change_command(_.accumulate(id, Protocol.Commands_Accepted.message, cache))) case _ => bad_output() } case List(Markup.Assign_Update.PROPERTY) => msg.text match { case Protocol.Assign_Update(id, edited, update) => try { val (edited_nodes, cmds) = global_state.change_result(_.assign(id, edited, update)) change_buffer.invoke(true, edited_nodes, cmds) manager.send(Session.Change_Flush) } catch { case _: Document.State.Fail => bad_output() } case _ => bad_output() } delay_prune.invoke() case List(Markup.Removed_Versions.PROPERTY) => msg.text match { case Protocol.Removed(removed) => try { global_state.change(_.removed_versions(removed)) manager.send(Session.Change_Flush) } catch { case _: Document.State.Fail => bad_output() } case _ => bad_output() } case _ => bad_output() } } case _ => output.properties match { case Position.Id(state_id) => change_command(_.accumulate(state_id, output.message, cache)) case _ if output.is_init => val init_ok = try { Isabelle_System.make_services(classOf[Session.Protocol_Handler]) .foreach(init_protocol_handler) true } catch { case exn: Throwable => prover.get.protocol_command("Prover.stop", "1", Exn.message(exn)) false } if (init_ok) { prover.get.options(prover_options(session_options)) prover.get.init_session(resources) phase = Session.Ready debugger.ready() } case Markup.Process_Result(result) if output.is_exit => if (prover.defined) protocol_handlers.exit() for (id <- global_state.value.theories.keys) { val snapshot = global_state.change_result(_.end_theory(id)) finished_theories.post(snapshot) } file_formats.stop_session phase = Session.Terminated(result) prover.reset case _ => raw_output_messages.post(output) } } } //}}} /* main thread */ Consumer_Thread.fork[Any]("Session.manager", daemon = true) { case arg: Any => //{{{ arg match { case output: Prover.Output => if (output.is_syslog) { syslog += output.message syslog_messages.post(output) } if (output.is_stdout || output.is_stderr) raw_output_messages.post(output) else handle_output(output) all_messages.post(output) case input: Prover.Input => all_messages.post(input) case Start(start_prover) if !prover.defined => prover.set(start_prover(manager.send(_))) case Stop => consolidation.exit() delay_prune.revoke() if (prover.defined) { global_state.change(_ => Document.State.init) prover.get.terminate() } case Get_State(promise) => promise.fulfill(global_state.value) case Consolidate_Execution => if (prover.defined) { val state = global_state.value state.stable_tip_version match { case None => consolidation.update() case Some(version) => val consolidate = consolidation.flush().iterator.filter(name => !resources.session_base.loaded_theory(name) && !state.node_consolidated(version, name) && state.node_maybe_consolidated(version, name)).toList if (consolidate.nonEmpty) handle_raw_edits(consolidate = consolidate) } } case Prune_History => if (prover.defined) { val old_versions = global_state.change_result(_.remove_versions(prune_size)) if (old_versions.nonEmpty) prover.get.remove_versions(old_versions) } case Update_Options(options) => if (prover.defined && is_ready) { prover.get.options(prover_options(options)) handle_raw_edits() } global_options.post(Session.Global_Options(options)) case Cancel_Exec(exec_id) if prover.defined => prover.get.cancel_exec(exec_id) case Session.Raw_Edits(doc_blobs, edits) if prover.defined => handle_raw_edits(doc_blobs = doc_blobs, edits = edits) case Session.Dialog_Result(id, serial, result) if prover.defined => prover.get.dialog_result(serial, result) handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result))) case Protocol_Command_Raw(name, args) if prover.defined => prover.get.protocol_command_raw(name, args) case Protocol_Command_Args(name, args) if prover.defined => prover.get.protocol_command_args(name, args) case change: Session.Change if prover.defined => val state = global_state.value if (!state.removing_versions && state.is_assigned(change.previous)) handle_change(change) else postponed_changes.store(change) case Session.Change_Flush if prover.defined => val state = global_state.value if (!state.removing_versions) postponed_changes.flush(state).foreach(handle_change) case bad => if (verbose) Output.warning("Ignoring bad message: " + bad.toString) } true //}}} } } /* main operations */ def get_state(): Document.State = { - if (manager.is_active) { + if (manager.is_active()) { val promise = Future.promise[Document.State] manager.send_wait(Get_State(promise)) promise.join } else Document.State.init } def snapshot(name: Document.Node.Name = Document.Node.Name.empty, pending_edits: List[Text.Edit] = Nil): Document.Snapshot = get_state().snapshot(name, pending_edits) def recent_syntax(name: Document.Node.Name): Outer_Syntax = get_state().recent_finished.version.get_finished.nodes(name).syntax getOrElse resources.session_base.overall_syntax @tailrec final def await_stable_snapshot(): Document.Snapshot = { val snapshot = this.snapshot() if (snapshot.is_outdated) { output_delay.sleep() await_stable_snapshot() } else snapshot } def start(start_prover: Prover.Receiver => Prover): Unit = { file_formats _phase.change( { case Session.Inactive => manager.send(Start(start_prover)) post_phase(Session.Startup) case phase => error("Cannot start prover in phase " + quote(phase.print)) }) } def stop(): Process_Result = { val was_ready = _phase.guarded_access( { case Session.Startup | Session.Shutdown => None case Session.Terminated(_) => Some((false, phase)) case Session.Inactive => Some((false, post_phase(Session.Terminated(Process_Result(0))))) case Session.Ready => Some((true, post_phase(Session.Shutdown))) }) if (was_ready) manager.send(Stop) prover.await_reset() change_parser.shutdown() change_buffer.shutdown() manager.shutdown() dispatcher.shutdown() phase match { case Session.Terminated(result) => result case phase => error("Bad session phase after shutdown: " + quote(phase.print)) } } def protocol_command_raw(name: String, args: List[Bytes]): Unit = manager.send(Protocol_Command_Raw(name, args)) def protocol_command_args(name: String, args: List[String]): Unit = manager.send(Protocol_Command_Args(name, args)) def protocol_command(name: String, args: String*): Unit = protocol_command_args(name, args.toList) def cancel_exec(exec_id: Document_ID.Exec): Unit = manager.send(Cancel_Exec(exec_id)) def update(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text]): Unit = if (edits.nonEmpty) manager.send_wait(Session.Raw_Edits(doc_blobs, edits)) def update_options(options: Options): Unit = manager.send_wait(Update_Options(options)) def dialog_result(id: Document_ID.Generic, serial: Long, result: String): Unit = manager.send(Session.Dialog_Result(id, serial, result)) } diff --git a/src/Pure/Tools/simplifier_trace.scala b/src/Pure/Tools/simplifier_trace.scala --- a/src/Pure/Tools/simplifier_trace.scala +++ b/src/Pure/Tools/simplifier_trace.scala @@ -1,331 +1,331 @@ /* Title: Pure/Tools/simplifier_trace.scala Author: Lars Hupel Interactive Simplifier trace. */ package isabelle import scala.annotation.tailrec import scala.collection.immutable.SortedMap object Simplifier_Trace { /* trace items from the prover */ val TEXT = "text" val Text = new Properties.String(TEXT) val PARENT = "parent" val Parent = new Properties.Long(PARENT) val SUCCESS = "success" val Success = new Properties.Boolean(SUCCESS) val MEMORY = "memory" val Memory = new Properties.Boolean(MEMORY) object Item { case class Data( serial: Long, markup: String, text: String, parent: Long, props: Properties.T, content: XML.Body) { def memory: Boolean = Memory.unapply(props) getOrElse true } def unapply(tree: XML.Tree): Option[(String, Data)] = tree match { case XML.Elem(Markup(Markup.RESULT, Markup.Serial(serial)), List(XML.Elem(Markup(markup, props), content))) if markup.startsWith("simp_trace_") => // FIXME proper comparison of string constants (props, props) match { case (Text(text), Parent(parent)) => Some((markup, Data(serial, markup, text, parent, props, content))) case _ => None } case _ => None } } /* replies to the prover */ case class Answer private[Simplifier_Trace](val name: String, val string: String) object Answer { object step { val skip = Answer("skip", "Skip") val continue = Answer("continue", "Continue") val continue_trace = Answer("continue_trace", "Continue (with full trace)") val continue_passive = Answer("continue_passive", "Continue (without asking)") val continue_disable = Answer("continue_disable", "Continue (without any trace)") val all = List(continue, continue_trace, continue_passive, continue_disable, skip) } object hint_fail { val exit = Answer("exit", "Exit") val redo = Answer("redo", "Redo") val all = List(redo, exit) } } val all_answers: List[Answer] = Answer.step.all ::: Answer.hint_fail.all /* GUI interaction */ case object Event /* manager thread */ private case class Handle_Results( session: Session, id: Document_ID.Command, results: Command.Results, slot: Promise[Context]) private case class Generate_Trace(results: Command.Results, slot: Promise[Trace]) private case class Cancel(serial: Long) private object Clear_Memory case class Reply(session: Session, serial: Long, answer: Answer) case class Question(data: Item.Data, answers: List[Answer]) case class Context( last_serial: Long = 0L, questions: SortedMap[Long, Question] = SortedMap.empty) { def +(q: Question): Context = copy(questions = questions + ((q.data.serial, q))) def -(s: Long): Context = copy(questions = questions - s) def with_serial(s: Long): Context = copy(last_serial = Math.max(last_serial, s)) } case class Trace(entries: List[Item.Data]) case class Index(text: String, content: XML.Body) object Index { def of_data(data: Item.Data): Index = Index(data.text, data.content) } def handle_results(session: Session, id: Document_ID.Command, results: Command.Results): Context = { val slot = Future.promise[Context] the_manager(session).send(Handle_Results(session, id, results, slot)) slot.join } def generate_trace(session: Session, results: Command.Results): Trace = { val slot = Future.promise[Trace] the_manager(session).send(Generate_Trace(results, slot)) slot.join } def clear_memory(session: Session): Unit = the_manager(session).send(Clear_Memory) def send_reply(session: Session, serial: Long, answer: Answer): Unit = the_manager(session).send(Reply(session, serial, answer)) def make_manager: Consumer_Thread[Any] = { var contexts = Map.empty[Document_ID.Command, Context] var memory_children = Map.empty[Long, Set[Long]] var memory = Map.empty[Index, Answer] def find_question(serial: Long): Option[(Document_ID.Command, Question)] = contexts collectFirst { case (id, context) if context.questions contains serial => (id, context.questions(serial)) } def do_cancel(serial: Long, id: Document_ID.Command): Unit = { // To save memory, we could try to remove empty contexts at this point. // However, if a new serial gets attached to the same command_id after we deleted // its context, its last_serial counter will start at 0 again, and we'll think the // old serials are actually new contexts += (id -> (contexts(id) - serial)) } def do_reply(session: Session, serial: Long, answer: Answer): Unit = { session.protocol_command( "Simplifier_Trace.reply", Value.Long(serial), answer.name) } Consumer_Thread.fork[Any]("Simplifier_Trace.manager", daemon = true)( consume = (arg: Any) => { arg match { case Handle_Results(session, id, results, slot) => var new_context = contexts.getOrElse(id, Context()) var new_serial = new_context.last_serial for ((serial, result) <- results.iterator if serial > new_context.last_serial) { result match { case Item(markup, data) => memory_children += (data.parent -> (memory_children.getOrElse(data.parent, Set.empty) + serial)) markup match { case Markup.SIMP_TRACE_STEP => val index = Index.of_data(data) memory.get(index) match { case Some(answer) if data.memory => do_reply(session, serial, answer) case _ => new_context += Question(data, Answer.step.all) } case Markup.SIMP_TRACE_HINT => data.props match { case Success(false) => results.get(data.parent) match { case Some(Item(Markup.SIMP_TRACE_STEP, _)) => new_context += Question(data, Answer.hint_fail.all) case _ => // unknown, better send a default reply do_reply(session, data.serial, Answer.hint_fail.exit) } case _ => } case Markup.SIMP_TRACE_IGNORE => // At this point, we know that the parent of this 'IGNORE' entry is a 'STEP' // entry, and that that 'STEP' entry is about to be replayed. Hence, we need // to selectively purge the replies which have been memorized, going down from // the parent to all leaves. @tailrec def purge(queue: Vector[Long]): Unit = queue match { case s +: rest => for (Item(Markup.SIMP_TRACE_STEP, data) <- results.get(s)) memory -= Index.of_data(data) val children = memory_children.getOrElse(s, Set.empty) memory_children -= s purge(rest ++ children.toVector) case _ => } purge(Vector(data.parent)) case _ => } case _ => } new_serial = serial } new_context = new_context.with_serial(new_serial) contexts += (id -> new_context) slot.fulfill(new_context) case Generate_Trace(results, slot) => // Since there are potentially lots of trace messages, we do not cache them here again. // Instead, everytime the trace is being requested, we re-assemble it based on the // current results. val items = results.iterator.collect { case (_, Item(_, data)) => data }.toList slot.fulfill(Trace(items)) case Cancel(serial) => find_question(serial) match { case Some((id, _)) => do_cancel(serial, id) case None => } case Clear_Memory => memory_children = Map.empty memory = Map.empty case Reply(session, serial, answer) => find_question(serial) match { case Some((id, Question(data, _))) => if (data.markup == Markup.SIMP_TRACE_STEP && data.memory) { val index = Index.of_data(data) memory += (index -> answer) } do_cancel(serial, id) case None => Output.warning("send_reply: unknown serial " + serial) } do_reply(session, serial, answer) session.trace_events.post(Event) } true }, finish = () => contexts = Map.empty ) } private val managers = Synchronized(Map.empty[Session, Consumer_Thread[Any]]) def the_manager(session: Session): Consumer_Thread[Any] = managers.value.get(session) match { - case Some(thread) if thread.is_active => thread + case Some(thread) if thread.is_active() => thread case _ => error("Bad Simplifier_Trace.manager thread") } /* protocol handler */ class Handler extends Session.Protocol_Handler { private var the_session: Session = null override def init(session: Session): Unit = { try { the_manager(session) } catch { case ERROR(_) => managers.change(map => map + (session -> make_manager)) } the_session = session } override def exit(): Unit = { val session = the_session if (session != null) { val manager = the_manager(session) manager.send(Clear_Memory) manager.shutdown() managers.change(map => map - session) the_session = null } } private def cancel(msg: Prover.Protocol_Output): Boolean = msg.properties match { case Markup.Simp_Trace_Cancel(serial) => the_manager(the_session).send(Cancel(serial)) true case _ => false } override val functions = List(Markup.SIMP_TRACE_CANCEL -> cancel) } }