diff --git a/src/Pure/PIDE/protocol.ML b/src/Pure/PIDE/protocol.ML --- a/src/Pure/PIDE/protocol.ML +++ b/src/Pure/PIDE/protocol.ML @@ -1,156 +1,169 @@ (* Title: Pure/PIDE/protocol.ML Author: Makarius Protocol message formats for interactive proof documents. *) structure Protocol: sig end = struct val _ = Isabelle_Process.protocol_command "Prover.echo" (fn args => List.app writeln args); val _ = Isabelle_Process.protocol_command "Prover.options" (fn [options_yxml] => (Options.set_default (Options.decode (YXML.parse_body options_yxml)); Isabelle_Process.init_options_interactive ())); val _ = Isabelle_Process.protocol_command "Prover.init_session_base" (fn [sessions_yxml, doc_names_yxml, global_theories_yxml, loaded_theories_yxml, known_theories_yxml] => let val decode_table = YXML.parse_body #> let open XML.Decode in list (pair string string) end; val decode_list = YXML.parse_body #> let open XML.Decode in list string end; val decode_sessions = YXML.parse_body #> let open XML.Decode in list (pair string properties) end; in Resources.init_session_base {sessions = decode_sessions sessions_yxml, docs = decode_list doc_names_yxml, global_theories = decode_table global_theories_yxml, loaded_theories = decode_list loaded_theories_yxml, known_theories = decode_table known_theories_yxml} end); val _ = Isabelle_Process.protocol_command "Document.define_blob" (fn [digest, content] => Document.change_state (Document.define_blob digest content)); -fun decode_command id name blobs_yxml toks_yxml sources : Document.command = +fun decode_command id name blobs_xml toks_xml sources : Document.command = let open XML.Decode; val (blobs_digests, blobs_index) = - YXML.parse_body blobs_yxml |> + blobs_xml |> let val message = YXML.string_of_body o Protocol_Message.command_positions id; in pair (list (variant [fn ([], a) => Exn.Res (pair string (option string) a), fn ([], a) => Exn.Exn (ERROR (message a))])) int end; - val toks = YXML.parse_body toks_yxml |> list (pair int int); + val toks = list (pair int int) toks_xml; in {command_id = Document_ID.parse id, name = name, blobs_digests = blobs_digests, blobs_index = blobs_index, tokens = toks ~~ sources} end; val _ = Isabelle_Process.protocol_command "Document.define_command" - (fn id :: name :: blobs_yxml :: toks_yxml :: sources => - Document.change_state - (Document.define_command (decode_command id name blobs_yxml toks_yxml sources))); + (fn id :: name :: blobs :: toks :: sources => + let + val command = decode_command id name (YXML.parse_body blobs) (YXML.parse_body toks) sources; + in Document.change_state (Document.define_command command) end); + +val _ = + Isabelle_Process.protocol_command "Document.define_commands" + (fn args => + let + fun decode arg = + let + open XML.Decode; + val (id, (name, (blobs_xml, (toks_xml, sources)))) = + pair string (pair string (pair I (pair I (list string)))) (YXML.parse_body arg); + in decode_command id name blobs_xml toks_xml sources end; + in Document.change_state (fold (Document.define_command o decode) args) end); val _ = Isabelle_Process.protocol_command "Document.discontinue_execution" (fn [] => Execution.discontinue ()); val _ = Isabelle_Process.protocol_command "Document.cancel_exec" (fn [exec_id] => Execution.cancel (Document_ID.parse exec_id)); val _ = Isabelle_Process.protocol_command "Document.update" (Future.task_context "Document.update" (Future.new_group NONE) (fn old_id_string :: new_id_string :: consolidate_yxml :: edits_yxml => Document.change_state (fn state => let val old_id = Document_ID.parse old_id_string; val new_id = Document_ID.parse new_id_string; val consolidate = YXML.parse_body consolidate_yxml |> let open XML.Decode in list string end; val edits = edits_yxml |> map (YXML.parse_body #> let open XML.Decode in pair string (variant [fn ([], a) => Document.Edits (list (pair (option int) (option int)) a), fn ([], a) => let val (master, (name, (imports, (keywords, errors)))) = pair string (pair string (pair (list string) (pair (list (pair string (pair (pair string (list string)) (list string)))) (list YXML.string_of_body)))) a; val imports' = map (rpair Position.none) imports; val keywords' = map (fn (x, y) => ((x, Position.none), y)) keywords; val header = Thy_Header.make (name, Position.none) imports' keywords'; in Document.Deps {master = master, header = header, errors = errors} end, fn (a :: b, c) => Document.Perspective (bool_atom a, map int_atom b, list (pair int (pair string (list string))) c)]) end); val _ = Execution.discontinue (); val (edited, removed, assign_update, state') = Document.update old_id new_id edits consolidate state; val _ = (singleton o Future.forks) {name = "Document.update/remove", group = NONE, deps = Execution.snapshot removed, pri = Task_Queue.urgent_pri + 2, interrupts = false} (fn () => (Execution.purge removed; List.app Isabelle_Process.reset_tracing removed)); val _ = Output.protocol_message Markup.assign_update ((new_id, edited, assign_update) |> let open XML.Encode; fun encode_upd (a, bs) = string (space_implode "," (map Value.print_int (a :: bs))); in triple int (list string) (list encode_upd) end |> YXML.chunks_of_body); in Document.start_execution state' end))); val _ = Isabelle_Process.protocol_command "Document.remove_versions" (fn [versions_yxml] => Document.change_state (fn state => let val versions = YXML.parse_body versions_yxml |> let open XML.Decode in list int end; val state1 = Document.remove_versions versions state; val _ = Output.protocol_message Markup.removed_versions [versions_yxml]; in state1 end)); val _ = Isabelle_Process.protocol_command "Document.dialog_result" (fn [serial, result] => Active.dialog_result (Value.parse_int serial) result handle exn => if Exn.is_interrupt exn then () (*sic!*) else Exn.reraise exn); val _ = Isabelle_Process.protocol_command "ML_Heap.share_common_data" (fn [] => ML_Heap.share_common_data ()); end; diff --git a/src/Pure/PIDE/protocol.scala b/src/Pure/PIDE/protocol.scala --- a/src/Pure/PIDE/protocol.scala +++ b/src/Pure/PIDE/protocol.scala @@ -1,335 +1,365 @@ /* Title: Pure/PIDE/protocol.scala Author: Makarius Protocol message formats for interactive proof documents. */ package isabelle object Protocol { /* document editing */ object Assign_Update { def unapply(text: String) : Option[(Document_ID.Version, List[String], Document.Assign_Update)] = { try { import XML.Decode._ def decode_upd(body: XML.Body): (Long, List[Long]) = space_explode(',', string(body)).map(Value.Long.parse) match { case a :: bs => (a, bs) case _ => throw new XML.XML_Body(body) } Some(triple(long, list(string), list(decode_upd _))(Symbol.decode_yxml(text))) } catch { case ERROR(_) => None case _: XML.Error => None } } } object Removed { def unapply(text: String): Option[List[Document_ID.Version]] = try { import XML.Decode._ Some(list(long)(Symbol.decode_yxml(text))) } catch { case ERROR(_) => None case _: XML.Error => None } } /* command timing */ object Command_Timing { def unapply(props: Properties.T): Option[(Document_ID.Generic, isabelle.Timing)] = props match { case Markup.COMMAND_TIMING :: args => (args, args) match { case (Position.Id(id), Markup.Timing_Properties(timing)) => Some((id, timing)) case _ => None } case _ => None } } /* theory timing */ object Theory_Timing { def unapply(props: Properties.T): Option[(String, isabelle.Timing)] = props match { case Markup.THEORY_TIMING :: args => (args, args) match { case (Markup.Name(name), Markup.Timing_Properties(timing)) => Some((name, timing)) case _ => None } case _ => None } } /* result messages */ def is_result(msg: XML.Tree): Boolean = msg match { case XML.Elem(Markup(Markup.RESULT, _), _) => true case _ => false } def is_tracing(msg: XML.Tree): Boolean = msg match { case XML.Elem(Markup(Markup.TRACING, _), _) => true case XML.Elem(Markup(Markup.TRACING_MESSAGE, _), _) => true case _ => false } def is_state(msg: XML.Tree): Boolean = msg match { case XML.Elem(Markup(Markup.STATE, _), _) => true case XML.Elem(Markup(Markup.STATE_MESSAGE, _), _) => true case _ => false } def is_information(msg: XML.Tree): Boolean = msg match { case XML.Elem(Markup(Markup.INFORMATION, _), _) => true case XML.Elem(Markup(Markup.INFORMATION_MESSAGE, _), _) => true case _ => false } def is_writeln(msg: XML.Tree): Boolean = msg match { case XML.Elem(Markup(Markup.WRITELN, _), _) => true case XML.Elem(Markup(Markup.WRITELN_MESSAGE, _), _) => true case _ => false } def is_warning(msg: XML.Tree): Boolean = msg match { case XML.Elem(Markup(Markup.WARNING, _), _) => true case XML.Elem(Markup(Markup.WARNING_MESSAGE, _), _) => true case _ => false } def is_legacy(msg: XML.Tree): Boolean = msg match { case XML.Elem(Markup(Markup.LEGACY, _), _) => true case XML.Elem(Markup(Markup.LEGACY_MESSAGE, _), _) => true case _ => false } def is_error(msg: XML.Tree): Boolean = msg match { case XML.Elem(Markup(Markup.ERROR, _), _) => true case XML.Elem(Markup(Markup.ERROR_MESSAGE, _), _) => true case _ => false } def is_inlined(msg: XML.Tree): Boolean = !(is_result(msg) || is_tracing(msg) || is_state(msg)) def is_exported(msg: XML.Tree): Boolean = is_writeln(msg) || is_warning(msg) || is_legacy(msg) || is_error(msg) /* breakpoints */ object ML_Breakpoint { def unapply(tree: XML.Tree): Option[Long] = tree match { case XML.Elem(Markup(Markup.ML_BREAKPOINT, Markup.Serial(breakpoint)), _) => Some(breakpoint) case _ => None } } /* dialogs */ object Dialog_Args { def unapply(props: Properties.T): Option[(Document_ID.Generic, Long, String)] = (props, props, props) match { case (Position.Id(id), Markup.Serial(serial), Markup.Result(result)) => Some((id, serial, result)) case _ => None } } object Dialog { def unapply(tree: XML.Tree): Option[(Document_ID.Generic, Long, String)] = tree match { case XML.Elem(Markup(Markup.DIALOG, Dialog_Args(id, serial, result)), _) => Some((id, serial, result)) case _ => None } } object Dialog_Result { def apply(id: Document_ID.Generic, serial: Long, result: String): XML.Elem = { val props = Position.Id(id) ::: Markup.Serial(serial) XML.Elem(Markup(Markup.RESULT, props), List(XML.Text(result))) } def unapply(tree: XML.Tree): Option[String] = tree match { case XML.Elem(Markup(Markup.RESULT, _), List(XML.Text(result))) => Some(result) case _ => None } } } trait Protocol { /* protocol commands */ def protocol_command_raw(name: String, args: List[Bytes]): Unit def protocol_command_args(name: String, args: List[String]) def protocol_command(name: String, args: String*): Unit /* options */ def options(opts: Options): Unit = protocol_command("Prover.options", Symbol.encode_yxml(opts.encode)) /* session base */ private def encode_table(table: List[(String, String)]): String = { import XML.Encode._ Symbol.encode_yxml(list(pair(string, string))(table)) } private def encode_list(lst: List[String]): String = { import XML.Encode._ Symbol.encode_yxml(list(string)(lst)) } private def encode_sessions(lst: List[(String, Position.T)]): String = { import XML.Encode._ Symbol.encode_yxml(list(pair(string, properties))(lst)) } def session_base(resources: Resources) { val base = resources.session_base.standard_path protocol_command("Prover.init_session_base", encode_sessions(base.known.sessions.toList), encode_list(base.doc_names), encode_table(base.global_theories.toList), encode_list(base.loaded_theories.keys), encode_table(base.dest_known_theories)) } /* interned items */ def define_blob(digest: SHA1.Digest, bytes: Bytes): Unit = protocol_command_raw("Document.define_blob", List(Bytes(digest.toString), bytes)) - def define_command(command: Command) + private def encode_command(command: Command): (String, String, String, String, List[String]) = { + import XML.Encode._ + val blobs_yxml = { - import XML.Encode._ val encode_blob: T[Command.Blob] = variant(List( { case Exn.Res((a, b)) => (Nil, pair(string, option(string))((a.node, b.map(p => p._1.toString)))) }, { case Exn.Exn(e) => (Nil, string(Exn.message(e))) })) Symbol.encode_yxml(pair(list(encode_blob), int)(command.blobs, command.blobs_index)) } - val toks = command.span.content val toks_yxml = { - import XML.Encode._ val encode_tok: T[Token] = (tok => pair(int, int)((tok.kind.id, Symbol.length(tok.source)))) - Symbol.encode_yxml(list(encode_tok)(toks)) + Symbol.encode_yxml(list(encode_tok)(command.span.content)) } + val toks_sources = command.span.content.map(tok => Symbol.encode(tok.source)) - protocol_command_args("Document.define_command", - Document_ID(command.id) :: Symbol.encode(command.span.name) :: blobs_yxml :: toks_yxml :: - toks.map(tok => Symbol.encode(tok.source))) + (Document_ID(command.id), Symbol.encode(command.span.name), blobs_yxml, toks_yxml, toks_sources) + } + + def define_command(command: Command) + { + val (command_id, name, blobs_yxml, toks_yxml, toks_sources) = encode_command(command) + protocol_command_args( + "Document.define_command", command_id :: name :: blobs_yxml :: toks_yxml :: toks_sources) + } + + def define_commands(commands: List[Command]) + { + protocol_command_args("Document.define_commands", + commands.map(command => + { + import XML.Encode._ + val (command_id, name, blobs_yxml, toks_yxml, toks_sources) = encode_command(command) + val body = + pair(string, pair(string, pair(string, pair(string, list(string)))))( + command_id, (name, (blobs_yxml, (toks_yxml, toks_sources)))) + YXML.string_of_body(body) + })) + } + + def define_commands_bulk(commands: List[Command]) + { + val (irregular, regular) = commands.partition(command => YXML.detect(command.source)) + irregular.foreach(define_command(_)) + regular match { + case Nil => + case List(command) => define_command(command) + case _ => define_commands(regular) + } } /* execution */ def discontinue_execution(): Unit = protocol_command("Document.discontinue_execution") def cancel_exec(id: Document_ID.Exec): Unit = protocol_command("Document.cancel_exec", Document_ID(id)) /* document versions */ def update(old_id: Document_ID.Version, new_id: Document_ID.Version, edits: List[Document.Edit_Command], consolidate: List[Document.Node.Name]) { val consolidate_yxml = { import XML.Encode._ Symbol.encode_yxml(list(string)(consolidate.map(_.node))) } val edits_yxml = { import XML.Encode._ def id: T[Command] = (cmd => long(cmd.id)) def encode_edit(name: Document.Node.Name) : T[Document.Node.Edit[Command.Edit, Command.Perspective]] = variant(List( { case Document.Node.Edits(a) => (Nil, list(pair(option(id), option(id)))(a)) }, { case Document.Node.Deps(header) => val master_dir = File.standard_url(name.master_dir) val imports = header.imports.map(_.node) val keywords = header.keywords.map({ case (a, Keyword.Spec(b, c, d)) => (a, ((b, c), d)) }) (Nil, pair(string, pair(string, pair(list(string), pair(list(pair(string, pair(pair(string, list(string)), list(string)))), list(string)))))( (master_dir, (name.theory, (imports, (keywords, header.errors)))))) }, { case Document.Node.Perspective(a, b, c) => (bool_atom(a) :: b.commands.map(cmd => long_atom(cmd.id)), list(pair(id, pair(string, list(string))))(c.dest)) })) edits.map({ case (name, edit) => Symbol.encode_yxml(pair(string, encode_edit(name))(name.node, edit)) }) } protocol_command_args("Document.update", Document_ID(old_id) :: Document_ID(new_id) :: consolidate_yxml :: edits_yxml) } def remove_versions(versions: List[Document.Version]) { val versions_yxml = { import XML.Encode._ Symbol.encode_yxml(list(long)(versions.map(_.id))) } protocol_command("Document.remove_versions", versions_yxml) } /* dialog via document content */ def dialog_result(serial: Long, result: String): Unit = protocol_command("Document.dialog_result", Value.Long(serial), result) } diff --git a/src/Pure/PIDE/session.scala b/src/Pure/PIDE/session.scala --- a/src/Pure/PIDE/session.scala +++ b/src/Pure/PIDE/session.scala @@ -1,730 +1,736 @@ /* Title: Pure/PIDE/session.scala Author: Makarius Options: :folding=explicit: PIDE editor session, potentially with running prover process. */ package isabelle import scala.collection.immutable.Queue +import scala.collection.mutable import scala.annotation.tailrec object Session { /* outlets */ object Consumer { def apply[A](name: String)(consume: A => Unit): Consumer[A] = new Consumer[A](name, consume) } final class Consumer[-A] private(val name: String, val consume: A => Unit) class Outlet[A](dispatcher: Consumer_Thread[() => Unit]) { private val consumers = Synchronized[List[Consumer[A]]](Nil) def += (c: Consumer[A]) { consumers.change(Library.update(c)) } def -= (c: Consumer[A]) { consumers.change(Library.remove(c)) } def post(a: A) { for (c <- consumers.value.iterator) { dispatcher.send(() => try { c.consume(a) } catch { case exn: Throwable => Output.error_message("Consumer failed: " + quote(c.name) + "\n" + Exn.message(exn)) }) } } } /* change */ sealed case class Change( previous: Document.Version, syntax_changed: List[Document.Node.Name], deps_changed: Boolean, doc_edits: List[Document.Edit_Command], consolidate: List[Document.Node.Name], share_common_data: Boolean, version: Document.Version) case object Change_Flush /* events */ //{{{ case class Statistics(props: Properties.T) case class Global_Options(options: Options) case object Caret_Focus case class Raw_Edits( doc_blobs: Document.Blobs, edits: List[Document.Edit_Text], share_common_data: Boolean) case class Dialog_Result(id: Document_ID.Generic, serial: Long, result: String) case class Build_Theories(id: String, master_dir: Path, theories: List[(Options, List[Path])]) case class Commands_Changed( assignment: Boolean, nodes: Set[Document.Node.Name], commands: Set[Command]) sealed abstract class Phase { def print: String = this match { case Terminated(result) => if (result.ok) "finished" else "failed" case _ => Word.lowercase(this.toString) } } case object Inactive extends Phase // stable case object Startup extends Phase // transient case object Ready extends Phase // metastable case object Shutdown extends Phase // transient case class Terminated(result: Process_Result) extends Phase // stable //}}} /* syslog */ private[Session] class Syslog(limit: Int) { private var queue = Queue.empty[XML.Elem] private var length = 0 def += (msg: XML.Elem): Unit = synchronized { queue = queue.enqueue(msg) length += 1 if (length > limit) queue = queue.dequeue._2 } def content: String = synchronized { cat_lines(queue.iterator.map(XML.content)) + (if (length > limit) "\n(A total of " + length + " messages...)" else "") } } /* protocol handlers */ abstract class Protocol_Handler { def init(session: Session): Unit = {} def exit(): Unit = {} val functions: List[(String, Prover.Protocol_Output => Boolean)] } } class Session(_session_options: => Options, val resources: Resources) extends Document.Session { session => val xml_cache: XML.Cache = XML.make_cache() val xz_cache: XZ.Cache = XZ.make_cache() /* global flags */ @volatile var timing: Boolean = false @volatile var verbose: Boolean = false /* dynamic session options */ def session_options: Options = _session_options def output_delay: Time = session_options.seconds("editor_output_delay") def consolidate_delay: Time = session_options.seconds("editor_consolidate_delay") def prune_delay: Time = session_options.seconds("editor_prune_delay") def prune_size: Int = session_options.int("editor_prune_size") def syslog_limit: Int = session_options.int("editor_syslog_limit") def reparse_limit: Int = session_options.int("editor_reparse_limit") /* dispatcher */ private val dispatcher = Consumer_Thread.fork[() => Unit]("Session.dispatcher", daemon = true) { case e => e(); true } def assert_dispatcher[A](body: => A): A = { assert(dispatcher.check_thread) body } def require_dispatcher[A](body: => A): A = { require(dispatcher.check_thread) body } def send_dispatcher(body: => Unit): Unit = { if (dispatcher.check_thread) body else dispatcher.send(() => body) } def send_wait_dispatcher(body: => Unit): Unit = { if (dispatcher.check_thread) body else dispatcher.send_wait(() => body) } /* outlets */ val statistics = new Session.Outlet[Session.Statistics](dispatcher) val global_options = new Session.Outlet[Session.Global_Options](dispatcher) val caret_focus = new Session.Outlet[Session.Caret_Focus.type](dispatcher) val raw_edits = new Session.Outlet[Session.Raw_Edits](dispatcher) val commands_changed = new Session.Outlet[Session.Commands_Changed](dispatcher) val phase_changed = new Session.Outlet[Session.Phase](dispatcher) val syslog_messages = new Session.Outlet[Prover.Output](dispatcher) val raw_output_messages = new Session.Outlet[Prover.Output](dispatcher) val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher) val debugger_updates = new Session.Outlet[Debugger.Update.type](dispatcher) val all_messages = new Session.Outlet[Prover.Message](dispatcher) // potential bottle-neck! /** main protocol manager **/ /* internal messages */ private case class Start(start_prover: Prover.Receiver => Prover) private case object Stop private case class Cancel_Exec(exec_id: Document_ID.Exec) private case class Protocol_Command(name: String, args: List[String]) private case class Update_Options(options: Options) private case object Consolidate_Execution private case object Prune_History /* phase */ private def post_phase(new_phase: Session.Phase): Session.Phase = { phase_changed.post(new_phase) new_phase } private val _phase = Synchronized[Session.Phase](Session.Inactive) private def phase_=(new_phase: Session.Phase): Unit = _phase.change(_ => post_phase(new_phase)) def phase = _phase.value def is_ready: Boolean = phase == Session.Ready /* global state */ private val syslog = new Session.Syslog(syslog_limit) def syslog_content(): String = syslog.content private val global_state = Synchronized(Document.State.init) def current_state(): Document.State = global_state.value def recent_syntax(name: Document.Node.Name): Outer_Syntax = global_state.value.recent_finished.version.get_finished.nodes(name).syntax getOrElse resources.session_base.overall_syntax /* pipelined change parsing */ private case class Text_Edits( previous: Future[Document.Version], doc_blobs: Document.Blobs, text_edits: List[Document.Edit_Text], consolidate: List[Document.Node.Name], share_common_data: Boolean, version_result: Promise[Document.Version]) private val change_parser = Consumer_Thread.fork[Text_Edits]("change_parser", daemon = true) { case Text_Edits(previous, doc_blobs, text_edits, consolidate, share_common_data, version_result) => val prev = previous.get_finished val change = Timing.timeit("parse_change", timing) { resources.parse_change( reparse_limit, prev, doc_blobs, text_edits, consolidate, share_common_data) } version_result.fulfill(change.version) manager.send(change) true } /* buffered changes */ private object change_buffer { private var assignment: Boolean = false private var nodes: Set[Document.Node.Name] = Set.empty private var commands: Set[Command] = Set.empty def flush(): Unit = synchronized { if (assignment || nodes.nonEmpty || commands.nonEmpty) commands_changed.post(Session.Commands_Changed(assignment, nodes, commands)) if (nodes.nonEmpty) consolidation.update(nodes) assignment = false nodes = Set.empty commands = Set.empty } private val delay_flush = Standard_Thread.delay_first(output_delay) { flush() } def invoke(assign: Boolean, edited_nodes: List[Document.Node.Name], cmds: List[Command]): Unit = synchronized { assignment |= assign for (node <- edited_nodes) { nodes += node } for (command <- cmds) { nodes += command.node_name command.blobs_names.foreach(nodes += _) commands += command } delay_flush.invoke() } def shutdown() { delay_flush.revoke() flush() } } /* postponed changes */ private object postponed_changes { private var postponed: List[Session.Change] = Nil def store(change: Session.Change): Unit = synchronized { postponed ::= change } def flush(state: Document.State): List[Session.Change] = synchronized { val (assigned, unassigned) = postponed.partition(change => state.is_assigned(change.previous)) postponed = unassigned assigned.reverse } } /* node consolidation */ private object consolidation { private val delay = Standard_Thread.delay_first(consolidate_delay) { manager.send(Consolidate_Execution) } private val init_state: Option[Set[Document.Node.Name]] = Some(Set.empty) private val state = Synchronized(init_state) def exit() { delay.revoke() state.change(_ => None) } def update(new_nodes: Set[Document.Node.Name] = Set.empty) { val active = state.change_result(st => (st.isDefined, st.map(nodes => if (nodes.isEmpty) new_nodes else nodes ++ new_nodes))) if (active) delay.invoke() } def flush(): Set[Document.Node.Name] = state.change_result(st => if (st.isDefined) (st.get, init_state) else (Set.empty, None)) } /* prover process */ private object prover { private val variable = Synchronized[Option[Prover]](None) def defined: Boolean = variable.value.isDefined def get: Prover = variable.value.get def set(p: Prover) { variable.change(_ => Some(p)) } def reset { variable.change(_ => None) } def await_reset() { variable.guarded_access({ case None => Some((), None) case _ => None }) } } /* file formats */ lazy val file_formats: File_Format.Session = resources.file_formats.start_session(session) /* protocol handlers */ private val protocol_handlers = Protocol_Handlers.init(session) def get_protocol_handler(name: String): Option[Session.Protocol_Handler] = protocol_handlers.get(name) def init_protocol_handler(handler: Session.Protocol_Handler): Unit = protocol_handlers.init(handler) def init_protocol_handler(name: String): Unit = protocol_handlers.init(name) /* debugger */ private val debugger_handler = new Debugger.Handler(this) init_protocol_handler(debugger_handler) def debugger: Debugger = debugger_handler.debugger /* manager thread */ private val delay_prune = Standard_Thread.delay_first(prune_delay) { manager.send(Prune_History) } private val manager: Consumer_Thread[Any] = { /* raw edits */ def handle_raw_edits( doc_blobs: Document.Blobs = Document.Blobs.empty, edits: List[Document.Edit_Text] = Nil, consolidate: List[Document.Node.Name] = Nil, share_common_data: Boolean = false) //{{{ { require(prover.defined) prover.get.discontinue_execution() val previous = global_state.value.history.tip.version val version = Future.promise[Document.Version] global_state.change(_.continue_history(previous, edits, version)) raw_edits.post(Session.Raw_Edits(doc_blobs, edits, share_common_data)) change_parser.send( Text_Edits(previous, doc_blobs, edits, consolidate, share_common_data, version)) } //}}} /* resulting changes */ def handle_change(change: Session.Change) //{{{ { require(prover.defined) - def id_command(command: Command) + // define commands { - for { - (name, digest) <- command.blobs_defined - if !global_state.value.defined_blob(digest) - } { - change.version.nodes(name).get_blob match { - case Some(blob) => - global_state.change(_.define_blob(digest)) - prover.get.define_blob(digest, blob.bytes) - case None => - Output.error_message("Missing blob " + quote(name.toString)) + val id_commands = new mutable.ListBuffer[Command] + def id_command(command: Command) + { + for { + (name, digest) <- command.blobs_defined + if !global_state.value.defined_blob(digest) + } { + change.version.nodes(name).get_blob match { + case Some(blob) => + global_state.change(_.define_blob(digest)) + prover.get.define_blob(digest, blob.bytes) + case None => + Output.error_message("Missing blob " + quote(name.toString)) + } + } + + if (!global_state.value.defined_command(command.id)) { + global_state.change(_.define_command(command)) + id_commands += command } } - - if (!global_state.value.defined_command(command.id)) { - global_state.change(_.define_command(command)) - prover.get.define_command(command) + for { (_, edit) <- change.doc_edits } { + edit.foreach({ case (c1, c2) => c1.foreach(id_command); c2.foreach(id_command) }) } - } - for { (_, edit) <- change.doc_edits } { - edit.foreach({ case (c1, c2) => c1.foreach(id_command); c2.foreach(id_command) }) + if (id_commands.nonEmpty) prover.get.define_commands_bulk(id_commands.toList) } val assignment = global_state.value.the_assignment(change.previous).check_finished global_state.change(_.define_version(change.version, assignment)) if (change.share_common_data) { prover.get.protocol_command("ML_Heap.share_common_data") } prover.get.update(change.previous.id, change.version.id, change.doc_edits, change.consolidate) resources.commit(change) } //}}} /* prover output */ def handle_output(output: Prover.Output) //{{{ { def bad_output() { if (verbose) Output.warning("Ignoring bad prover output: " + output.message.toString) } def change_command(f: Document.State => (Command.State, Document.State)) { try { val st = global_state.change_result(f) change_buffer.invoke(false, Nil, List(st.command)) } catch { case _: Document.State.Fail => bad_output() } } output match { case msg: Prover.Protocol_Output => val handled = protocol_handlers.invoke(msg) if (!handled) { msg.properties match { case Markup.Protocol_Handler(name) if prover.defined => init_protocol_handler(name) case Protocol.Command_Timing(state_id, timing) if prover.defined => val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil))) change_command(_.accumulate(state_id, xml_cache.elem(message), xml_cache)) case Protocol.Theory_Timing(_, _) => // FIXME case Markup.Export(args) if args.id.isDefined && Value.Long.unapply(args.id.get).isDefined => val id = Value.Long.unapply(args.id.get).get val export = Export.make_entry("", args, msg.bytes, cache = xz_cache) change_command(_.add_export(id, (args.serial, export))) case Markup.Assign_Update => msg.text match { case Protocol.Assign_Update(id, edited, update) => try { val (edited_nodes, cmds) = global_state.change_result(_.assign(id, edited, update)) change_buffer.invoke(true, edited_nodes, cmds) manager.send(Session.Change_Flush) } catch { case _: Document.State.Fail => bad_output() } case _ => bad_output() } delay_prune.invoke() case Markup.Removed_Versions => msg.text match { case Protocol.Removed(removed) => try { global_state.change(_.removed_versions(removed)) manager.send(Session.Change_Flush) } catch { case _: Document.State.Fail => bad_output() } case _ => bad_output() } case Markup.ML_Statistics(props) => statistics.post(Session.Statistics(props)) case Markup.Task_Statistics(props) => // FIXME case _ => bad_output() } } case _ => output.properties match { case Position.Id(state_id) => change_command(_.accumulate(state_id, output.message, xml_cache)) case _ if output.is_init => prover.get.options(file_formats.prover_options(session_options)) prover.get.session_base(resources) phase = Session.Ready debugger.ready() case Markup.Process_Result(result) if output.is_exit => file_formats.stop_session phase = Session.Terminated(result) prover.reset case _ => raw_output_messages.post(output) } } } //}}} /* main thread */ Consumer_Thread.fork[Any]("Session.manager", daemon = true) { case arg: Any => //{{{ arg match { case output: Prover.Output => if (output.is_stdout || output.is_stderr) raw_output_messages.post(output) else handle_output(output) if (output.is_syslog) { syslog += output.message syslog_messages.post(output) } all_messages.post(output) case input: Prover.Input => all_messages.post(input) case Start(start_prover) if !prover.defined => prover.set(start_prover(manager.send(_))) case Stop => consolidation.exit() delay_prune.revoke() if (prover.defined) { protocol_handlers.exit() global_state.change(_ => Document.State.init) prover.get.terminate } case Consolidate_Execution => if (prover.defined) { val state = global_state.value state.stable_tip_version match { case None => consolidation.update() case Some(version) => val consolidate = consolidation.flush().iterator.filter(name => !resources.session_base.loaded_theory(name) && !state.node_consolidated(version, name) && state.node_maybe_consolidated(version, name)).toList if (consolidate.nonEmpty) handle_raw_edits(consolidate = consolidate) } } case Prune_History => if (prover.defined) { val old_versions = global_state.change_result(_.remove_versions(prune_size)) if (old_versions.nonEmpty) prover.get.remove_versions(old_versions) } case Update_Options(options) => if (prover.defined && is_ready) { prover.get.options(file_formats.prover_options(options)) handle_raw_edits() } global_options.post(Session.Global_Options(options)) case Cancel_Exec(exec_id) if prover.defined => prover.get.cancel_exec(exec_id) case Session.Raw_Edits(doc_blobs, edits, share_common_data) if prover.defined => handle_raw_edits(doc_blobs = doc_blobs, edits = edits, share_common_data = share_common_data) case Session.Dialog_Result(id, serial, result) if prover.defined => prover.get.dialog_result(serial, result) handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result))) case Protocol_Command(name, args) if prover.defined => prover.get.protocol_command_args(name, args) case change: Session.Change if prover.defined => val state = global_state.value if (!state.removing_versions && state.is_assigned(change.previous)) handle_change(change) else postponed_changes.store(change) case Session.Change_Flush if prover.defined => val state = global_state.value if (!state.removing_versions) postponed_changes.flush(state).foreach(handle_change(_)) case bad => if (verbose) Output.warning("Ignoring bad message: " + bad.toString) } true //}}} } } /* main operations */ def snapshot(name: Document.Node.Name = Document.Node.Name.empty, pending_edits: List[Text.Edit] = Nil): Document.Snapshot = global_state.value.snapshot(name, pending_edits) @tailrec final def await_stable_snapshot(): Document.Snapshot = { val snapshot = this.snapshot() if (snapshot.is_outdated) { Thread.sleep(output_delay.ms) await_stable_snapshot() } else snapshot } def start(start_prover: Prover.Receiver => Prover) { file_formats _phase.change( { case Session.Inactive => manager.send(Start(start_prover)) post_phase(Session.Startup) case phase => error("Cannot start prover in phase " + quote(phase.print)) }) } def send_stop() { val was_ready = _phase.guarded_access(phase => phase match { case Session.Startup | Session.Shutdown => None case Session.Terminated(_) => Some((false, phase)) case Session.Inactive => Some((false, post_phase(Session.Terminated(Process_Result(0))))) case Session.Ready => Some((true, post_phase(Session.Shutdown))) }) if (was_ready) manager.send(Stop) } def stop(): Process_Result = { send_stop() prover.await_reset() change_parser.shutdown() change_buffer.shutdown() manager.shutdown() dispatcher.shutdown() phase match { case Session.Terminated(result) => result case phase => error("Bad session phase after shutdown: " + quote(phase.print)) } } def protocol_command(name: String, args: String*) { manager.send(Protocol_Command(name, args.toList)) } def cancel_exec(exec_id: Document_ID.Exec) { manager.send(Cancel_Exec(exec_id)) } def update( doc_blobs: Document.Blobs, edits: List[Document.Edit_Text], share_common_data: Boolean = false) { if (edits.nonEmpty) manager.send_wait(Session.Raw_Edits(doc_blobs, edits, share_common_data)) } def update_options(options: Options) { manager.send_wait(Update_Options(options)) } def dialog_result(id: Document_ID.Generic, serial: Long, result: String) { manager.send(Session.Dialog_Result(id, serial, result)) } }