diff --git a/src/Pure/PIDE/protocol_handlers.scala b/src/Pure/PIDE/protocol_handlers.scala --- a/src/Pure/PIDE/protocol_handlers.scala +++ b/src/Pure/PIDE/protocol_handlers.scala @@ -1,95 +1,95 @@ /* Title: Pure/PIDE/protocol_handlers.scala Author: Makarius Management of add-on protocol handlers for PIDE session. */ package isabelle object Protocol_Handlers { private def bad_handler(exn: Throwable, name: String): Unit = Output.error_message( "Failed to initialize protocol handler: " + quote(name) + "\n" + Exn.message(exn)) sealed case class State( session: Session, handlers: Map[String, Session.Protocol_Handler] = Map.empty, - functions: Map[String, Prover.Protocol_Output => Boolean] = Map.empty) + functions: Map[String, Session.Protocol_Function] = Map.empty) { def get(name: String): Option[Session.Protocol_Handler] = handlers.get(name) def init(handler: Session.Protocol_Handler): State = { val name = handler.getClass.getName val (handlers1, functions1) = handlers.get(name) match { case Some(old_handler) => Output.warning("Redefining protocol handler: " + name) old_handler.exit() (handlers - name, functions -- old_handler.functions.map(_._1)) case None => (handlers, functions) } val (handlers2, functions2) = try { handler.init(session) val dups = for ((a, _) <- handler.functions if functions1.isDefinedAt(a)) yield a if (dups.nonEmpty) error("Duplicate protocol functions: " + commas_quote(dups)) (handlers1 + (name -> handler), functions1 ++ handler.functions) } catch { case exn: Throwable => bad_handler(exn, name); (handlers1, functions1) } copy(handlers = handlers2, functions = functions2) } def init(name: String): State = { val new_handler = try { Some(Class.forName(name).getDeclaredConstructor().newInstance() .asInstanceOf[Session.Protocol_Handler]) } catch { case exn: Throwable => bad_handler(exn, name); None } new_handler match { case Some(handler) => init(handler) case None => this } } def invoke(msg: Prover.Protocol_Output): Boolean = msg.properties match { case (Markup.FUNCTION, a) :: _ if functions.isDefinedAt(a) => try { functions(a)(msg) } catch { case exn: Throwable => Output.error_message( "Failed invocation of protocol function: " + quote(a) + "\n" + Exn.message(exn)) false } case _ => false } def exit(): State = { for ((_, handler) <- handlers) handler.exit() copy(handlers = Map.empty, functions = Map.empty) } } def init(session: Session): Protocol_Handlers = new Protocol_Handlers(session) } class Protocol_Handlers private(session: Session) { private val state = Synchronized(Protocol_Handlers.State(session)) def get(name: String): Option[Session.Protocol_Handler] = state.value.get(name) def init(handler: Session.Protocol_Handler): Unit = state.change(_.init(handler)) def init(name: String): Unit = state.change(_.init(name)) def invoke(msg: Prover.Protocol_Output): Boolean = state.value.invoke(msg) def exit(): Unit = state.change(_.exit()) } diff --git a/src/Pure/PIDE/session.scala b/src/Pure/PIDE/session.scala --- a/src/Pure/PIDE/session.scala +++ b/src/Pure/PIDE/session.scala @@ -1,748 +1,750 @@ /* Title: Pure/PIDE/session.scala Author: Makarius Options: :folding=explicit: PIDE editor session, potentially with running prover process. */ package isabelle import scala.collection.immutable.Queue import scala.collection.mutable import scala.annotation.tailrec object Session { /* outlets */ object Consumer { def apply[A](name: String)(consume: A => Unit): Consumer[A] = new Consumer[A](name, consume) } final class Consumer[-A] private(val name: String, val consume: A => Unit) class Outlet[A](dispatcher: Consumer_Thread[() => Unit]) { private val consumers = Synchronized[List[Consumer[A]]](Nil) def += (c: Consumer[A]) { consumers.change(Library.update(c)) } def -= (c: Consumer[A]) { consumers.change(Library.remove(c)) } def post(a: A) { for (c <- consumers.value.iterator) { dispatcher.send(() => try { c.consume(a) } catch { case exn: Throwable => Output.error_message("Consumer failed: " + quote(c.name) + "\n" + Exn.message(exn)) }) } } } /* change */ sealed case class Change( previous: Document.Version, syntax_changed: List[Document.Node.Name], deps_changed: Boolean, doc_edits: List[Document.Edit_Command], consolidate: List[Document.Node.Name], version: Document.Version) case object Change_Flush /* events */ //{{{ case class Command_Timing(props: Properties.T) case class Theory_Timing(props: Properties.T) case class Runtime_Statistics(props: Properties.T) case class Task_Statistics(props: Properties.T) case class Global_Options(options: Options) case object Caret_Focus case class Raw_Edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text]) case class Dialog_Result(id: Document_ID.Generic, serial: Long, result: String) case class Build_Theories(id: String, master_dir: Path, theories: List[(Options, List[Path])]) case class Commands_Changed( assignment: Boolean, nodes: Set[Document.Node.Name], commands: Set[Command]) sealed abstract class Phase { def print: String = this match { case Terminated(result) => if (result.ok) "finished" else "failed" case _ => Word.lowercase(this.toString) } } case object Inactive extends Phase // stable case object Startup extends Phase // transient case object Ready extends Phase // metastable case object Shutdown extends Phase // transient case class Terminated(result: Process_Result) extends Phase // stable //}}} /* syslog */ private[Session] class Syslog(limit: Int) { private var queue = Queue.empty[XML.Elem] private var length = 0 def += (msg: XML.Elem): Unit = synchronized { queue = queue.enqueue(msg) length += 1 if (length > limit) queue = queue.dequeue._2 } def content: String = synchronized { cat_lines(queue.iterator.map(XML.content)) + (if (length > limit) "\n(A total of " + length + " messages...)" else "") } } /* protocol handlers */ + type Protocol_Function = Prover.Protocol_Output => Boolean + abstract class Protocol_Handler { def init(session: Session): Unit = {} def exit(): Unit = {} - val functions: List[(String, Prover.Protocol_Output => Boolean)] + val functions: List[(String, Protocol_Function)] } } class Session(_session_options: => Options, val resources: Resources) extends Document.Session { session => val xml_cache: XML.Cache = XML.make_cache() val xz_cache: XZ.Cache = XZ.make_cache() /* global flags */ @volatile var timing: Boolean = false @volatile var verbose: Boolean = false /* dynamic session options */ def session_options: Options = _session_options def output_delay: Time = session_options.seconds("editor_output_delay") def consolidate_delay: Time = session_options.seconds("editor_consolidate_delay") def prune_delay: Time = session_options.seconds("editor_prune_delay") def prune_size: Int = session_options.int("editor_prune_size") def syslog_limit: Int = session_options.int("editor_syslog_limit") def reparse_limit: Int = session_options.int("editor_reparse_limit") /* dispatcher */ private val dispatcher = Consumer_Thread.fork[() => Unit]("Session.dispatcher", daemon = true) { case e => e(); true } def assert_dispatcher[A](body: => A): A = { assert(dispatcher.check_thread) body } def require_dispatcher[A](body: => A): A = { require(dispatcher.check_thread) body } def send_dispatcher(body: => Unit): Unit = { if (dispatcher.check_thread) body else dispatcher.send(() => body) } def send_wait_dispatcher(body: => Unit): Unit = { if (dispatcher.check_thread) body else dispatcher.send_wait(() => body) } /* outlets */ val command_timings = new Session.Outlet[Session.Command_Timing](dispatcher) val theory_timings = new Session.Outlet[Session.Theory_Timing](dispatcher) val runtime_statistics = new Session.Outlet[Session.Runtime_Statistics](dispatcher) val task_statistics = new Session.Outlet[Session.Task_Statistics](dispatcher) val global_options = new Session.Outlet[Session.Global_Options](dispatcher) val caret_focus = new Session.Outlet[Session.Caret_Focus.type](dispatcher) val raw_edits = new Session.Outlet[Session.Raw_Edits](dispatcher) val commands_changed = new Session.Outlet[Session.Commands_Changed](dispatcher) val phase_changed = new Session.Outlet[Session.Phase](dispatcher) val syslog_messages = new Session.Outlet[Prover.Output](dispatcher) val raw_output_messages = new Session.Outlet[Prover.Output](dispatcher) val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher) val debugger_updates = new Session.Outlet[Debugger.Update.type](dispatcher) val all_messages = new Session.Outlet[Prover.Message](dispatcher) // potential bottle-neck! /** main protocol manager **/ /* internal messages */ private case class Start(start_prover: Prover.Receiver => Prover) private case object Stop private case class Get_State(promise: Promise[Document.State]) private case class Cancel_Exec(exec_id: Document_ID.Exec) private case class Protocol_Command(name: String, args: List[String]) private case class Update_Options(options: Options) private case object Consolidate_Execution private case object Prune_History /* phase */ private def post_phase(new_phase: Session.Phase): Session.Phase = { phase_changed.post(new_phase) new_phase } private val _phase = Synchronized[Session.Phase](Session.Inactive) private def phase_=(new_phase: Session.Phase): Unit = _phase.change(_ => post_phase(new_phase)) def phase: Session.Phase = _phase.value def is_ready: Boolean = phase == Session.Ready /* syslog */ private val syslog = new Session.Syslog(syslog_limit) def syslog_content(): String = syslog.content /* pipelined change parsing */ private case class Text_Edits( previous: Future[Document.Version], doc_blobs: Document.Blobs, text_edits: List[Document.Edit_Text], consolidate: List[Document.Node.Name], version_result: Promise[Document.Version]) private val change_parser = Consumer_Thread.fork[Text_Edits]("change_parser", daemon = true) { case Text_Edits(previous, doc_blobs, text_edits, consolidate, version_result) => val prev = previous.get_finished val change = Timing.timeit("parse_change", timing) { resources.parse_change(reparse_limit, prev, doc_blobs, text_edits, consolidate) } version_result.fulfill(change.version) manager.send(change) true } /* buffered changes */ private object change_buffer { private var assignment: Boolean = false private var nodes: Set[Document.Node.Name] = Set.empty private var commands: Set[Command] = Set.empty def flush(): Unit = synchronized { if (assignment || nodes.nonEmpty || commands.nonEmpty) commands_changed.post(Session.Commands_Changed(assignment, nodes, commands)) if (nodes.nonEmpty) consolidation.update(nodes) assignment = false nodes = Set.empty commands = Set.empty } private val delay_flush = Delay.first(output_delay) { flush() } def invoke(assign: Boolean, edited_nodes: List[Document.Node.Name], cmds: List[Command]): Unit = synchronized { assignment |= assign for (node <- edited_nodes) { nodes += node } for (command <- cmds) { nodes += command.node_name command.blobs_names.foreach(nodes += _) commands += command } delay_flush.invoke() } def shutdown() { delay_flush.revoke() flush() } } /* postponed changes */ private object postponed_changes { private var postponed: List[Session.Change] = Nil def store(change: Session.Change): Unit = synchronized { postponed ::= change } def flush(state: Document.State): List[Session.Change] = synchronized { val (assigned, unassigned) = postponed.partition(change => state.is_assigned(change.previous)) postponed = unassigned assigned.reverse } } /* node consolidation */ private object consolidation { private val delay = Delay.first(consolidate_delay) { manager.send(Consolidate_Execution) } private val init_state: Option[Set[Document.Node.Name]] = Some(Set.empty) private val state = Synchronized(init_state) def exit() { delay.revoke() state.change(_ => None) } def update(new_nodes: Set[Document.Node.Name] = Set.empty) { val active = state.change_result(st => (st.isDefined, st.map(nodes => if (nodes.isEmpty) new_nodes else nodes ++ new_nodes))) if (active) delay.invoke() } def flush(): Set[Document.Node.Name] = state.change_result(st => if (st.isDefined) (st.get, init_state) else (Set.empty, None)) } /* prover process */ private object prover { private val variable = Synchronized[Option[Prover]](None) def defined: Boolean = variable.value.isDefined def get: Prover = variable.value.get def set(p: Prover) { variable.change(_ => Some(p)) } def reset { variable.change(_ => None) } def await_reset() { variable.guarded_access({ case None => Some((), None) case _ => None }) } } /* file formats */ lazy val file_formats: File_Format.Session = File_Format.registry.start_session(session) /* protocol handlers */ private val protocol_handlers = Protocol_Handlers.init(session) def get_protocol_handler(name: String): Option[Session.Protocol_Handler] = protocol_handlers.get(name) def init_protocol_handler(handler: Session.Protocol_Handler): Unit = protocol_handlers.init(handler) def init_protocol_handler(name: String): Unit = protocol_handlers.init(name) /* debugger */ private val debugger_handler = new Debugger.Handler(this) init_protocol_handler(debugger_handler) def debugger: Debugger = debugger_handler.debugger /* manager thread */ private val delay_prune = Delay.first(prune_delay) { manager.send(Prune_History) } private val manager: Consumer_Thread[Any] = { /* global state */ val global_state = Synchronized(Document.State.init) /* raw edits */ def handle_raw_edits( doc_blobs: Document.Blobs = Document.Blobs.empty, edits: List[Document.Edit_Text] = Nil, consolidate: List[Document.Node.Name] = Nil) //{{{ { require(prover.defined) if (edits.nonEmpty) prover.get.discontinue_execution() val previous = global_state.value.history.tip.version val version = Future.promise[Document.Version] global_state.change(_.continue_history(previous, edits, version)) raw_edits.post(Session.Raw_Edits(doc_blobs, edits)) change_parser.send(Text_Edits(previous, doc_blobs, edits, consolidate, version)) } //}}} /* resulting changes */ def handle_change(change: Session.Change) //{{{ { require(prover.defined) // define commands { val id_commands = new mutable.ListBuffer[Command] def id_command(command: Command) { for { (name, digest) <- command.blobs_defined if !global_state.value.defined_blob(digest) } { change.version.nodes(name).get_blob match { case Some(blob) => global_state.change(_.define_blob(digest)) prover.get.define_blob(digest, blob.bytes) case None => Output.error_message("Missing blob " + quote(name.toString)) } } if (!global_state.value.defined_command(command.id)) { global_state.change(_.define_command(command)) id_commands += command } } for { (_, edit) <- change.doc_edits } { edit.foreach({ case (c1, c2) => c1.foreach(id_command); c2.foreach(id_command) }) } if (id_commands.nonEmpty) prover.get.define_commands_bulk(id_commands.toList) } val assignment = global_state.value.the_assignment(change.previous).check_finished global_state.change(_.define_version(change.version, assignment)) prover.get.update(change.previous.id, change.version.id, change.doc_edits, change.consolidate) resources.commit(change) } //}}} /* prover output */ def handle_output(output: Prover.Output) //{{{ { def bad_output() { if (verbose) Output.warning("Ignoring bad prover output: " + output.message.toString) } def change_command(f: Document.State => (Command.State, Document.State)) { try { val st = global_state.change_result(f) change_buffer.invoke(false, Nil, List(st.command)) } catch { case _: Document.State.Fail => bad_output() } } output match { case msg: Prover.Protocol_Output => val handled = protocol_handlers.invoke(msg) if (!handled) { msg.properties match { case Markup.Protocol_Handler(name) if prover.defined => init_protocol_handler(name) case Protocol.Command_Timing(props, state_id, timing) if prover.defined => command_timings.post(Session.Command_Timing(props)) val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil))) change_command(_.accumulate(state_id, xml_cache.elem(message), xml_cache)) case Markup.Theory_Timing(props) => theory_timings.post(Session.Theory_Timing(props)) case Markup.ML_Statistics(props) => runtime_statistics.post(Session.Runtime_Statistics(props)) case Markup.Task_Statistics(props) => task_statistics.post(Session.Task_Statistics(props)) case Protocol.Export(args) if args.id.isDefined && Value.Long.unapply(args.id.get).isDefined => val id = Value.Long.unapply(args.id.get).get val export = Export.make_entry("", args, msg.bytes, cache = xz_cache) change_command(_.add_export(id, (args.serial, export))) case List(Markup.Commands_Accepted.PROPERTY) => msg.text match { case Protocol.Commands_Accepted(ids) => ids.foreach(id => change_command(_.accumulate(id, Protocol.Commands_Accepted.message, xml_cache))) case _ => bad_output() } case List(Markup.Assign_Update.PROPERTY) => msg.text match { case Protocol.Assign_Update(id, edited, update) => try { val (edited_nodes, cmds) = global_state.change_result(_.assign(id, edited, update)) change_buffer.invoke(true, edited_nodes, cmds) manager.send(Session.Change_Flush) } catch { case _: Document.State.Fail => bad_output() } case _ => bad_output() } delay_prune.invoke() case List(Markup.Removed_Versions.PROPERTY) => msg.text match { case Protocol.Removed(removed) => try { global_state.change(_.removed_versions(removed)) manager.send(Session.Change_Flush) } catch { case _: Document.State.Fail => bad_output() } case _ => bad_output() } case _ => bad_output() } } case _ => output.properties match { case Position.Id(state_id) => change_command(_.accumulate(state_id, output.message, xml_cache)) case _ if output.is_init => prover.get.options(file_formats.prover_options(session_options)) prover.get.init_session(resources) phase = Session.Ready debugger.ready() case Markup.Process_Result(result) if output.is_exit => file_formats.stop_session phase = Session.Terminated(result) prover.reset case _ => raw_output_messages.post(output) } } } //}}} /* main thread */ Consumer_Thread.fork[Any]("Session.manager", daemon = true) { case arg: Any => //{{{ arg match { case output: Prover.Output => if (output.is_syslog) { syslog += output.message syslog_messages.post(output) } if (output.is_stdout || output.is_stderr) raw_output_messages.post(output) else handle_output(output) all_messages.post(output) case input: Prover.Input => all_messages.post(input) case Start(start_prover) if !prover.defined => prover.set(start_prover(manager.send(_))) case Stop => consolidation.exit() delay_prune.revoke() if (prover.defined) { protocol_handlers.exit() global_state.change(_ => Document.State.init) prover.get.terminate } case Get_State(promise) => promise.fulfill(global_state.value) case Consolidate_Execution => if (prover.defined) { val state = global_state.value state.stable_tip_version match { case None => consolidation.update() case Some(version) => val consolidate = consolidation.flush().iterator.filter(name => !resources.session_base.loaded_theory(name) && !state.node_consolidated(version, name) && state.node_maybe_consolidated(version, name)).toList if (consolidate.nonEmpty) handle_raw_edits(consolidate = consolidate) } } case Prune_History => if (prover.defined) { val old_versions = global_state.change_result(_.remove_versions(prune_size)) if (old_versions.nonEmpty) prover.get.remove_versions(old_versions) } case Update_Options(options) => if (prover.defined && is_ready) { prover.get.options(file_formats.prover_options(options)) handle_raw_edits() } global_options.post(Session.Global_Options(options)) case Cancel_Exec(exec_id) if prover.defined => prover.get.cancel_exec(exec_id) case Session.Raw_Edits(doc_blobs, edits) if prover.defined => handle_raw_edits(doc_blobs = doc_blobs, edits = edits) case Session.Dialog_Result(id, serial, result) if prover.defined => prover.get.dialog_result(serial, result) handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result))) case Protocol_Command(name, args) if prover.defined => prover.get.protocol_command_args(name, args) case change: Session.Change if prover.defined => val state = global_state.value if (!state.removing_versions && state.is_assigned(change.previous)) handle_change(change) else postponed_changes.store(change) case Session.Change_Flush if prover.defined => val state = global_state.value if (!state.removing_versions) postponed_changes.flush(state).foreach(handle_change) case bad => if (verbose) Output.warning("Ignoring bad message: " + bad.toString) } true //}}} } } /* main operations */ def get_state(): Document.State = { if (manager.is_active) { val promise = Future.promise[Document.State] manager.send_wait(Get_State(promise)) promise.join } else Document.State.init } def snapshot(name: Document.Node.Name = Document.Node.Name.empty, pending_edits: List[Text.Edit] = Nil): Document.Snapshot = get_state().snapshot(name, pending_edits) def recent_syntax(name: Document.Node.Name): Outer_Syntax = get_state().recent_finished.version.get_finished.nodes(name).syntax getOrElse resources.session_base.overall_syntax @tailrec final def await_stable_snapshot(): Document.Snapshot = { val snapshot = this.snapshot() if (snapshot.is_outdated) { output_delay.sleep await_stable_snapshot() } else snapshot } def start(start_prover: Prover.Receiver => Prover) { file_formats _phase.change( { case Session.Inactive => manager.send(Start(start_prover)) post_phase(Session.Startup) case phase => error("Cannot start prover in phase " + quote(phase.print)) }) } def stop(): Process_Result = { val was_ready = _phase.guarded_access( { case Session.Startup | Session.Shutdown => None case Session.Terminated(_) => Some((false, phase)) case Session.Inactive => Some((false, post_phase(Session.Terminated(Process_Result(0))))) case Session.Ready => Some((true, post_phase(Session.Shutdown))) }) if (was_ready) manager.send(Stop) prover.await_reset() change_parser.shutdown() change_buffer.shutdown() manager.shutdown() dispatcher.shutdown() phase match { case Session.Terminated(result) => result case phase => error("Bad session phase after shutdown: " + quote(phase.print)) } } def protocol_command(name: String, args: String*) { manager.send(Protocol_Command(name, args.toList)) } def cancel_exec(exec_id: Document_ID.Exec) { manager.send(Cancel_Exec(exec_id)) } def update(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text]) { if (edits.nonEmpty) manager.send_wait(Session.Raw_Edits(doc_blobs, edits)) } def update_options(options: Options) { manager.send_wait(Update_Options(options)) } def dialog_result(id: Document_ID.Generic, serial: Long, result: String) { manager.send(Session.Dialog_Result(id, serial, result)) } } diff --git a/src/Pure/Tools/build.scala b/src/Pure/Tools/build.scala --- a/src/Pure/Tools/build.scala +++ b/src/Pure/Tools/build.scala @@ -1,955 +1,943 @@ /* Title: Pure/Tools/build.scala Author: Makarius Options: :folding=explicit: Build and manage Isabelle sessions. */ package isabelle import scala.collection.{SortedSet, mutable} import scala.annotation.tailrec object Build { /** auxiliary **/ /* persistent build info */ sealed case class Session_Info( sources: String, input_heaps: List[String], output_heap: Option[String], return_code: Int) { def ok: Boolean = return_code == 0 } /* queue with scheduling information */ private object Queue { type Timings = (List[Properties.T], Double) def load_timings(progress: Progress, store: Sessions.Store, session_name: String): Timings = { val no_timings: Timings = (Nil, 0.0) store.access_database(session_name) match { case None => no_timings case Some(db) => def ignore_error(msg: String) = { progress.echo_warning("Ignoring bad database " + db + (if (msg == "") "" else "\n" + msg)) no_timings } try { val command_timings = store.read_command_timings(db, session_name) val session_timing = store.read_session_timing(db, session_name) match { case Markup.Elapsed(t) => t case _ => 0.0 } (command_timings, session_timing) } catch { case ERROR(msg) => ignore_error(msg) case exn: java.lang.Error => ignore_error(Exn.message(exn)) case _: XML.Error => ignore_error("") } finally { db.close } } } def make_session_timing(sessions_structure: Sessions.Structure, timing: Map[String, Double]) : Map[String, Double] = { val maximals = sessions_structure.build_graph.maximals.toSet def desc_timing(session_name: String): Double = { if (maximals.contains(session_name)) timing(session_name) else { val descendants = sessions_structure.build_descendants(List(session_name)).toSet val g = sessions_structure.build_graph.restrict(descendants) (0.0 :: g.maximals.flatMap(desc => { val ps = g.all_preds(List(desc)) if (ps.exists(p => !timing.isDefinedAt(p))) None else Some(ps.map(timing(_)).sum) })).max } } timing.keySet.iterator.map(name => (name -> desc_timing(name))).toMap.withDefaultValue(0.0) } def apply(progress: Progress, sessions_structure: Sessions.Structure, store: Sessions.Store) : Queue = { val graph = sessions_structure.build_graph val names = graph.keys val timings = names.map(name => (name, load_timings(progress, store, name))) val command_timings = timings.map({ case (name, (ts, _)) => (name, ts) }).toMap.withDefaultValue(Nil) val session_timing = make_session_timing(sessions_structure, timings.map({ case (name, (_, t)) => (name, t) }).toMap) object Ordering extends scala.math.Ordering[String] { def compare_timing(name1: String, name2: String): Int = { val t1 = session_timing(name1) val t2 = session_timing(name2) if (t1 == 0.0 || t2 == 0.0) 0 else t1 compare t2 } def compare(name1: String, name2: String): Int = compare_timing(name2, name1) match { case 0 => sessions_structure(name2).timeout compare sessions_structure(name1).timeout match { case 0 => name1 compare name2 case ord => ord } case ord => ord } } new Queue(graph, SortedSet(names: _*)(Ordering), command_timings) } } private class Queue( graph: Graph[String, Sessions.Info], order: SortedSet[String], val command_timings: String => List[Properties.T]) { def is_inner(name: String): Boolean = !graph.is_maximal(name) def is_empty: Boolean = graph.is_empty def - (name: String): Queue = new Queue(graph.del_node(name), order - name, // FIXME scala-2.10.0 .. 2.12.4 TreeSet problem!? command_timings) def dequeue(skip: String => Boolean): Option[(String, Sessions.Info)] = { val it = order.iterator.dropWhile(name => skip(name) || !graph.defined(name) // FIXME scala-2.10.0 .. 2.12.4 TreeSet problem!? || !graph.is_minimal(name)) if (it.hasNext) { val name = it.next; Some((name, graph.get_node(name))) } else None } } /* PIDE protocol handler */ /* job: running prover process */ private class Job(progress: Progress, session_name: String, val info: Sessions.Info, deps: Sessions.Deps, store: Sessions.Store, do_store: Boolean, verbose: Boolean, val numa_node: Option[Int], command_timings: List[Properties.T]) { val options: Options = NUMA.policy_options(info.options, numa_node) private val sessions_structure = deps.sessions_structure private val graph_file = Isabelle_System.tmp_file("session_graph", "pdf") graphview.Graph_File.write(options, graph_file, deps(session_name).session_graph_display) private val export_tmp_dir = Isabelle_System.tmp_dir("export") private val export_consumer = Export.consumer(store.open_database(session_name, output = true), cache = store.xz_cache) private val future_result: Future[Process_Result] = Future.thread("build", uninterruptible = true) { val parent = info.parent.getOrElse("") val base = deps(parent) val args_yxml = YXML.string_of_body( { import XML.Encode._ pair(list(pair(string, int)), pair(list(properties), pair(bool, pair(Path.encode, pair(list(pair(Path.encode, Path.encode)), pair(string, pair(string, pair(string, pair(string, pair(Path.encode, pair(list(pair(Options.encode, list(pair(string, properties)))), pair(list(pair(string, properties)), pair(list(pair(string, string)), pair(list(string), pair(list(pair(string, string)), pair(list(string), list(string)))))))))))))))))( (Symbol.codes, (command_timings, (verbose, (store.browser_info, (info.document_files, (File.standard_path(graph_file), (parent, (info.chapter, (session_name, (Path.current, (info.theories, (sessions_structure.session_positions, (sessions_structure.dest_session_directories, (base.doc_names, (base.global_theories.toList, (base.loaded_theories.keys, info.bibtex_entries.map(_.info)))))))))))))))))) }) val env = Isabelle_System.settings() + ("ISABELLE_EXPORT_TMP" -> File.standard_path(export_tmp_dir)) + ("ISABELLE_ML_DEBUGGER" -> options.bool("ML_debugger").toString) val is_pure = Sessions.is_pure(session_name) val use_prelude = if (is_pure) Thy_Header.ml_roots.map(_._1) else Nil val eval_store = if (do_store) { (if (info.theories.nonEmpty) List("ML_Heap.share_common_data ()") else Nil) ::: List("ML_Heap.save_child " + ML_Syntax.print_string_bytes(File.platform_path(store.output_heap(session_name)))) } else Nil if (options.bool("pide_session")) { val resources = new Resources(sessions_structure, deps(parent)) val session = new Session(options, resources) val build_session_errors: Promise[List[String]] = Future.promise val stdout = new StringBuilder(1000) val messages = new mutable.ListBuffer[String] val command_timings = new mutable.ListBuffer[Properties.T] val theory_timings = new mutable.ListBuffer[Properties.T] val runtime_statistics = new mutable.ListBuffer[Properties.T] val task_statistics = new mutable.ListBuffer[Properties.T] + def fun( + name: String, + acc: mutable.ListBuffer[Properties.T], + unapply: Properties.T => Option[Properties.T]): (String, Session.Protocol_Function) = + { + name -> ((msg: Prover.Protocol_Output) => + unapply(msg.properties) match { + case Some(props) => acc += props; true + case _ => false + }) + } + session.init_protocol_handler(new Session.Protocol_Handler { override def exit() { build_session_errors.cancel } private def build_session_finished(msg: Prover.Protocol_Output): Boolean = { val (rc, errors) = try { val (rc, errs) = { import XML.Decode._ pair(int, list(x => x))(Symbol.decode_yxml(msg.text)) } val errors = for (err <- errs) yield { val prt = Protocol_Message.expose_no_reports(err) Pretty.string_of(prt, metric = Symbol.Metric) } (rc, errors) } catch { case ERROR(err) => (2, List(err)) } session.protocol_command("Prover.stop", rc.toString) build_session_errors.fulfill(errors) true } private def loading_theory(msg: Prover.Protocol_Output): Boolean = msg.properties match { case Markup.Loading_Theory(name) => progress.theory(Progress.Theory(name, session = session_name)) true case _ => false } private def export(msg: Prover.Protocol_Output): Boolean = msg.properties match { case Protocol.Export(args) => export_consumer(session_name, args, msg.bytes) true case _ => false } - private def command_timing(msg: Prover.Protocol_Output): Boolean = - msg.properties match { - case Markup.Command_Timing(props) => command_timings += props; true - case _ => false - } - - private def theory_timing(msg: Prover.Protocol_Output): Boolean = - msg.properties match { - case Markup.Theory_Timing(props) => theory_timings += props; true - case _ => false - } - - private def ml_stats(msg: Prover.Protocol_Output): Boolean = - msg.properties match { - case Markup.ML_Statistics(props) => runtime_statistics += props; true - case _ => false - } - - private def task_stats(msg: Prover.Protocol_Output): Boolean = - msg.properties match { - case Markup.Task_Statistics(props) => task_statistics += props; true - case _ => false - } - val functions = List( Markup.Build_Session_Finished.name -> build_session_finished, Markup.Loading_Theory.name -> loading_theory, Markup.EXPORT -> export, - Markup.Command_Timing.name -> command_timing, - Markup.Theory_Timing.name -> theory_timing, - Markup.ML_Statistics.name -> ml_stats, - Markup.Task_Statistics.name -> task_stats) + fun(Markup.Command_Timing.name, command_timings, Markup.Command_Timing.unapply), + fun(Markup.Theory_Timing.name, theory_timings, Markup.Theory_Timing.unapply), + fun(Markup.ML_Statistics.name, runtime_statistics, Markup.ML_Statistics.unapply), + fun(Markup.Task_Statistics.name, task_statistics, Markup.Task_Statistics.unapply)) }) session.all_messages += Session.Consumer[Any]("build_session_output") { case msg: Prover.Output => val message = msg.message if (msg.is_stdout) { stdout ++= Symbol.encode(XML.content(message)) } else if (Protocol.is_exported(message)) { messages += Symbol.encode(Protocol.message_text(List(message), metric = Symbol.Metric)) } case _ => } val eval_main = Command_Line.ML_tool("Isabelle_Process.init_build ()" :: eval_store) val process = Isabelle_Process(session, options, sessions_structure, store, logic = parent, raw_ml_system = is_pure, use_prelude = use_prelude, eval_main = eval_main, cwd = info.dir.file, env = env) val errors = Isabelle_Thread.interrupt_handler(_ => process.terminate) { Exn.capture { process.await_startup } match { case Exn.Res(_) => session.protocol_command("build_session", args_yxml) build_session_errors.join_result case Exn.Exn(exn) => Exn.Res(List(Exn.message(exn))) } } val process_result = Isabelle_Thread.interrupt_handler(_ => process.terminate) { process.await_shutdown } val process_output = stdout.toString :: messages.toList ::: command_timings.toList.map(Protocol.Command_Timing_Marker.apply) ::: theory_timings.toList.map(Protocol.Theory_Timing_Marker.apply) ::: runtime_statistics.toList.map(Protocol.ML_Statistics_Marker.apply) ::: task_statistics.toList.map(Protocol.Task_Statistics_Marker.apply) val result = process_result.output(process_output) errors match { case Exn.Res(Nil) => result case Exn.Res(errs) => result.error_rc.output( errs.flatMap(s => split_lines(Output.error_message_text(s))) ::: errs.map(Protocol.Error_Message_Marker.apply)) case Exn.Exn(Exn.Interrupt()) => if (result.ok) result.copy(rc = Exn.Interrupt.return_code) else result case Exn.Exn(exn) => throw exn } } else { val args_file = Isabelle_System.tmp_file("build") File.write(args_file, args_yxml) val eval_build = "Build.build " + ML_Syntax.print_string_bytes(File.standard_path(args_file)) val eval_main = Command_Line.ML_tool(eval_build :: eval_store) val process = ML_Process(options, deps.sessions_structure, store, logic = parent, raw_ml_system = is_pure, use_prelude = use_prelude, eval_main = eval_main, cwd = info.dir.file, env = env, cleanup = () => args_file.delete) Isabelle_Thread.interrupt_handler(_ => process.terminate) { process.result( progress_stdout = { case Protocol.Loading_Theory_Marker(theory) => progress.theory(Progress.Theory(theory, session = session_name)) case Protocol.Export.Marker((args, path)) => val body = try { Bytes.read(path) } catch { case ERROR(msg) => error("Failed to read export " + quote(args.compound_name) + "\n" + msg) } path.file.delete export_consumer(session_name, args, body) case _ => }, progress_limit = options.int("process_output_limit") match { case 0 => None case m => Some(m * 1000000L) }, strict = false) } } } def terminate: Unit = future_result.cancel def is_finished: Boolean = future_result.is_finished private val timeout_request: Option[Event_Timer.Request] = { if (info.timeout > Time.zero) Some(Event_Timer.request(Time.now() + info.timeout) { terminate }) else None } def join: (Process_Result, Option[String]) = { val result0 = future_result.join val result1 = export_consumer.shutdown(close = true).map(Output.error_message_text) match { case Nil => result0 case errs => result0.errors(errs).error_rc } Isabelle_System.rm_tree(export_tmp_dir) if (result1.ok) Present.finish(progress, store.browser_info, graph_file, info, session_name) graph_file.delete val was_timeout = timeout_request match { case None => false case Some(request) => !request.cancel } val result2 = if (result1.interrupted) { if (was_timeout) result1.error(Output.error_message_text("Timeout")).was_timeout else result1.error(Output.error_message_text("Interrupt")) } else result1 val heap_digest = if (result2.ok && do_store && store.output_heap(session_name).is_file) Some(Sessions.write_heap_digest(store.output_heap(session_name))) else None (result2, heap_digest) } } /** build with results **/ class Results private[Build](results: Map[String, (Option[Process_Result], Sessions.Info)]) { def sessions: Set[String] = results.keySet def cancelled(name: String): Boolean = results(name)._1.isEmpty def apply(name: String): Process_Result = results(name)._1.getOrElse(Process_Result(1)) def info(name: String): Sessions.Info = results(name)._2 val rc: Int = (0 /: results.iterator.map( { case (_, (Some(r), _)) => r.rc case (_, (None, _)) => 1 }))(_ max _) def ok: Boolean = rc == 0 override def toString: String = rc.toString } def build( options: Options, selection: Sessions.Selection, progress: Progress = new Progress, check_unknown_files: Boolean = false, build_heap: Boolean = false, clean_build: Boolean = false, dirs: List[Path] = Nil, select_dirs: List[Path] = Nil, infos: List[Sessions.Info] = Nil, numa_shuffling: Boolean = false, max_jobs: Int = 1, list_files: Boolean = false, check_keywords: Set[String] = Set.empty, fresh_build: Boolean = false, no_build: Boolean = false, soft_build: Boolean = false, verbose: Boolean = false, export_files: Boolean = false): Results = { val build_options = options + "ML_statistics" + "completion_limit=0" + "editor_tracing_messages=0" + "pide_reports=false" val store = Sessions.store(build_options) Isabelle_Fonts.init() /* session selection and dependencies */ val full_sessions = Sessions.load_structure(build_options, dirs = dirs, select_dirs = select_dirs, infos = infos) def sources_stamp(deps: Sessions.Deps, session_name: String): String = { val digests = full_sessions(session_name).meta_digest :: deps.sources(session_name) ::: deps.imported_sources(session_name) SHA1.digest(cat_lines(digests.map(_.toString).sorted)).toString } val deps = { val deps0 = Sessions.deps(full_sessions.selection(selection), progress = progress, inlined_files = true, verbose = verbose, list_files = list_files, check_keywords = check_keywords).check_errors if (soft_build && !fresh_build) { val outdated = deps0.sessions_structure.build_topological_order.flatMap(name => store.access_database(name) match { case Some(db) => using(db)(store.read_build(_, name)) match { case Some(build) if build.ok && build.sources == sources_stamp(deps0, name) => None case _ => Some(name) } case None => Some(name) }) Sessions.deps(full_sessions.selection(Sessions.Selection(sessions = outdated)), progress = progress, inlined_files = true).check_errors } else deps0 } /* check unknown files */ if (check_unknown_files) { val source_files = (for { (_, base) <- deps.session_bases.iterator (path, _) <- base.sources.iterator } yield path).toList val exclude_files = List(Path.explode("$POLYML_EXE")).map(_.canonical_file) val unknown_files = Mercurial.check_files(source_files)._2. filterNot(path => exclude_files.contains(path.canonical_file)) if (unknown_files.nonEmpty) { progress.echo_warning("Unknown files (not part of the underlying Mercurial repository):" + unknown_files.map(path => path.expand.implode).sorted.mkString("\n ", "\n ", "")) } } /* main build process */ val queue = Queue(progress, deps.sessions_structure, store) store.prepare_output_dir() if (clean_build) { for (name <- full_sessions.imports_descendants(full_sessions.imports_selection(selection))) { val (relevant, ok) = store.clean_output(name) if (relevant) { if (ok) progress.echo("Cleaned " + name) else progress.echo(name + " FAILED to clean") } } } // scheduler loop case class Result( current: Boolean, heap_digest: Option[String], process: Option[Process_Result], info: Sessions.Info) { def ok: Boolean = process match { case None => false case Some(res) => res.rc == 0 } } def sleep() { Isabelle_Thread.interrupt_handler(_ => progress.stop) { Time.seconds(0.5).sleep } } val numa_nodes = new NUMA.Nodes(numa_shuffling) @tailrec def loop( pending: Queue, running: Map[String, (List[String], Job)], results: Map[String, Result]): Map[String, Result] = { def used_node(i: Int): Boolean = running.iterator.exists( { case (_, (_, job)) => job.numa_node.isDefined && job.numa_node.get == i }) if (pending.is_empty) results else { if (progress.stopped) { for ((_, (_, job)) <- running) job.terminate } running.find({ case (_, (_, job)) => job.is_finished }) match { case Some((session_name, (input_heaps, job))) => //{{{ finish job val (process_result, heap_digest) = job.join val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test) val process_result_tail = { val tail = job.info.options.int("process_output_tail") process_result.copy( out_lines = "(see also " + store.output_log(session_name).file.toString + ")" :: (if (tail == 0) log_lines else log_lines.drop(log_lines.length - tail max 0))) } // write log file if (process_result.ok) { File.write_gzip(store.output_log_gz(session_name), terminate_lines(log_lines)) } else File.write(store.output_log(session_name), terminate_lines(log_lines)) // write database { val build_log = Build_Log.Log_File(session_name, process_result.out_lines). parse_session_info( command_timings = true, theory_timings = true, ml_statistics = true, task_statistics = true) using(store.open_database(session_name, output = true))(db => store.write_session_info(db, session_name, build_log = if (process_result.timeout) build_log.error("Timeout") else build_log, build = Session_Info(sources_stamp(deps, session_name), input_heaps, heap_digest, process_result.rc))) } // messages process_result.err_lines.foreach(progress.echo) if (process_result.ok) progress.echo( "Finished " + session_name + " (" + process_result.timing.message_resources + ")") else { progress.echo(session_name + " FAILED") if (!process_result.interrupted) progress.echo(process_result_tail.out) } loop(pending - session_name, running - session_name, results + (session_name -> Result(false, heap_digest, Some(process_result_tail), job.info))) //}}} case None if running.size < (max_jobs max 1) => //{{{ check/start next job pending.dequeue(running.isDefinedAt) match { case Some((session_name, info)) => val ancestor_results = deps.sessions_structure.build_requirements(List(session_name)). filterNot(_ == session_name).map(results(_)) val ancestor_heaps = ancestor_results.flatMap(_.heap_digest) val do_store = build_heap || Sessions.is_pure(session_name) || queue.is_inner(session_name) val (current, heap_digest) = { store.access_database(session_name) match { case Some(db) => using(db)(store.read_build(_, session_name)) match { case Some(build) => val heap_digest = store.find_heap_digest(session_name) val current = !fresh_build && build.ok && build.sources == sources_stamp(deps, session_name) && build.input_heaps == ancestor_heaps && build.output_heap == heap_digest && !(do_store && heap_digest.isEmpty) (current, heap_digest) case None => (false, None) } case None => (false, None) } } val all_current = current && ancestor_results.forall(_.current) if (all_current) loop(pending - session_name, running, results + (session_name -> Result(true, heap_digest, Some(Process_Result(0)), info))) else if (no_build) { progress.echo_if(verbose, "Skipping " + session_name + " ...") loop(pending - session_name, running, results + (session_name -> Result(false, heap_digest, Some(Process_Result(1)), info))) } else if (ancestor_results.forall(_.ok) && !progress.stopped) { progress.echo((if (do_store) "Building " else "Running ") + session_name + " ...") store.clean_output(session_name) using(store.open_database(session_name, output = true))( store.init_session_info(_, session_name)) val numa_node = numa_nodes.next(used_node) val job = new Job(progress, session_name, info, deps, store, do_store, verbose, numa_node, queue.command_timings(session_name)) loop(pending, running + (session_name -> (ancestor_heaps, job)), results) } else { progress.echo(session_name + " CANCELLED") loop(pending - session_name, running, results + (session_name -> Result(false, heap_digest, None, info))) } case None => sleep(); loop(pending, running, results) } ///}}} case None => sleep(); loop(pending, running, results) } } } /* build results */ val results0 = if (deps.is_empty) { progress.echo_warning("Nothing to build") Map.empty[String, Result] } else Isabelle_Thread.uninterruptible { loop(queue, Map.empty, Map.empty) } val results = new Results( (for ((name, result) <- results0.iterator) yield (name, (result.process, result.info))).toMap) if (export_files) { for (name <- full_sessions.imports_selection(selection).iterator if results(name).ok) { val info = results.info(name) if (info.export_files.nonEmpty) { progress.echo("Exporting " + info.name + " ...") for ((dir, prune, pats) <- info.export_files) { Export.export_files(store, name, info.dir + dir, progress = if (verbose) progress else new Progress, export_prune = prune, export_patterns = pats) } } } } if (results.rc != 0 && (verbose || !no_build)) { val unfinished = (for { name <- results.sessions.iterator if !results(name).ok } yield name).toList.sorted progress.echo("Unfinished session(s): " + commas(unfinished)) } /* global browser info */ if (!no_build) { val browser_chapters = (for { (name, result) <- results0.iterator if result.ok info = full_sessions(name) if info.options.bool("browser_info") } yield (info.chapter, (name, info.description))).toList.groupBy(_._1). map({ case (chapter, es) => (chapter, es.map(_._2)) }).filterNot(_._2.isEmpty) for ((chapter, entries) <- browser_chapters) Present.update_chapter_index(store.browser_info, chapter, entries) if (browser_chapters.nonEmpty) Present.make_global_index(store.browser_info) } results } /* Isabelle tool wrapper */ val isabelle_tool = Isabelle_Tool("build", "build and manage Isabelle sessions", args => { val build_options = Word.explode(Isabelle_System.getenv("ISABELLE_BUILD_OPTIONS")) var base_sessions: List[String] = Nil var select_dirs: List[Path] = Nil var numa_shuffling = false var requirements = false var soft_build = false var exclude_session_groups: List[String] = Nil var all_sessions = false var build_heap = false var clean_build = false var dirs: List[Path] = Nil var export_files = false var fresh_build = false var session_groups: List[String] = Nil var max_jobs = 1 var check_keywords: Set[String] = Set.empty var list_files = false var no_build = false var options = Options.init(opts = build_options) var verbose = false var exclude_sessions: List[String] = Nil val getopts = Getopts(""" Usage: isabelle build [OPTIONS] [SESSIONS ...] Options are: -B NAME include session NAME and all descendants -D DIR include session directory and select its sessions -N cyclic shuffling of NUMA CPU nodes (performance tuning) -R refer to requirements of selected sessions -S soft build: only observe changes of sources, not heap images -X NAME exclude sessions from group NAME and all descendants -a select all sessions -b build heap images -c clean build -d DIR include session directory -e export files from session specification into file-system -f fresh build -g NAME select session group NAME -j INT maximum number of parallel jobs (default 1) -k KEYWORD check theory sources for conflicts with proposed keywords -l list session source files -n no build -- test dependencies only -o OPTION override Isabelle system OPTION (via NAME=VAL or NAME) -v verbose -x NAME exclude session NAME and all descendants Build and manage Isabelle sessions, depending on implicit settings: """ + Library.prefix_lines(" ", Build_Log.Settings.show()) + "\n", "B:" -> (arg => base_sessions = base_sessions ::: List(arg)), "D:" -> (arg => select_dirs = select_dirs ::: List(Path.explode(arg))), "N" -> (_ => numa_shuffling = true), "R" -> (_ => requirements = true), "S" -> (_ => soft_build = true), "X:" -> (arg => exclude_session_groups = exclude_session_groups ::: List(arg)), "a" -> (_ => all_sessions = true), "b" -> (_ => build_heap = true), "c" -> (_ => clean_build = true), "d:" -> (arg => dirs = dirs ::: List(Path.explode(arg))), "e" -> (_ => export_files = true), "f" -> (_ => fresh_build = true), "g:" -> (arg => session_groups = session_groups ::: List(arg)), "j:" -> (arg => max_jobs = Value.Int.parse(arg)), "k:" -> (arg => check_keywords = check_keywords + arg), "l" -> (_ => list_files = true), "n" -> (_ => no_build = true), "o:" -> (arg => options = options + arg), "v" -> (_ => verbose = true), "x:" -> (arg => exclude_sessions = exclude_sessions ::: List(arg))) val sessions = getopts(args) val progress = new Console_Progress(verbose = verbose) val start_date = Date.now() if (verbose) { progress.echo( "Started at " + Build_Log.print_date(start_date) + " (" + Isabelle_System.getenv("ML_IDENTIFIER") + " on " + Isabelle_System.hostname() +")") progress.echo(Build_Log.Settings.show() + "\n") } val results = progress.interrupt_handler { build(options, Sessions.Selection( requirements = requirements, all_sessions = all_sessions, base_sessions = base_sessions, exclude_session_groups = exclude_session_groups, exclude_sessions = exclude_sessions, session_groups = session_groups, sessions = sessions), progress = progress, check_unknown_files = Mercurial.is_repository(Path.explode("~~")), build_heap = build_heap, clean_build = clean_build, dirs = dirs, select_dirs = select_dirs, numa_shuffling = NUMA.enabled_warning(progress, numa_shuffling), max_jobs = max_jobs, list_files = list_files, check_keywords = check_keywords, fresh_build = fresh_build, no_build = no_build, soft_build = soft_build, verbose = verbose, export_files = export_files) } val end_date = Date.now() val elapsed_time = end_date.time - start_date.time if (verbose) { progress.echo("\nFinished at " + Build_Log.print_date(end_date)) } val total_timing = (Timing.zero /: results.sessions.iterator.map(a => results(a).timing))(_ + _). copy(elapsed = elapsed_time) progress.echo(total_timing.message_resources) sys.exit(results.rc) }) /* build logic image */ def build_logic(options: Options, logic: String, progress: Progress = new Progress, build_heap: Boolean = false, dirs: List[Path] = Nil, fresh: Boolean = false, strict: Boolean = false): Int = { val selection = Sessions.Selection.session(logic) val rc = if (!fresh && build(options, selection, build_heap = build_heap, no_build = true, dirs = dirs).ok) 0 else { progress.echo("Build started for Isabelle/" + logic + " ...") Build.build(options, selection, progress = progress, build_heap = build_heap, fresh_build = fresh, dirs = dirs).rc } if (strict && rc != 0) error("Failed to build Isabelle/" + logic) else rc } }