diff --git a/src/Pure/Build/build_job.scala b/src/Pure/Build/build_job.scala --- a/src/Pure/Build/build_job.scala +++ b/src/Pure/Build/build_job.scala @@ -1,561 +1,572 @@ /* Title: Pure/Build/build_job.scala Author: Makarius Build job running prover process, with rudimentary PIDE session. */ package isabelle import scala.collection.mutable trait Build_Job { def cancel(): Unit = () def is_finished: Boolean = false def join: Build_Job.Result = Build_Job.no_result } object Build_Job { sealed case class Result(process_result: Process_Result, output_shasum: SHA1.Shasum) val no_result: Result = Result(Process_Result.undefined, SHA1.no_shasum) /* build session */ + def store_heaps( + store: Store, + options: Options, + session: Store.Session, + database_server: Option[SQL.Database] = None, + server: SSH.Server = SSH.no_server, + progress: Progress = new Progress + ): Unit = { + using_optional(store.maybe_open_heaps_database(database_server, server = server)) { + heaps_database => + for (db <- database_server orElse heaps_database) { + val slice = Space.MiB(options.real("build_database_slice")) + ML_Heap.store(db, session, slice, cache = store.cache.compress, progress = progress) + } + } + } + def start_session( build_context: Build.Context, session_context: Session_Context, progress: Progress, log: Logger, server: SSH.Server, session_background: Sessions.Background, sources_shasum: SHA1.Shasum, input_shasum: SHA1.Shasum, node_info: Host.Node_Info, store_heap: Boolean ): Session_Job = { new Session_Job(build_context, session_context, progress, log, server, session_background, sources_shasum, input_shasum, node_info, store_heap) } object Session_Context { def load( database_server: Option[SQL.Database], build_uuid: String, name: String, deps: List[String], ancestors: List[String], session_prefs: String, sources_shasum: SHA1.Shasum, timeout: Time, store: Store, progress: Progress = new Progress ): Session_Context = { def default: Session_Context = Session_Context( name, deps, ancestors, session_prefs, sources_shasum, timeout, Time.zero, Bytes.empty, build_uuid) def read(db: SQL.Database): Session_Context = { def ignore_error(msg: String) = { progress.echo_warning( "Ignoring bad database " + db + " for session " + quote(name) + if_proper(msg, ":\n" + msg)) default } try { val command_timings = store.read_command_timings(db, name) val elapsed = store.read_session_timing(db, name) match { case Markup.Elapsed(s) => Time.seconds(s) case _ => Time.zero } new Session_Context( name, deps, ancestors, session_prefs, sources_shasum, timeout, elapsed, command_timings, build_uuid) } catch { case ERROR(msg) => ignore_error(msg) case exn: java.lang.Error => ignore_error(Exn.message(exn)) case _: XML.Error => ignore_error("XML.Error") } } database_server match { case Some(db) => if (store.session_info_exists(db)) read(db) else default case None => using_option(store.try_open_database(name))(read) getOrElse default } } } sealed case class Session_Context( name: String, deps: List[String], ancestors: List[String], session_prefs: String, sources_shasum: SHA1.Shasum, timeout: Time, old_time: Time, old_command_timings_blob: Bytes, build_uuid: String ) extends Library.Named class Session_Job private[Build_Job]( build_context: Build.Context, session_context: Session_Context, progress: Progress, log: Logger, server: SSH.Server, session_background: Sessions.Background, sources_shasum: SHA1.Shasum, input_shasum: SHA1.Shasum, node_info: Host.Node_Info, store_heap: Boolean ) extends Build_Job { def session_name: String = session_background.session_name private val future_result: Future[Result] = Future.thread("build", uninterruptible = true) { val info = session_background.sessions_structure(session_name) val options = Host.node_options(info.options, node_info) val store = build_context.store using_optional(store.maybe_open_database_server(server = server)) { database_server => store.clean_output(database_server, session_name, session_init = true) val session_sources = Store.Sources.load(session_background.base, cache = store.cache.compress) val env = Isabelle_System.settings( List("ISABELLE_ML_DEBUGGER" -> options.bool("ML_debugger").toString)) val session_heaps = session_background.info.parent match { case None => Nil case Some(logic) => ML_Process.session_heaps(store, session_background, logic = logic) } val use_prelude = if (session_heaps.isEmpty) Thy_Header.ml_roots.map(_._1) else Nil val eval_store = if (store_heap) { (if (info.theories.nonEmpty) List("ML_Heap.share_common_data ()") else Nil) ::: List("ML_Heap.save_child " + ML_Syntax.print_string_bytes(File.platform_path(store.output_heap(session_name)))) } else Nil def session_blobs(node_name: Document.Node.Name): List[(Command.Blob, Document.Blobs.Item)] = session_background.base.theory_load_commands.get(node_name.theory) match { case None => Nil case Some(spans) => val syntax = session_background.base.theory_syntax(node_name) val master_dir = Path.explode(node_name.master_dir) for (span <- spans; file <- span.loaded_files(syntax).files) yield { val src_path = Path.explode(file) val blob_name = Document.Node.Name(File.symbolic_path(master_dir + src_path)) val bytes = session_sources(blob_name.node).bytes val text = bytes.text val chunk = Symbol.Text_Chunk(text) Command.Blob(blob_name, src_path, Some((SHA1.digest(bytes), chunk))) -> Document.Blobs.Item(bytes, text, chunk, changed = false) } } /* session */ val resources = new Resources(session_background, log = log, command_timings = Properties.uncompress(session_context.old_command_timings_blob, cache = store.cache)) val session = new Session(options, resources) { override val cache: Term.Cache = store.cache override def build_blobs_info(node_name: Document.Node.Name): Command.Blobs_Info = Command.Blobs_Info.make(session_blobs(node_name)) override def build_blobs(node_name: Document.Node.Name): Document.Blobs = Document.Blobs.make(session_blobs(node_name)) } object Build_Session_Errors { private val promise: Promise[List[String]] = Future.promise def result: Exn.Result[List[String]] = promise.join_result def cancel(): Unit = promise.cancel() def apply(errs: List[String]): Unit = { try { promise.fulfill(errs) } catch { case _: IllegalStateException => } } } val export_consumer = Export.consumer(store.open_database(session_name, output = true, server = server), store.cache, progress = progress) val stdout = new StringBuilder(1000) val stderr = new StringBuilder(1000) val command_timings = new mutable.ListBuffer[Properties.T] val theory_timings = new mutable.ListBuffer[Properties.T] val session_timings = new mutable.ListBuffer[Properties.T] val runtime_statistics = new mutable.ListBuffer[Properties.T] val task_statistics = new mutable.ListBuffer[Properties.T] def fun( name: String, acc: mutable.ListBuffer[Properties.T], unapply: Properties.T => Option[Properties.T] ): (String, Session.Protocol_Function) = { name -> ((msg: Prover.Protocol_Output) => unapply(msg.properties) match { case Some(props) => acc += props; true case _ => false }) } session.init_protocol_handler(new Session.Protocol_Handler { override def exit(): Unit = Build_Session_Errors.cancel() private def build_session_finished(msg: Prover.Protocol_Output): Boolean = { val (rc, errors) = try { val (rc, errs) = { import XML.Decode._ pair(int, list(x => x))(Symbol.decode_yxml(msg.text)) } val errors = for (err <- errs) yield { val prt = Protocol_Message.expose_no_reports(err) Pretty.string_of(prt, metric = Symbol.Metric) } (rc, errors) } catch { case ERROR(err) => (Process_Result.RC.failure, List(err)) } session.protocol_command("Prover.stop", rc.toString) Build_Session_Errors(errors) true } private def loading_theory(msg: Prover.Protocol_Output): Boolean = msg.properties match { case Markup.Loading_Theory(Markup.Name(name)) => progress.theory(Progress.Theory(name, session = session_name)) false case _ => false } private def export_(msg: Prover.Protocol_Output): Boolean = msg.properties match { case Protocol.Export(args) => export_consumer.make_entry(session_name, args, msg.chunk) true case _ => false } override val functions: Session.Protocol_Functions = List( Markup.Build_Session_Finished.name -> build_session_finished, Markup.Loading_Theory.name -> loading_theory, Markup.EXPORT -> export_, fun(Markup.Theory_Timing.name, theory_timings, Markup.Theory_Timing.unapply), fun(Markup.Session_Timing.name, session_timings, Markup.Session_Timing.unapply), fun(Markup.Task_Statistics.name, task_statistics, Markup.Task_Statistics.unapply)) }) session.command_timings += Session.Consumer("command_timings") { case Session.Command_Timing(props) => for { elapsed <- Markup.Elapsed.unapply(props) elapsed_time = Time.seconds(elapsed) if elapsed_time.is_relevant && elapsed_time >= options.seconds("command_timing_threshold") } command_timings += props.filter(Markup.command_timing_property) } session.runtime_statistics += Session.Consumer("ML_statistics") { case Session.Runtime_Statistics(props) => runtime_statistics += props } session.finished_theories += Session.Consumer[Document.Snapshot]("finished_theories") { case snapshot => if (!progress.stopped) { def export_(name: String, xml: XML.Body, compress: Boolean = true): Unit = { if (!progress.stopped) { val theory_name = snapshot.node_name.theory val args = Protocol.Export.Args( theory_name = theory_name, name = name, compress = compress) val body = Bytes(Symbol.encode(YXML.string_of_body(xml))) export_consumer.make_entry(session_name, args, body) } } def export_text(name: String, text: String, compress: Boolean = true): Unit = export_(name, List(XML.Text(text)), compress = compress) for (command <- snapshot.snippet_command) { export_text(Export.DOCUMENT_ID, command.id.toString, compress = false) } export_text(Export.FILES, cat_lines(snapshot.node_files.map(name => File.symbolic_path(name.path))), compress = false) for ((blob_name, i) <- snapshot.node_files.tail.zipWithIndex) { val xml = snapshot.switch(blob_name).xml_markup() export_(Export.MARKUP + (i + 1), xml) } export_(Export.MARKUP, snapshot.xml_markup()) export_(Export.MESSAGES, snapshot.messages.map(_._1)) } } session.all_messages += Session.Consumer[Any]("build_session_output") { case msg: Prover.Output => val message = msg.message if (msg.is_system) resources.log(Protocol.message_text(message)) if (msg.is_stdout) { stdout ++= Symbol.encode(XML.content(message)) } else if (msg.is_stderr) { stderr ++= Symbol.encode(XML.content(message)) } else if (msg.is_exit) { val err = "Prover terminated" + (msg.properties match { case Markup.Process_Result(result) => ": " + result.print_rc case _ => "" }) Build_Session_Errors(List(err)) } case _ => } build_context.session_setup(session_name, session) val eval_main = Command_Line.ML_tool("Isabelle_Process.init_build ()" :: eval_store) /* process */ val process = Isabelle_Process.start(options, session, session_background, session_heaps, use_prelude = use_prelude, eval_main = eval_main, cwd = info.dir.file, env = env) val timeout_request: Option[Event_Timer.Request] = if (info.timeout_ignored) None else Some(Event_Timer.request(Time.now() + info.timeout) { process.terminate() }) val build_errors = Isabelle_Thread.interrupt_handler(_ => process.terminate()) { Exn.capture { process.await_startup() } match { case Exn.Res(_) => val resources_yxml = resources.init_session_yxml val encode_options: XML.Encode.T[Options] = options => session.prover_options(options).encode val args_yxml = YXML.string_of_body( { import XML.Encode._ pair(string, list(pair(encode_options, list(pair(string, properties)))))( (session_name, info.theories)) }) session.protocol_command("build_session", resources_yxml, args_yxml) Build_Session_Errors.result case Exn.Exn(exn) => Exn.Res(List(Exn.message(exn))) } } val result0 = Isabelle_Thread.interrupt_handler(_ => process.terminate()) { process.await_shutdown() } val was_timeout = timeout_request match { case None => false case Some(request) => !request.cancel() } session.stop() val export_errors = export_consumer.shutdown(close = true).map(Output.error_message_text) val (document_output, document_errors) = try { if (Exn.is_res(build_errors) && result0.ok && info.documents.nonEmpty) { using(Export.open_database_context(store, server = server)) { database_context => val documents = using(database_context.open_session(session_background)) { session_context => Document_Build.build_documents( Document_Build.context(session_context, progress = progress), output_sources = info.document_output, output_pdf = info.document_output) } using(database_context.open_database(session_name, output = true))( session_database => documents.foreach(_.write(session_database.db, session_name))) (documents.flatMap(_.log_lines), Nil) } } else (Nil, Nil) } catch { case exn: Document_Build.Build_Error => (exn.log_lines, exn.log_errors) case Exn.Interrupt.ERROR(msg) => (Nil, List(msg)) } /* process result */ val result1 = { val theory_timing = theory_timings.iterator.flatMap( { case props @ Markup.Name(name) => Some(name -> props) case _ => None }).toMap val used_theory_timings = for { (name, _) <- session_background.base.used_theories } yield theory_timing.getOrElse(name.theory, Markup.Name(name.theory)) val more_output = Library.trim_line(stdout.toString) :: command_timings.toList.map(Protocol.Command_Timing_Marker.apply) ::: used_theory_timings.map(Protocol.Theory_Timing_Marker.apply) ::: session_timings.toList.map(Protocol.Session_Timing_Marker.apply) ::: runtime_statistics.toList.map(Protocol.ML_Statistics_Marker.apply) ::: task_statistics.toList.map(Protocol.Task_Statistics_Marker.apply) ::: document_output result0.output(more_output) .error(Library.trim_line(stderr.toString)) .errors_rc(export_errors ::: document_errors) } val result2 = build_errors match { case Exn.Res(build_errs) => val errs = build_errs ::: document_errors if (errs.nonEmpty) { result1.error_rc.output( errs.flatMap(s => split_lines(Output.error_message_text(s))) ::: errs.map(Protocol.Error_Message_Marker.apply)) } else if (progress.stopped && result1.ok) { result1.copy(rc = Process_Result.RC.interrupt) } else result1 case Exn.Exn(Exn.Interrupt()) => if (result1.ok) result1.copy(rc = Process_Result.RC.interrupt) else result1 case Exn.Exn(exn) => throw exn } val process_result = if (result2.ok) result2 else if (was_timeout) result2.error(Output.error_message_text("Timeout")).timeout_rc else if (result2.interrupted) result2.error(Output.error_message_text("Interrupt")) else result2 val store_session = store.output_session(session_name, store_heap = process_result.ok && store_heap) /* output heap */ val output_shasum = store_session.heap match { case Some(path) => SHA1.shasum(ML_Heap.write_file_digest(path), session_name) case None => SHA1.no_shasum } val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test) val build_log = Build_Log.Log_File(session_name, process_result.out_lines, cache = store.cache). parse_session_info( command_timings = true, theory_timings = true, ml_statistics = true, task_statistics = true) // write log file if (process_result.ok) { File.write_gzip(store.output_log_gz(session_name), terminate_lines(log_lines)) } else File.write(store.output_log(session_name), terminate_lines(log_lines)) // write database def write_info(db: SQL.Database): Unit = store.write_session_info(db, session_name, session_sources, build_log = if (process_result.timeout) build_log.error("Timeout") else build_log, build = Store.Build_Info( sources = sources_shasum, input_heaps = input_shasum, output_heap = output_shasum, process_result.rc, build_context.build_uuid)) database_server match { case Some(db) => write_info(db) case None => using(store.open_database(session_name, output = true))(write_info) } - using_optional(store.maybe_open_heaps_database(database_server, server = server)) { - heaps_database => - for (db <- database_server orElse heaps_database) { - val slice = Space.MiB(options.real("build_database_slice")) - ML_Heap.store(db, store_session, slice, - cache = store.cache.compress, progress = progress) - } - } + store_heaps(store, options, store_session, + database_server = database_server, server = server, progress = progress) // messages process_result.err_lines.foreach(progress.echo(_)) if (process_result.ok) { val props = build_log.session_timing val threads = Markup.Session_Timing.Threads.unapply(props) getOrElse 1 val timing = Markup.Timing_Properties.get(props) progress.echo( "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")", verbose = true) progress.echo( "Finished " + session_name + " (" + process_result.timing.message_resources + ")") } else { progress.echo( session_name + " FAILED (see also \"isabelle build_log -H Error " + session_name + "\")") if (!process_result.interrupted) { val tail = info.options.int("process_output_tail") val suffix = if (tail == 0) log_lines else log_lines.drop(log_lines.length - tail max 0) val prefix = if (log_lines.length == suffix.length) Nil else List("...") progress.echo(Library.trim_line(cat_lines(prefix ::: suffix))) } } Result(process_result.copy(out_lines = log_lines), output_shasum) } } override def cancel(): Unit = future_result.cancel() override def is_finished: Boolean = future_result.is_finished override def join: Result = future_result.join } }