diff --git a/src/Pure/Build/build.scala b/src/Pure/Build/build.scala
--- a/src/Pure/Build/build.scala
+++ b/src/Pure/Build/build.scala
@@ -1,935 +1,939 @@
 /*  Title:      Pure/Build/build.scala
     Author:     Makarius
     Options:    :folding=explicit:
 
 Command-line tools to build and manage Isabelle sessions.
 */
 
 package isabelle
 
 import scala.collection.mutable
 import scala.util.matching.Regex
 
 
 object Build {
   /** "isabelle build" **/
 
   /* options */
 
   def hostname(options: Options): String =
     Isabelle_System.hostname(options.string("build_hostname"))
 
   def engine_name(options: Options): String = options.string("build_engine")
 
 
   /* context */
 
   sealed case class Context(
     store: Store,
     deps: isabelle.Sessions.Deps,
     engine: Engine = Engine.Default,
     afp_root: Option[Path] = None,
     build_hosts: List[Build_Cluster.Host] = Nil,
     ml_platform: String = Isabelle_System.getenv("ML_PLATFORM"),
     hostname: String = Isabelle_System.hostname(),
     numa_shuffling: Boolean = false,
+    numa_nodes: List[Int] = Nil,
     clean_sessions: List[String] = Nil,
-    build_heap: Boolean = false,
+    store_heap: Boolean = false,
     fresh_build: Boolean = false,
     no_build: Boolean = false,
     session_setup: (String, Session) => Unit = (_, _) => (),
     build_uuid: String = UUID.random_string(),
     build_start: Option[Date] = None,
     jobs: Int = 0,
     master: Boolean = false
   ) {
     def build_options: Options = store.options
     def sessions_structure: isabelle.Sessions.Structure = deps.sessions_structure
 
     def worker: Boolean = jobs > 0
 
     override def toString: String =
       "Build.Context(build_uuid = " + quote(build_uuid) +
         if_proper(worker, ", worker = true") +
         if_proper(master, ", master = true") + ")"
   }
 
 
   /* results */
 
   object Results {
     def apply(
       context: Context,
       results: Map[String, Process_Result] = Map.empty,
       other_rc: Int = Process_Result.RC.ok
     ): Results = {
       new Results(context.store, context.deps, results, other_rc)
     }
   }
 
   class Results private(
     val store: Store,
     val deps: Sessions.Deps,
     results: Map[String, Process_Result],
     other_rc: Int
   ) {
     def cache: Term.Cache = store.cache
 
     def sessions_ok: List[String] =
-      (for {
-        name <- deps.sessions_structure.build_topological_order.iterator
-        result <- results.get(name) if result.ok
-      } yield name).toList
+      List.from(
+        for {
+          name <- deps.sessions_structure.build_topological_order.iterator
+          result <- results.get(name) if result.ok
+        } yield name)
 
     def info(name: String): Sessions.Info = deps.sessions_structure(name)
     def sessions: Set[String] = results.keySet
     def cancelled(name: String): Boolean = !results(name).defined
     def apply(name: String): Process_Result = results(name).strict
 
     val rc: Int =
       Process_Result.RC.merge(other_rc,
         Process_Result.RC.merge(results.valuesIterator.map(_.strict.rc)))
     def ok: Boolean = rc == Process_Result.RC.ok
 
     lazy val unfinished: List[String] = sessions.iterator.filterNot(apply(_).ok).toList.sorted
 
     override def toString: String = rc.toString
   }
 
 
   /* engine */
 
   object Engine {
     lazy val services: List[Engine] =
       Isabelle_System.make_services(classOf[Engine])
 
     def apply(name: String): Engine =
       services.find(_.name == name).getOrElse(error("Bad build engine " + quote(name)))
 
     class Default extends Engine("") { override def toString: String = "" }
     object Default extends Default
   }
 
   class Engine(val name: String) extends Isabelle_System.Service {
     engine =>
 
     override def toString: String = name
 
     def build_options(options: Options, build_cluster: Boolean = false): Options = {
       val options1 = options + "completion_limit=0" + "editor_tracing_messages=0"
       if (build_cluster) options1 + "build_database" + "build_log_verbose" else options1
     }
 
     final
def build_store(options: Options, build_cluster: Boolean = false, cache: Term.Cache = Term.Cache.make() ): Store = { val build_options = engine.build_options(options, build_cluster = build_cluster) val store = Store(build_options, build_cluster = build_cluster, cache = cache) Isabelle_System.make_directory(store.output_dir + Path.basic("log")) Isabelle_Fonts.init() store } def open_build_process( build_context: Context, build_progress: Progress, server: SSH.Server ): Build_Process = new Build_Process(build_context, build_progress, server) final def run_build_process( context: Context, progress: Progress, server: SSH.Server ): Results = { Isabelle_Thread.uninterruptible { using(open_build_process(context, progress, server)) { build_process => build_process.prepare() build_process.run() } } } } /* build */ def build( options: Options, build_hosts: List[Build_Cluster.Host] = Nil, selection: Sessions.Selection = Sessions.Selection.empty, browser_info: Browser_Info.Config = Browser_Info.Config.none, progress: Progress = new Progress, check_unknown_files: Boolean = false, build_heap: Boolean = false, clean_build: Boolean = false, afp_root: Option[Path] = None, dirs: List[Path] = Nil, select_dirs: List[Path] = Nil, infos: List[Sessions.Info] = Nil, numa_shuffling: Boolean = false, max_jobs: Option[Int] = None, list_files: Boolean = false, check_keywords: Set[String] = Set.empty, fresh_build: Boolean = false, no_build: Boolean = false, soft_build: Boolean = false, export_files: Boolean = false, augment_options: String => List[Options.Spec] = _ => Nil, session_setup: (String, Session) => Unit = (_, _) => (), cache: Term.Cache = Term.Cache.make() ): Results = { val engine = Engine(engine_name(options)) val store = engine.build_store(options, build_cluster = build_hosts.nonEmpty, cache = cache) val build_options = store.options using(store.open_server()) { server => /* session selection and dependencies */ val full_sessions = Sessions.load_structure(build_options, dirs = AFP.main_dirs(afp_root) ::: dirs, select_dirs = select_dirs, infos = infos, augment_options = augment_options) val full_sessions_selection = full_sessions.imports_selection(selection) val build_deps = { val deps0 = Sessions.deps(full_sessions.selection(selection), progress = progress, inlined_files = true, list_files = list_files, check_keywords = check_keywords ).check_errors if (soft_build && !fresh_build) { val outdated = deps0.sessions_structure.build_topological_order.flatMap(name => store.try_open_database(name, server = server) match { case Some(db) => using(db)(store.read_build(_, name)) match { case Some(build) if build.ok => val session_options = deps0.sessions_structure(name).options val session_sources = deps0.sources_shasum(name) if (Sessions.eq_sources(session_options, build.sources, session_sources)) { None } else Some(name) case _ => Some(name) } case None => Some(name) }) Sessions.deps(full_sessions.selection(Sessions.Selection(sessions = outdated)), progress = progress, inlined_files = true).check_errors } else deps0 } /* check unknown files */ if (check_unknown_files) { val source_files = - (for { - (_, base) <- build_deps.session_bases.iterator - (path, _) <- base.session_sources.iterator - } yield path).toList + List.from( + for { + (_, base) <- build_deps.session_bases.iterator + (path, _) <- base.session_sources.iterator + } yield path) Mercurial.check_files(source_files)._2 match { case Nil => case unknown_files => progress.echo_warning( "Unknown files (not part of the underlying Mercurial repository):" + 
unknown_files.map(File.standard_path).sorted.mkString("\n ", "\n ", "")) } } /* build process and results */ val clean_sessions = if (clean_build) full_sessions.imports_descendants(full_sessions_selection) else Nil + val numa_nodes = Host.numa_nodes(enabled = numa_shuffling) val build_context = Context(store, build_deps, engine = engine, afp_root = afp_root, build_hosts = build_hosts, hostname = hostname(build_options), - clean_sessions = clean_sessions, build_heap = build_heap, - numa_shuffling = numa_shuffling, fresh_build = fresh_build, - no_build = no_build, session_setup = session_setup, + clean_sessions = clean_sessions, store_heap = build_heap, + numa_shuffling = numa_shuffling, numa_nodes = numa_nodes, + fresh_build = fresh_build, no_build = no_build, session_setup = session_setup, jobs = max_jobs.getOrElse(if (build_hosts.nonEmpty) 0 else 1), master = true) val results = engine.run_build_process(build_context, progress, server) if (export_files) { for (name <- full_sessions_selection.iterator if results(name).ok) { val info = results.info(name) if (info.export_files.nonEmpty) { progress.echo("Exporting " + info.name + " ...") for ((dir, prune, pats) <- info.export_files) { Export.export_files(store, name, info.dir + dir, progress = if (progress.verbose) progress else new Progress, export_prune = prune, export_patterns = pats) } } } } val presentation_sessions = results.sessions_ok.filter(name => browser_info.enabled(results.info(name))) if (presentation_sessions.nonEmpty && !progress.stopped) { Browser_Info.build(browser_info, results.store, results.deps, presentation_sessions, progress = progress, server = server) } if (results.unfinished.nonEmpty && (progress.verbose || !no_build)) { progress.echo("Unfinished session(s): " + commas(results.unfinished)) } results } } /* build logic image */ def build_logic(options: Options, logic: String, progress: Progress = new Progress, build_heap: Boolean = false, dirs: List[Path] = Nil, fresh: Boolean = false, strict: Boolean = false ): Int = { val selection = Sessions.Selection.session(logic) val rc = if (!fresh && build(options, selection = selection, build_heap = build_heap, no_build = true, dirs = dirs).ok) Process_Result.RC.ok else { progress.echo("Build started for Isabelle/" + logic + " ...") build(options, selection = selection, progress = progress, build_heap = build_heap, fresh_build = fresh, dirs = dirs).rc } if (strict && rc != Process_Result.RC.ok) error("Failed to build Isabelle/" + logic) else rc } /* Isabelle tool wrappers */ val isabelle_tool1 = Isabelle_Tool("build", "build and manage Isabelle sessions", Scala_Project.here, { args => var afp_root: Option[Path] = None val base_sessions = new mutable.ListBuffer[String] val select_dirs = new mutable.ListBuffer[Path] val build_hosts = new mutable.ListBuffer[Build_Cluster.Host] var numa_shuffling = false var browser_info = Browser_Info.Config.none var requirements = false var soft_build = false val exclude_session_groups = new mutable.ListBuffer[String] var all_sessions = false var build_heap = false var clean_build = false val dirs = new mutable.ListBuffer[Path] var export_files = false var fresh_build = false val session_groups = new mutable.ListBuffer[String] var max_jobs: Option[Int] = None var check_keywords: Set[String] = Set.empty var list_files = false var no_build = false var options = Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS) var verbose = false val exclude_sessions = new mutable.ListBuffer[String] val getopts = Getopts(""" Usage: isabelle build 
[OPTIONS] [SESSIONS ...] Options are: -A ROOT include AFP with given root directory (":" for """ + AFP.BASE.implode + """) -B NAME include session NAME and all descendants -D DIR include session directory and select its sessions -H HOSTS additional cluster host specifications of the form NAMES:PARAMETERS (separated by commas) -N cyclic shuffling of NUMA CPU nodes (performance tuning) -P DIR enable HTML/PDF presentation in directory (":" for default) -R refer to requirements of selected sessions -S soft build: only observe changes of sources, not heap images -X NAME exclude sessions from group NAME and all descendants -a select all sessions -b build heap images -c clean build -d DIR include session directory -e export files from session specification into file-system -f fresh build -g NAME select session group NAME -j INT maximum number of parallel jobs (default: 1 for local build, 0 for build cluster) -k KEYWORD check theory sources for conflicts with proposed keywords -l list session source files -n no build -- take existing session build databases -o OPTION override Isabelle system OPTION (via NAME=VAL or NAME) -v verbose -x NAME exclude session NAME and all descendants Build and manage Isabelle sessions: ML heaps, session databases, documents. Parameters for cluster host specifications (-H), apart from system options: """ + Library.indent_lines(4, Build_Cluster.Host.parameters.print()) + """ Notable system options: see "isabelle options -l -t build" Notable system settings: """ + Library.indent_lines(4, Build_Log.Settings.show()) + "\n", "A:" -> (arg => afp_root = Some(if (arg == ":") AFP.BASE else Path.explode(arg))), "B:" -> (arg => base_sessions += arg), "D:" -> (arg => select_dirs += Path.explode(arg)), "H:" -> (arg => build_hosts ++= Build_Cluster.Host.parse(Registry.global, arg)), "N" -> (_ => numa_shuffling = true), "P:" -> (arg => browser_info = Browser_Info.Config.make(arg)), "R" -> (_ => requirements = true), "S" -> (_ => soft_build = true), "X:" -> (arg => exclude_session_groups += arg), "a" -> (_ => all_sessions = true), "b" -> (_ => build_heap = true), "c" -> (_ => clean_build = true), "d:" -> (arg => dirs += Path.explode(arg)), "e" -> (_ => export_files = true), "f" -> (_ => fresh_build = true), "g:" -> (arg => session_groups += arg), "j:" -> (arg => max_jobs = Some(Value.Nat.parse(arg))), "k:" -> (arg => check_keywords = check_keywords + arg), "l" -> (_ => list_files = true), "n" -> (_ => no_build = true), "o:" -> (arg => options = options + arg), "v" -> (_ => verbose = true), "x:" -> (arg => exclude_sessions += arg)) val sessions = getopts(args) val progress = new Console_Progress(verbose = verbose) progress.echo( "Started at " + Build_Log.print_date(progress.start) + " (" + Isabelle_System.ml_identifier() + " on " + hostname(options) +")", verbose = true) progress.echo(Build_Log.Settings.show() + "\n", verbose = true) val results = progress.interrupt_handler { build(options, selection = Sessions.Selection( requirements = requirements, all_sessions = all_sessions, base_sessions = base_sessions.toList, exclude_session_groups = exclude_session_groups.toList, exclude_sessions = exclude_sessions.toList, session_groups = session_groups.toList, sessions = sessions), browser_info = browser_info, progress = progress, check_unknown_files = Mercurial.is_repository(Path.ISABELLE_HOME), build_heap = build_heap, clean_build = clean_build, afp_root = afp_root, dirs = dirs.toList, select_dirs = select_dirs.toList, numa_shuffling = Host.numa_check(progress, numa_shuffling), max_jobs = 
max_jobs, list_files = list_files, check_keywords = check_keywords, fresh_build = fresh_build, no_build = no_build, soft_build = soft_build, export_files = export_files, build_hosts = build_hosts.toList) } val stop_date = progress.now() val elapsed_time = stop_date - progress.start progress.echo("\nFinished at " + Build_Log.print_date(stop_date), verbose = true) val total_timing = results.sessions.iterator.map(a => results(a).timing).foldLeft(Timing.zero)(_ + _). copy(elapsed = elapsed_time) progress.echo(total_timing.message_resources) sys.exit(results.rc) }) /** build cluster management **/ /* identified builds */ def read_builds(build_database: Option[SQL.Database]): List[Build_Process.Build] = build_database match { case None => Nil case Some(db) => Build_Process.read_builds(db) } def print_builds(build_database: Option[SQL.Database], builds: List[Build_Process.Build]): String = { val print_database = build_database match { case None => "" case Some(db) => " (database " + db + ")" } if (builds.isEmpty) "No build processes" + print_database else "Build processes" + print_database + builds.map(build => "\n " + build.print).mkString } def find_builds( build_database: Option[SQL.Database], build_id: String, builds: List[Build_Process.Build] ): Build_Process.Build = { (build_id, builds.length) match { case (UUID(_), _) if builds.exists(_.build_uuid == build_id) => builds.find(_.build_uuid == build_id).get case ("", 1) => builds.head case ("", 0) => error(print_builds(build_database, builds)) case _ => cat_error("Cannot identify build process " + quote(build_id), print_builds(build_database, builds)) } } /* "isabelle build_process" */ def build_process( options: Options, build_cluster: Boolean = false, list_builds: Boolean = false, remove_builds: Boolean = false, force: Boolean = false, progress: Progress = new Progress ): Unit = { val engine = Engine(engine_name(options)) val store = engine.build_store(options, build_cluster = build_cluster) using(store.open_server()) { server => using_optional(store.maybe_open_build_database(server = server)) { build_database => def print(builds: List[Build_Process.Build]): Unit = if (list_builds) progress.echo(print_builds(build_database, builds)) build_database match { case None => print(Nil) case Some(db) if remove_builds && force => db.transaction { val tables0 = ML_Heap.private_data.tables.list ::: Store.private_data.tables.list ::: Database_Progress.private_data.tables.list ::: Build_Process.private_data.tables.list val tables = tables0.filter(t => db.exists_table(t.name)).sortBy(_.name) if (tables.nonEmpty) { progress.echo("Removing tables " + commas_quote(tables.map(_.name)) + " ...") db.execute_statement(SQL.MULTI(tables.map(db.destroy))) } } case Some(db) => Build_Process.private_data.transaction_lock(db, create = true, label = "build_process") { val builds = Build_Process.private_data.read_builds(db) print(builds) if (remove_builds) { val remove = builds.flatMap(_.active_build_uuid) if (remove.nonEmpty) { progress.echo("Removing " + commas(remove) + " ...") Build_Process.private_data.remove_builds(db, remove) print(Build_Process.private_data.read_builds(db)) } } } } } } } val isabelle_tool2 = Isabelle_Tool("build_process", "manage session build process", Scala_Project.here, { args => var build_cluster = false var force = false var list_builds = false var options = Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS ::: List(Options.Spec.make("build_database"))) var remove_builds = false val getopts = Getopts(""" Usage: isabelle 
build_process [OPTIONS] Options are: -C build cluster mode (database server) -f extra force for option -r -l list build processes -o OPTION override Isabelle system OPTION (via NAME=VAL or NAME) -r remove data from build processes: inactive processes (default) or all processes (option -f) Manage Isabelle build process, notably distributed build cluster (option -C). """, "C" -> (_ => build_cluster = true), "f" -> (_ => force = true), "l" -> (_ => list_builds = true), "o:" -> (arg => options = options + arg), "r" -> (_ => remove_builds = true)) val more_args = getopts(args) if (more_args.nonEmpty) getopts.usage() val progress = new Console_Progress() build_process(options, build_cluster = build_cluster, list_builds = list_builds, remove_builds = remove_builds, force = force, progress = progress) }) /* "isabelle build_worker" */ def build_worker_command( host: Build_Cluster.Host, ssh: SSH.System = SSH.Local, build_options: List[Options.Spec] = Nil, build_id: String = "", isabelle_home: Path = Path.current, afp_root: Option[Path] = None, dirs: List[Path] = Nil, quiet: Boolean = false, verbose: Boolean = false ): String = { val options = build_options ::: Options.Spec.eq("build_hostname", host.name) :: host.options ssh.bash_path(Isabelle_Tool.exe(isabelle_home)) + " build_worker" + if_proper(build_id, " -B " + Bash.string(build_id)) + if_proper(afp_root, " -A " + ssh.bash_path(afp_root.get)) + dirs.map(dir => " -d " + ssh.bash_path(dir)).mkString + if_proper(host.numa, " -N") + " -j" + host.jobs + Options.Spec.bash_strings(options, bg = true) + if_proper(quiet, " -q") + if_proper(verbose, " -v") } def build_worker( options: Options, build_id: String = "", progress: Progress = new Progress, afp_root: Option[Path] = None, dirs: List[Path] = Nil, numa_shuffling: Boolean = false, max_jobs: Option[Int] = None ): Results = { val engine = Engine(engine_name(options)) val store = engine.build_store(options, build_cluster = true) val build_options = store.options using(store.open_server()) { server => using_optional(store.maybe_open_build_database(server = server)) { build_database => val builds = read_builds(build_database) val build_master = find_builds(build_database, build_id, builds.filter(_.active)) val sessions_structure = Sessions.load_structure(build_options, dirs = AFP.main_dirs(afp_root) ::: dirs). 
selection(Sessions.Selection(sessions = build_master.sessions)) val build_deps = Sessions.deps(sessions_structure, progress = progress, inlined_files = true).check_errors val build_context = Context(store, build_deps, engine = engine, afp_root = afp_root, hostname = hostname(build_options), numa_shuffling = numa_shuffling, build_uuid = build_master.build_uuid, build_start = Some(build_master.start), jobs = max_jobs.getOrElse(1)) engine.run_build_process(build_context, progress, server) } } } val isabelle_tool3 = Isabelle_Tool("build_worker", "start worker for session build process", Scala_Project.here, { args => var afp_root: Option[Path] = None var build_id = "" var numa_shuffling = false val dirs = new mutable.ListBuffer[Path] var max_jobs: Option[Int] = None var options = Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS ::: List(Options.Spec.make("build_database"))) var quiet = false var verbose = false val getopts = Getopts(""" Usage: isabelle build_worker [OPTIONS] Options are: -A ROOT include AFP with given root directory (":" for """ + AFP.BASE.implode + """) -B UUID existing UUID for build process (default: from database) -N cyclic shuffling of NUMA CPU nodes (performance tuning) -d DIR include session directory -j INT maximum number of parallel jobs (default 1) -o OPTION override Isabelle system OPTION (via NAME=VAL or NAME) -q quiet mode: no progress -v verbose """, "A:" -> (arg => afp_root = Some(if (arg == ":") AFP.BASE else Path.explode(arg))), "B:" -> (arg => build_id = arg), "N" -> (_ => numa_shuffling = true), "d:" -> (arg => dirs += Path.explode(arg)), "j:" -> (arg => max_jobs = Some(Value.Nat.parse(arg))), "o:" -> (arg => options = options + arg), "q" -> (_ => quiet = true), "v" -> (_ => verbose = true)) val more_args = getopts(args) if (more_args.nonEmpty) getopts.usage() val progress = if (quiet && verbose) new Progress { override def verbose: Boolean = true } else if (quiet) new Progress else new Console_Progress(verbose = verbose) val results = progress.interrupt_handler { build_worker(options, build_id = build_id, progress = progress, afp_root = afp_root, dirs = dirs.toList, numa_shuffling = Host.numa_check(progress, numa_shuffling), max_jobs = max_jobs) } if (!results.ok) sys.exit(results.rc) }) /** "isabelle build_log" **/ /* theory markup/messages from session database */ def read_theory( theory_context: Export.Theory_Context, unicode_symbols: Boolean = false ): Option[Document.Snapshot] = { def decode_bytes(bytes: Bytes): String = Symbol.output(unicode_symbols, UTF8.decode_permissive(bytes)) def read(name: String): Export.Entry = theory_context(name, permissive = true) def read_xml(name: String): XML.Body = YXML.parse_body(decode_bytes(read(name).bytes), cache = theory_context.cache) def read_source_file(name: String): Store.Source_File = theory_context.session_context.source_file(name) for { id <- theory_context.document_id() (thy_file, blobs_files) <- theory_context.files(permissive = true) } yield { val master_dir = Path.explode(Url.strip_base_name(thy_file).getOrElse( error("Cannot determine theory master directory: " + quote(thy_file)))) val blobs = blobs_files.map { name => val path = Path.explode(name) val src_path = File.relative_path(master_dir, path).getOrElse(path) val file = read_source_file(name) val bytes = file.bytes val text = decode_bytes(bytes) val chunk = Symbol.Text_Chunk(text) Command.Blob(Document.Node.Name(name), src_path, Some((file.digest, chunk))) -> Document.Blobs.Item(bytes, text, chunk, changed = false) } val thy_source = 
decode_bytes(read_source_file(thy_file).bytes) val thy_xml = read_xml(Export.MARKUP) val blobs_xml = for (i <- (1 to blobs.length).toList) yield read_xml(Export.MARKUP + i) val markups_index = Command.Markup_Index.make(blobs.map(_._1)) val markups = Command.Markups.make( for ((index, xml) <- markups_index.zip(thy_xml :: blobs_xml)) yield index -> Markup_Tree.from_XML(xml)) val results = Command.Results.make( for (case elem@XML.Elem(Markup(_, Markup.Serial(i)), _) <- read_xml(Export.MESSAGES)) yield i -> elem) val command = Command.unparsed(thy_source, theory = true, id = id, node_name = Document.Node.Name(thy_file, theory = theory_context.theory), blobs_info = Command.Blobs_Info.make(blobs), markups = markups, results = results) val doc_blobs = Document.Blobs.make(blobs) Document.State.init.snippet(command, doc_blobs) } } /* print messages */ def print_log( options: Options, sessions: List[String], theories: List[String] = Nil, message_head: List[Regex] = Nil, message_body: List[Regex] = Nil, progress: Progress = new Progress, margin: Double = Pretty.default_margin, breakgain: Double = Pretty.default_breakgain, metric: Pretty.Metric = Symbol.Metric, unicode_symbols: Boolean = false ): Unit = { val store = Store(options) val session = new Session(options, Resources.bootstrap) def check(filter: List[Regex], make_string: => String): Boolean = filter.isEmpty || { val s = Output.clean_yxml(make_string) filter.forall(r => r.findFirstIn(Output.clean_yxml(s)).nonEmpty) } def print(session_name: String): Unit = { using(Export.open_session_context0(store, session_name)) { session_context => val result = for { db <- session_context.session_db() theories = store.read_theories(db, session_name) errors = store.read_errors(db, session_name) info <- store.read_build(db, session_name) } yield (theories, errors, info.return_code) result match { case None => store.error_database(session_name) case Some((used_theories, errors, rc)) => theories.filterNot(used_theories.toSet) match { case Nil => case bad => error("Unknown theories " + commas_quote(bad)) } val print_theories = if (theories.isEmpty) used_theories else used_theories.filter(theories.toSet) for (thy <- print_theories) { val thy_heading = "\nTheory " + quote(thy) + " (in " + session_name + ")" + ":" Build.read_theory(session_context.theory(thy), unicode_symbols = unicode_symbols) match { case None => progress.echo(thy_heading + " MISSING") case Some(snapshot) => val rendering = new Rendering(snapshot, options, session) val messages = rendering.text_messages(Text.Range.full) .filter(message => progress.verbose || Protocol.is_exported(message.info)) if (messages.nonEmpty) { val line_document = Line.Document(snapshot.node.source) val buffer = new mutable.ListBuffer[String] for (Text.Info(range, elem) <- messages) { val line = line_document.position(range.start).line1 val pos = Position.Line_File(line, snapshot.node_name.node) def message_text: String = Protocol.message_text(elem, heading = true, pos = pos, margin = margin, breakgain = breakgain, metric = metric) val ok = check(message_head, Protocol.message_heading(elem, pos)) && check(message_body, XML.content(Pretty.unformatted(List(elem)))) if (ok) buffer += message_text } if (buffer.nonEmpty) { progress.echo(thy_heading) buffer.foreach(progress.echo(_)) } } } } if (errors.nonEmpty) { val msg = Symbol.output(unicode_symbols, cat_lines(errors)) progress.echo("\nBuild errors:\n" + Output.error_message_text(msg)) } if (rc != Process_Result.RC.ok) { progress.echo("\n" + Process_Result.RC.print_long(rc)) 
} } } } val errors = new mutable.ListBuffer[String] for (session_name <- sessions) { Exn.result(print(session_name)) match { case Exn.Res(_) => case Exn.Exn(exn) => errors += Exn.message(exn) } } if (errors.nonEmpty) error(cat_lines(errors.toList)) } /* Isabelle tool wrapper */ val isabelle_tool4 = Isabelle_Tool("build_log", "print messages from session build database", Scala_Project.here, { args => /* arguments */ val message_head = new mutable.ListBuffer[Regex] val message_body = new mutable.ListBuffer[Regex] var unicode_symbols = false val theories = new mutable.ListBuffer[String] var margin = Pretty.default_margin var options = Options.init() var verbose = false val getopts = Getopts(""" Usage: isabelle build_log [OPTIONS] [SESSIONS ...] Options are: -H REGEX filter messages by matching against head -M REGEX filter messages by matching against body -T NAME restrict to given theories (multiple options possible) -U output Unicode symbols -m MARGIN margin for pretty printing (default: """ + margin + """) -o OPTION override Isabelle system OPTION (via NAME=VAL or NAME) -v print all messages, including information etc. Print messages from the session build database of the given sessions, without any checks against current sources nor session structure: results from old sessions or failed builds can be printed as well. Multiple options -H and -M are conjunctive: all given patterns need to match. Patterns match any substring, but ^ or $ may be used to match the start or end explicitly. """, "H:" -> (arg => message_head += arg.r), "M:" -> (arg => message_body += arg.r), "T:" -> (arg => theories += arg), "U" -> (_ => unicode_symbols = true), "m:" -> (arg => margin = Value.Double.parse(arg)), "o:" -> (arg => options = options + arg), "v" -> (_ => verbose = true)) val sessions = getopts(args) val progress = new Console_Progress(verbose = verbose) if (sessions.isEmpty) progress.echo_warning("No sessions to print") else { print_log(options, sessions, theories = theories.toList, message_head = message_head.toList, message_body = message_body.toList, margin = margin, progress = progress, unicode_symbols = unicode_symbols) } }) } diff --git a/src/Pure/Build/build_benchmark.scala b/src/Pure/Build/build_benchmark.scala --- a/src/Pure/Build/build_benchmark.scala +++ b/src/Pure/Build/build_benchmark.scala @@ -1,169 +1,159 @@ /* Title: Pure/Build/build_benchmark.scala Author: Fabian Huch, TU Muenchen Host platform benchmarks for performance estimation. 
*/ package isabelle import scala.collection.mutable object Build_Benchmark { /* benchmark */ def benchmark_session(options: Options) = options.string("build_benchmark_session") def benchmark_command( options: Options, host: Build_Cluster.Host, ssh: SSH.System = SSH.Local, isabelle_home: Path = Path.current, ): String = { val benchmark_options = List( Options.Spec.eq("build_hostname", host.name), Options.Spec("build_database_server"), options.spec("build_benchmark_session")) ssh.bash_path(Isabelle_Tool.exe(isabelle_home)) + " build_benchmark" + Options.Spec.bash_strings(benchmark_options ::: host.options, bg = true) } def benchmark_requirements(options: Options, progress: Progress = new Progress): Unit = { val options1 = options.string.update("build_engine", Build.Engine.Default.name) val selection = Sessions.Selection(requirements = true, sessions = List(benchmark_session(options))) val res = Build.build(options1, selection = selection, progress = progress, build_heap = true) if (!res.ok) error("Failed building requirements") } def run_benchmark(options: Options, progress: Progress = new Progress): Unit = { val hostname = options.string("build_hostname") val store = Store(options) using(store.open_server()) { server => using_optional(store.maybe_open_database_server(server = server)) { database_server => val db = store.open_build_database(path = Host.private_data.database, server = server) progress.echo("Starting benchmark ...") val benchmark_session_name = benchmark_session(options) val selection = Sessions.Selection(sessions = List(benchmark_session_name)) val full_sessions = Sessions.load_structure(options + "threads=1") val build_deps = Sessions.deps(full_sessions.selection(selection)).check_errors val build_context = Build.Context(store, build_deps, jobs = 1) val sessions = Build_Process.Sessions.empty.init(build_context, database_server, progress) val session = sessions(benchmark_session_name) val hierachy = session.ancestors.map(store.output_session(_, store_heap = true)) for (db <- database_server) ML_Heap.restore(db, hierachy, cache = store.cache.compress) val local_options = options + "build_database_server=false" + "build_database=false" benchmark_requirements(local_options, progress) for (db <- database_server) ML_Heap.restore(db, hierachy, cache = store.cache.compress) - def get_shasum(session_name: String): SHA1.Shasum = { - val ancestor_shasums = sessions(session_name).ancestors.map(get_shasum) - - val input_shasum = - if (ancestor_shasums.isEmpty) ML_Process.bootstrap_shasum() - else SHA1.flat_shasum(ancestor_shasums) - - store.check_output( - database_server, session_name, - session_options = build_context.sessions_structure(session_name).options, - sources_shasum = sessions(session_name).sources_shasum, - input_shasum = input_shasum, - fresh_build = false, - store_heap = false)._2 - } + def get_shasum(name: String): SHA1.Shasum = + store.check_output(database_server, name, + session_options = build_context.sessions_structure(name).options, + sources_shasum = sessions(name).sources_shasum, + input_shasum = ML_Process.make_shasum(sessions(name).ancestors.map(get_shasum)))._2 val deps = Sessions.deps(full_sessions.selection(selection)).check_errors val background = deps.background(benchmark_session_name) val input_shasum = get_shasum(benchmark_session_name) val node_info = Host.Node_Info(hostname, None, Nil) val local_build_context = build_context.copy(store = Store(local_options)) val result = Build_Job.start_session(local_build_context, session, progress, new Logger, 
server, background, session.sources_shasum, input_shasum, node_info, false).join val timing = if (result.process_result.ok) result.process_result.timing else error("Failed to build benchmark session") val score = Time.seconds(1000).ms.toDouble / (1 + timing.elapsed.ms) progress.echo( "Finished benchmark in " + timing.message + ". Score: " + String.format("%.2f", score)) Host.write_info(db, Host.Info.init(hostname = hostname, score = Some(score))) } } } def benchmark( options: Options, build_hosts: List[Build_Cluster.Host] = Nil, progress: Progress = new Progress ): Unit = if (build_hosts.isEmpty) run_benchmark(options, progress) else { val engine = Build.Engine.Default val store = engine.build_store(options, build_cluster = true) benchmark_requirements(store.options, progress) val deps0 = Sessions.deps(Sessions.load_structure(options)) val build_context = Build.Context(store, deps0, build_hosts = build_hosts) val build_cluster = Build_Cluster.make(build_context, progress).open().init().benchmark() if (!build_cluster.ok) error("Benchmarking failed") build_cluster.stop() using(store.open_server()) { server => val db = store.open_build_database(path = Host.private_data.database, server = server) for (build_host <- build_hosts) { val score = (for { info <- Host.read_info(db, build_host.name) score <- info.benchmark_score } yield score).getOrElse(error("No score for host " + quote(build_host.name))) progress.echo(build_host.name + ": " + score) } } } /* Isabelle tool wrapper */ val isabelle_tool = Isabelle_Tool("build_benchmark", "run benchmark for build process", Scala_Project.here, { args => val build_hosts = new mutable.ListBuffer[Build_Cluster.Host] var options = Options.init() val getopts = Getopts(""" Usage: isabelle build_benchmark [OPTIONS] Options are: -H HOSTS additional cluster host specifications of the form NAMES:PARAMETERS (separated by commas) -o OPTION override Isabelle system OPTION (via NAME=VAL or NAME) Run benchmark for build process. """, "H:" -> (arg => build_hosts ++= Build_Cluster.Host.parse(Registry.global, arg)), "o:" -> (arg => options = options + arg)) val more_args = getopts(args) if (more_args.nonEmpty) getopts.usage() val progress = new Console_Progress() benchmark(options, build_hosts = build_hosts.toList, progress = progress) }) } \ No newline at end of file diff --git a/src/Pure/Build/build_job.scala b/src/Pure/Build/build_job.scala --- a/src/Pure/Build/build_job.scala +++ b/src/Pure/Build/build_job.scala @@ -1,561 +1,555 @@ /* Title: Pure/Build/build_job.scala Author: Makarius Build job running prover process, with rudimentary PIDE session. 
*/ package isabelle import scala.collection.mutable trait Build_Job { def cancel(): Unit = () def is_finished: Boolean = false def join: Build_Job.Result = Build_Job.no_result } object Build_Job { sealed case class Result(process_result: Process_Result, output_shasum: SHA1.Shasum) val no_result: Result = Result(Process_Result.undefined, SHA1.no_shasum) /* build session */ def start_session( build_context: Build.Context, session_context: Session_Context, progress: Progress, log: Logger, server: SSH.Server, session_background: Sessions.Background, sources_shasum: SHA1.Shasum, input_shasum: SHA1.Shasum, node_info: Host.Node_Info, store_heap: Boolean ): Session_Job = { new Session_Job(build_context, session_context, progress, log, server, session_background, sources_shasum, input_shasum, node_info, store_heap) } object Session_Context { def load( database_server: Option[SQL.Database], build_uuid: String, name: String, deps: List[String], ancestors: List[String], session_prefs: String, sources_shasum: SHA1.Shasum, timeout: Time, store: Store, progress: Progress = new Progress ): Session_Context = { def default: Session_Context = Session_Context( name, deps, ancestors, session_prefs, sources_shasum, timeout, Time.zero, Bytes.empty, build_uuid) def read(db: SQL.Database): Session_Context = { def ignore_error(msg: String) = { progress.echo_warning( "Ignoring bad database " + db + " for session " + quote(name) + if_proper(msg, ":\n" + msg)) default } try { val command_timings = store.read_command_timings(db, name) val elapsed = store.read_session_timing(db, name) match { case Markup.Elapsed(s) => Time.seconds(s) case _ => Time.zero } new Session_Context( name, deps, ancestors, session_prefs, sources_shasum, timeout, elapsed, command_timings, build_uuid) } catch { case ERROR(msg) => ignore_error(msg) case exn: java.lang.Error => ignore_error(Exn.message(exn)) case _: XML.Error => ignore_error("XML.Error") } } database_server match { case Some(db) => if (store.session_info_exists(db)) read(db) else default case None => using_option(store.try_open_database(name))(read) getOrElse default } } } sealed case class Session_Context( name: String, deps: List[String], ancestors: List[String], session_prefs: String, sources_shasum: SHA1.Shasum, timeout: Time, old_time: Time, old_command_timings_blob: Bytes, build_uuid: String ) extends Library.Named class Session_Job private[Build_Job]( build_context: Build.Context, session_context: Session_Context, progress: Progress, log: Logger, server: SSH.Server, session_background: Sessions.Background, sources_shasum: SHA1.Shasum, input_shasum: SHA1.Shasum, node_info: Host.Node_Info, store_heap: Boolean ) extends Build_Job { def session_name: String = session_background.session_name private val future_result: Future[Result] = Future.thread("build", uninterruptible = true) { val info = session_background.sessions_structure(session_name) val options = Host.node_options(info.options, node_info) val store = build_context.store using_optional(store.maybe_open_database_server(server = server)) { database_server => store.clean_output(database_server, session_name, session_init = true) val session_sources = Store.Sources.load(session_background.base, cache = store.cache.compress) val env = Isabelle_System.settings( List("ISABELLE_ML_DEBUGGER" -> options.bool("ML_debugger").toString)) val session_heaps = session_background.info.parent match { case None => Nil case Some(logic) => ML_Process.session_heaps(store, session_background, logic = logic) } val use_prelude = if 
(session_heaps.isEmpty) Thy_Header.ml_roots.map(_._1) else Nil val eval_store = if (store_heap) { (if (info.theories.nonEmpty) List("ML_Heap.share_common_data ()") else Nil) ::: List("ML_Heap.save_child " + ML_Syntax.print_string_bytes(File.platform_path(store.output_heap(session_name)))) } else Nil def session_blobs(node_name: Document.Node.Name): List[(Command.Blob, Document.Blobs.Item)] = session_background.base.theory_load_commands.get(node_name.theory) match { case None => Nil case Some(spans) => val syntax = session_background.base.theory_syntax(node_name) val master_dir = Path.explode(node_name.master_dir) for (span <- spans; file <- span.loaded_files(syntax).files) yield { val src_path = Path.explode(file) val blob_name = Document.Node.Name(File.symbolic_path(master_dir + src_path)) val bytes = session_sources(blob_name.node).bytes val text = bytes.text val chunk = Symbol.Text_Chunk(text) Command.Blob(blob_name, src_path, Some((SHA1.digest(bytes), chunk))) -> Document.Blobs.Item(bytes, text, chunk, changed = false) } } /* session */ val resources = new Resources(session_background, log = log, command_timings = Properties.uncompress(session_context.old_command_timings_blob, cache = store.cache)) val session = new Session(options, resources) { override val cache: Term.Cache = store.cache override def build_blobs_info(node_name: Document.Node.Name): Command.Blobs_Info = Command.Blobs_Info.make(session_blobs(node_name)) override def build_blobs(node_name: Document.Node.Name): Document.Blobs = Document.Blobs.make(session_blobs(node_name)) } object Build_Session_Errors { private val promise: Promise[List[String]] = Future.promise def result: Exn.Result[List[String]] = promise.join_result def cancel(): Unit = promise.cancel() def apply(errs: List[String]): Unit = { try { promise.fulfill(errs) } catch { case _: IllegalStateException => } } } val export_consumer = Export.consumer(store.open_database(session_name, output = true, server = server), store.cache, progress = progress) val stdout = new StringBuilder(1000) val stderr = new StringBuilder(1000) val command_timings = new mutable.ListBuffer[Properties.T] val theory_timings = new mutable.ListBuffer[Properties.T] val session_timings = new mutable.ListBuffer[Properties.T] val runtime_statistics = new mutable.ListBuffer[Properties.T] val task_statistics = new mutable.ListBuffer[Properties.T] def fun( name: String, acc: mutable.ListBuffer[Properties.T], unapply: Properties.T => Option[Properties.T] ): (String, Session.Protocol_Function) = { name -> ((msg: Prover.Protocol_Output) => unapply(msg.properties) match { case Some(props) => acc += props; true case _ => false }) } session.init_protocol_handler(new Session.Protocol_Handler { override def exit(): Unit = Build_Session_Errors.cancel() private def build_session_finished(msg: Prover.Protocol_Output): Boolean = { val (rc, errors) = try { val (rc, errs) = { import XML.Decode._ pair(int, list(x => x))(Symbol.decode_yxml(msg.text)) } val errors = for (err <- errs) yield { val prt = Protocol_Message.expose_no_reports(err) Pretty.string_of(prt, metric = Symbol.Metric) } (rc, errors) } catch { case ERROR(err) => (Process_Result.RC.failure, List(err)) } session.protocol_command("Prover.stop", rc.toString) Build_Session_Errors(errors) true } private def loading_theory(msg: Prover.Protocol_Output): Boolean = msg.properties match { case Markup.Loading_Theory(Markup.Name(name)) => progress.theory(Progress.Theory(name, session = session_name)) false case _ => false } private def export_(msg: 
Prover.Protocol_Output): Boolean = msg.properties match { case Protocol.Export(args) => export_consumer.make_entry(session_name, args, msg.chunk) true case _ => false } override val functions: Session.Protocol_Functions = List( Markup.Build_Session_Finished.name -> build_session_finished, Markup.Loading_Theory.name -> loading_theory, Markup.EXPORT -> export_, fun(Markup.Theory_Timing.name, theory_timings, Markup.Theory_Timing.unapply), fun(Markup.Session_Timing.name, session_timings, Markup.Session_Timing.unapply), fun(Markup.Task_Statistics.name, task_statistics, Markup.Task_Statistics.unapply)) }) session.command_timings += Session.Consumer("command_timings") { case Session.Command_Timing(props) => for { elapsed <- Markup.Elapsed.unapply(props) elapsed_time = Time.seconds(elapsed) if elapsed_time.is_relevant && elapsed_time >= options.seconds("command_timing_threshold") } command_timings += props.filter(Markup.command_timing_property) } session.runtime_statistics += Session.Consumer("ML_statistics") { case Session.Runtime_Statistics(props) => runtime_statistics += props } session.finished_theories += Session.Consumer[Document.Snapshot]("finished_theories") { case snapshot => if (!progress.stopped) { def export_(name: String, xml: XML.Body, compress: Boolean = true): Unit = { if (!progress.stopped) { val theory_name = snapshot.node_name.theory val args = Protocol.Export.Args( theory_name = theory_name, name = name, compress = compress) val body = Bytes(Symbol.encode(YXML.string_of_body(xml))) export_consumer.make_entry(session_name, args, body) } } def export_text(name: String, text: String, compress: Boolean = true): Unit = export_(name, List(XML.Text(text)), compress = compress) for (command <- snapshot.snippet_command) { export_text(Export.DOCUMENT_ID, command.id.toString, compress = false) } export_text(Export.FILES, cat_lines(snapshot.node_files.map(name => File.symbolic_path(name.path))), compress = false) for ((blob_name, i) <- snapshot.node_files.tail.zipWithIndex) { val xml = snapshot.switch(blob_name).xml_markup() export_(Export.MARKUP + (i + 1), xml) } export_(Export.MARKUP, snapshot.xml_markup()) export_(Export.MESSAGES, snapshot.messages.map(_._1)) } } session.all_messages += Session.Consumer[Any]("build_session_output") { case msg: Prover.Output => val message = msg.message if (msg.is_system) resources.log(Protocol.message_text(message)) if (msg.is_stdout) { stdout ++= Symbol.encode(XML.content(message)) } else if (msg.is_stderr) { stderr ++= Symbol.encode(XML.content(message)) } else if (msg.is_exit) { val err = "Prover terminated" + (msg.properties match { case Markup.Process_Result(result) => ": " + result.print_rc case _ => "" }) Build_Session_Errors(List(err)) } case _ => } build_context.session_setup(session_name, session) val eval_main = Command_Line.ML_tool("Isabelle_Process.init_build ()" :: eval_store) /* process */ val process = Isabelle_Process.start(options, session, session_background, session_heaps, use_prelude = use_prelude, eval_main = eval_main, cwd = info.dir.file, env = env) val timeout_request: Option[Event_Timer.Request] = if (info.timeout_ignored) None else Some(Event_Timer.request(Time.now() + info.timeout) { process.terminate() }) val build_errors = Isabelle_Thread.interrupt_handler(_ => process.terminate()) { Exn.capture { process.await_startup() } match { case Exn.Res(_) => val resources_yxml = resources.init_session_yxml val encode_options: XML.Encode.T[Options] = options => session.prover_options(options).encode val args_yxml = 
YXML.string_of_body( { import XML.Encode._ pair(string, list(pair(encode_options, list(pair(string, properties)))))( (session_name, info.theories)) }) session.protocol_command("build_session", resources_yxml, args_yxml) Build_Session_Errors.result case Exn.Exn(exn) => Exn.Res(List(Exn.message(exn))) } } val result0 = Isabelle_Thread.interrupt_handler(_ => process.terminate()) { process.await_shutdown() } val was_timeout = timeout_request match { case None => false case Some(request) => !request.cancel() } session.stop() val export_errors = export_consumer.shutdown(close = true).map(Output.error_message_text) val (document_output, document_errors) = try { if (Exn.is_res(build_errors) && result0.ok && info.documents.nonEmpty) { using(Export.open_database_context(store, server = server)) { database_context => val documents = using(database_context.open_session(session_background)) { session_context => Document_Build.build_documents( Document_Build.context(session_context, progress = progress), output_sources = info.document_output, output_pdf = info.document_output) } using(database_context.open_database(session_name, output = true))( session_database => documents.foreach(_.write(session_database.db, session_name))) (documents.flatMap(_.log_lines), Nil) } } else (Nil, Nil) } catch { case exn: Document_Build.Build_Error => (exn.log_lines, exn.log_errors) case Exn.Interrupt.ERROR(msg) => (Nil, List(msg)) } /* process result */ val result1 = { val theory_timing = theory_timings.iterator.flatMap( { case props @ Markup.Name(name) => Some(name -> props) case _ => None }).toMap val used_theory_timings = for { (name, _) <- session_background.base.used_theories } yield theory_timing.getOrElse(name.theory, Markup.Name(name.theory)) val more_output = Library.trim_line(stdout.toString) :: command_timings.toList.map(Protocol.Command_Timing_Marker.apply) ::: used_theory_timings.map(Protocol.Theory_Timing_Marker.apply) ::: session_timings.toList.map(Protocol.Session_Timing_Marker.apply) ::: runtime_statistics.toList.map(Protocol.ML_Statistics_Marker.apply) ::: task_statistics.toList.map(Protocol.Task_Statistics_Marker.apply) ::: document_output result0.output(more_output) .error(Library.trim_line(stderr.toString)) .errors_rc(export_errors ::: document_errors) } val result2 = build_errors match { case Exn.Res(build_errs) => val errs = build_errs ::: document_errors if (errs.nonEmpty) { result1.error_rc.output( errs.flatMap(s => split_lines(Output.error_message_text(s))) ::: errs.map(Protocol.Error_Message_Marker.apply)) } else if (progress.stopped && result1.ok) { result1.copy(rc = Process_Result.RC.interrupt) } else result1 case Exn.Exn(Exn.Interrupt()) => if (result1.ok) result1.copy(rc = Process_Result.RC.interrupt) else result1 case Exn.Exn(exn) => throw exn } val process_result = if (result2.ok) result2 else if (was_timeout) result2.error(Output.error_message_text("Timeout")).timeout_rc else if (result2.interrupted) result2.error(Output.error_message_text("Interrupt")) else result2 val store_session = store.output_session(session_name, store_heap = process_result.ok && store_heap) /* output heap */ val output_shasum = store_session.heap match { case Some(path) => SHA1.shasum(ML_Heap.write_file_digest(path), session_name) case None => SHA1.no_shasum } val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test) val build_log = Build_Log.Log_File(session_name, process_result.out_lines, cache = store.cache). 
parse_session_info( command_timings = true, theory_timings = true, ml_statistics = true, task_statistics = true) // write log file if (process_result.ok) { File.write_gzip(store.output_log_gz(session_name), terminate_lines(log_lines)) } else File.write(store.output_log(session_name), terminate_lines(log_lines)) // write database def write_info(db: SQL.Database): Unit = store.write_session_info(db, session_name, session_sources, build_log = if (process_result.timeout) build_log.error("Timeout") else build_log, build = Store.Build_Info( sources = sources_shasum, input_heaps = input_shasum, output_heap = output_shasum, process_result.rc, build_context.build_uuid)) database_server match { case Some(db) => write_info(db) case None => using(store.open_database(session_name, output = true))(write_info) } - using_optional(store.maybe_open_heaps_database(database_server, server = server)) { - heaps_database => - for (db <- database_server orElse heaps_database) { - val slice = Space.MiB(options.real("build_database_slice")) - ML_Heap.store(db, store_session, slice, - cache = store.cache.compress, progress = progress) - } - } + store.in_heaps_database( + List(store_session), database_server, server = server, progress = progress) // messages process_result.err_lines.foreach(progress.echo(_)) if (process_result.ok) { val props = build_log.session_timing val threads = Markup.Session_Timing.Threads.unapply(props) getOrElse 1 val timing = Markup.Timing_Properties.get(props) progress.echo( "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")", verbose = true) progress.echo( "Finished " + session_name + " (" + process_result.timing.message_resources + ")") } else { progress.echo( session_name + " FAILED (see also \"isabelle build_log -H Error " + session_name + "\")") if (!process_result.interrupted) { val tail = info.options.int("process_output_tail") val suffix = if (tail == 0) log_lines else log_lines.drop(log_lines.length - tail max 0) val prefix = if (log_lines.length == suffix.length) Nil else List("...") progress.echo(Library.trim_line(cat_lines(prefix ::: suffix))) } } Result(process_result.copy(out_lines = log_lines), output_shasum) } } override def cancel(): Unit = future_result.cancel() override def is_finished: Boolean = future_result.is_finished override def join: Result = future_result.join } } diff --git a/src/Pure/Build/build_process.scala b/src/Pure/Build/build_process.scala --- a/src/Pure/Build/build_process.scala +++ b/src/Pure/Build/build_process.scala @@ -1,1417 +1,1409 @@ /* Title: Pure/Build/build_process.scala Author: Makarius Build process for sessions, with build database, optional heap, and optional presentation. */ package isabelle import scala.collection.immutable.SortedMap import scala.math.Ordering import scala.annotation.tailrec object Build_Process { /** state vs. 
database **/ sealed case class Build( build_uuid: String, // Database_Progress.context_uuid build_id: Long, ml_platform: String, options: String, start: Date, stop: Option[Date], sessions: List[String] ) { def active: Boolean = stop.isEmpty def active_build_uuid: Option[String] = if (active) Some(build_uuid) else None def print: String = build_uuid + " (platform: " + ml_platform + ", start: " + Build_Log.print_date(start) + if_proper(stop, ", stop: " + Build_Log.print_date(stop.get)) + ")" } sealed case class Worker( worker_uuid: String, // Database_Progress.agent_uuid build_uuid: String, start: Date, stamp: Date, stop: Option[Date], serial: Long ) object Task { type Entry = (String, Task) def entry(session: Build_Job.Session_Context, build_context: isabelle.Build.Context): Entry = session.name -> Task(session.name, session.deps, build_context.build_uuid) } sealed case class Task( name: String, deps: List[String], build_uuid: String ) extends Library.Named { def is_ready: Boolean = deps.isEmpty def resolve(dep: String): Option[Task] = if (deps.contains(dep)) Some(copy(deps = deps.filterNot(_ == dep))) else None } sealed case class Job( name: String, worker_uuid: String, build_uuid: String, node_info: Host.Node_Info, start_date: Date, build: Option[Build_Job] ) extends Library.Named sealed case class Result( name: String, worker_uuid: String, build_uuid: String, node_info: Host.Node_Info, start_date: Date, process_result: Process_Result, output_shasum: SHA1.Shasum, current: Boolean ) extends Library.Named { def ok: Boolean = process_result.ok } object Sessions { type Graph = isabelle.Graph[String, Build_Job.Session_Context] val empty: Sessions = new Sessions(Graph.string) } final class Sessions private(val graph: Sessions.Graph) { override def toString: String = graph.toString def defined(name: String): Boolean = graph.defined(name) def apply(name: String): Build_Job.Session_Context = graph.get_node(name) def iterator: Iterator[Build_Job.Session_Context] = for (name <- graph.topological_order.iterator) yield apply(name) + def store_heap(name: String): Boolean = + isabelle.Sessions.is_pure(name) || iterator.exists(_.ancestors.contains(name)) + def data: Library.Update.Data[Build_Job.Session_Context] = Map.from(for ((_, (session, _)) <- graph.iterator) yield session.name -> session) def make(new_graph: Sessions.Graph): Sessions = if (graph == new_graph) this else { new Sessions( new_graph.iterator.foldLeft(new_graph) { case (g, (name, (session, _))) => g.add_deps_acyclic(name, session.deps) }) } def update(updates: List[Library.Update.Op[Build_Job.Session_Context]]): Sessions = { val graph1 = updates.foldLeft(graph) { case (g, Library.Update.Delete(name)) => g.del_node(name) case (g, Library.Update.Insert(session)) => (if (g.defined(session.name)) g.del_node(session.name) else g) .new_node(session.name, session) } make(graph1) } def init( build_context: isabelle.Build.Context, database_server: Option[SQL.Database], progress: Progress = new Progress ): Sessions = { val sessions_structure = build_context.sessions_structure make( sessions_structure.build_graph.iterator.foldLeft(graph) { case (graph0, (name, (info, _))) => val deps = info.parent.toList val prefs = info.session_prefs val ancestors = sessions_structure.build_requirements(deps) val sources_shasum = build_context.deps.sources_shasum(name) if (graph0.defined(name)) { val session0 = graph0.get_node(name) val prefs0 = session0.session_prefs val ancestors0 = session0.ancestors val sources_shasum0 = session0.sources_shasum def err(msg: 
String, a: String, b: String): Nothing = error("Conflicting dependencies for session " + quote(name) + ": " + msg + "\n" + Library.indent_lines(2, a) + "\nvs.\n" + Library.indent_lines(2, b)) if (prefs0 != prefs) { err("preferences disagree", Symbol.cartouche_decoded(prefs0), Symbol.cartouche_decoded(prefs)) } if (ancestors0 != ancestors) { err("ancestors disagree", commas_quote(ancestors0), commas_quote(ancestors)) } if (sources_shasum0 != sources_shasum) { val a = sources_shasum0 - sources_shasum val b = sources_shasum - sources_shasum0 err("sources disagree", Library.trim_line(a.toString), Library.trim_line(b.toString)) } graph0 } else { val session = Build_Job.Session_Context.load(database_server, build_context.build_uuid, name, deps, ancestors, prefs, sources_shasum, info.timeout, build_context.store, progress = progress) graph0.new_node(name, session) } } ) } lazy val max_time: Map[String, Double] = { val maximals = graph.maximals.toSet def descendants_time(name: String): Double = { if (maximals.contains(name)) apply(name).old_time.seconds else { val descendants = graph.all_succs(List(name)).toSet val g = graph.restrict(descendants) (0.0 :: g.maximals.flatMap { desc => val ps = g.all_preds(List(desc)) if (ps.exists(p => !graph.defined(p))) None else Some(ps.map(p => apply(p).old_time.seconds).sum) }).max } } Map.from( for (name <- graph.keys_iterator) yield name -> descendants_time(name)).withDefaultValue(0.0) } lazy val ordering: Ordering[String] = (a: String, b: String) => max_time(b) compare max_time(a) match { case 0 => apply(b).timeout compare apply(a).timeout match { case 0 => a compare b case ord => ord } case ord => ord } } sealed case class Snapshot( builds: List[Build], // available build configurations workers: List[Worker], // available worker processes sessions: Sessions, // static build targets pending: State.Pending, // dynamic build "queue" running: State.Running, // presently running jobs results: State.Results) // finished results object State { def inc_serial(serial: Long): Long = { require(serial < Long.MaxValue, "serial overflow") serial + 1 } type Pending = Library.Update.Data[Task] type Running = Library.Update.Data[Job] type Results = Library.Update.Data[Result] } sealed case class State( serial: Long = 0, - numa_nodes: List[Int] = Nil, sessions: Sessions = Sessions.empty, pending: State.Pending = Map.empty, running: State.Running = Map.empty, results: State.Results = Map.empty ) { def next_serial: Long = State.inc_serial(serial) def ready: List[Task] = pending.valuesIterator.filter(_.is_ready).toList.sortBy(_.name) def next_ready: List[Task] = ready.filter(task => !is_running(task.name)) def exists_ready: Boolean = pending.valuesIterator.exists(task => task.is_ready && !is_running(task.name)) def remove_pending(a: String): State = copy(pending = pending.foldLeft(pending) { case (map, (b, task)) => if (a == b) map - a else { task.resolve(a) match { case None => map case Some(task1) => map + (b -> task1) } } }) def is_running(name: String): Boolean = running.isDefinedAt(name) def build_running: List[Build_Job] = running.valuesIterator.flatMap(_.build).toList def finished_running(): Boolean = build_running.exists(_.is_finished) def busy_running(jobs: Int): Boolean = jobs <= 0 || jobs <= build_running.length def add_running(job: Job): State = copy(running = running + (job.name -> job)) def remove_running(name: String): State = copy(running = running - name) def make_result( result_name: (String, String, String), process_result: Process_Result, output_shasum: 
SHA1.Shasum, start_date: Date, node_info: Host.Node_Info = Host.Node_Info.none, current: Boolean = false ): State = { val (name, worker_uuid, build_uuid) = result_name val result = Result(name, worker_uuid, build_uuid, node_info, start_date, process_result, output_shasum, current) copy(results = results + (name -> result)) } def ancestor_results(name: String): Option[List[Result]] = { val defined = sessions.defined(name) && sessions(name).ancestors.forall(a => sessions.defined(a) && results.isDefinedAt(a)) if (defined) Some(sessions(name).ancestors.map(results)) else None } } /** SQL data model **/ object private_data extends SQL.Data("isabelle_build") { val database: Path = Path.explode("$ISABELLE_HOME_USER/build.db") /* tables */ override lazy val tables: SQL.Tables = SQL.Tables( Updates.table, Sessions.table, Pending.table, Running.table, Results.table, Base.table, Workers.table) private lazy val build_uuid_tables = tables.filter(Generic.build_uuid_table) private lazy val build_id_tables = tables.filter(t => Generic.build_id_table(t) && !Generic.build_uuid_table(t)) /* notifications */ lazy val channel: String = Base.table.name lazy val channel_ready: SQL.Notification = SQL.Notification(channel, payload = "ready") /* generic columns */ object Generic { val build_id = SQL.Column.long("build_id") val build_uuid = SQL.Column.string("build_uuid") val worker_uuid = SQL.Column.string("worker_uuid") val name = SQL.Column.string("name") def build_id_table(table: SQL.Table): Boolean = table.columns.exists(_.equals_name(build_id)) def build_uuid_table(table: SQL.Table): Boolean = table.columns.exists(_.equals_name(build_uuid)) def sql( build_id: Long = 0, build_uuid: String = "", worker_uuid: String = "", names: Iterable[String] = Nil ): SQL.Source = SQL.and( if_proper(build_id > 0, Generic.build_id.equal(build_id)), if_proper(build_uuid, Generic.build_uuid.equal(build_uuid)), if_proper(worker_uuid, Generic.worker_uuid.equal(worker_uuid)), if_proper(names, Generic.name.member(names))) def sql_where( build_id: Long = 0, build_uuid: String = "", worker_uuid: String = "", names: Iterable[String] = Nil ): SQL.Source = { SQL.where(sql( build_id = build_id, build_uuid = build_uuid, worker_uuid = worker_uuid, names = names)) } } /* recorded updates */ object Updates { val build_id = Generic.build_id.make_primary_key val serial = SQL.Column.long("serial").make_primary_key val kind = SQL.Column.int("kind").make_primary_key val name = Generic.name.make_primary_key val table = make_table(List(build_id, serial, kind, name), name = "updates") // virtual columns for JOIN with data val delete = SQL.Column.bool("delete").make_expr(name.undefined) val dom = SQL.Column.string("dom") val dom_name = dom.make_expr(name.ident) val name_dom = name.make_expr(dom.ident) } def read_updates[A]( db: SQL.Database, table: SQL.Table, build_id: Long, serial_seen: Long, get: SQL.Result => A ): List[Library.Update.Op[A]] = { val domain_columns = List(Updates.dom_name) val domain_table = SQL.Table("domain", domain_columns, body = Updates.table.select(domain_columns, distinct = true, sql = SQL.where_and( Updates.build_id.equal(build_id), Updates.serial.ident + " > " + serial_seen, Updates.kind.equal(tables.index(table))))) val select_columns = Updates.delete :: Updates.name_dom :: table.columns.filterNot(_.equals_name(Generic.name)) val select_sql = SQL.select(select_columns, sql = domain_table.query_named + SQL.join_outer + table + " ON " + Updates.dom + " = " + Generic.name) db.execute_query_statement(select_sql, 
List.from[Library.Update.Op[A]], res => if (res.bool(Updates.delete)) Library.Update.Delete(res.string(Updates.name)) else Library.Update.Insert(get(res))) } def write_updates( db: SQL.Database, build_id: Long, serial: Long, updates: List[Library.Update] ): Unit = db.execute_batch_statement(db.insert_permissive(Updates.table), batch = for (update <- updates.iterator; kind = update.kind; name <- update.domain.iterator) yield { (stmt: SQL.Statement) => require(build_id > 0 && serial > 0 && kind > 0 && name.nonEmpty, "Bad database update: build_id = " + build_id + ", serial = " + serial + ", kind = " + kind + ", name = " + quote(name)) stmt.long(1) = build_id stmt.long(2) = serial stmt.int(3) = kind stmt.string(4) = name }) /* base table */ object Base { val build_uuid = Generic.build_uuid.make_primary_key val build_id = Generic.build_id.make_primary_key val ml_platform = SQL.Column.string("ml_platform") val options = SQL.Column.string("options") val start = SQL.Column.date("start") val stop = SQL.Column.date("stop") val table = make_table(List(build_uuid, build_id, ml_platform, options, start, stop)) } def read_build_ids(db: SQL.Database, build_uuids: List[String]): List[Long] = db.execute_query_statement( Base.table.select(List(Base.build_id), sql = if_proper(build_uuids, Base.build_uuid.where_member(build_uuids))), List.from[Long], res => res.long(Base.build_id)) def get_build_id(db: SQL.Database, build_uuid: String): Long = { read_build_ids(db, build_uuids = List(build_uuid)) match { case build_id :: _ => build_id case _ => db.execute_query_statementO( Base.table.select(List(Base.build_id.max)), _.long(Base.build_id)).getOrElse(0L) + 1L } } def read_builds(db: SQL.Database): List[Build] = { val builds = db.execute_query_statement(Base.table.select(), List.from[Build], { res => val build_uuid = res.string(Base.build_uuid) val build_id = res.long(Base.build_id) val ml_platform = res.string(Base.ml_platform) val options = res.string(Base.options) val start = res.date(Base.start) val stop = res.get_date(Base.stop) Build(build_uuid, build_id, ml_platform, options, start, stop, Nil) }) for (build <- builds.sortBy(_.start)(Date.Ordering)) yield { build.copy(sessions = private_data.read_sessions(db, build_uuid = build.build_uuid).sorted) } } def remove_builds(db: SQL.Database, build_uuids: List[String]): Unit = if (build_uuids.nonEmpty) { val build_ids = read_build_ids(db, build_uuids = build_uuids) val sql1 = Generic.build_uuid.where_member(build_uuids) val sql2 = Generic.build_id.where_member_long(build_ids) db.execute_statement( SQL.MULTI( build_uuid_tables.map(_.delete(sql = sql1)) ++ build_id_tables.map(_.delete(sql = sql2)))) } def start_build( db: SQL.Database, build_id: Long, build_uuid: String, ml_platform: String, options: String, start: Date ): Unit = { db.execute_statement(Base.table.insert(), body = { stmt => stmt.string(1) = build_uuid stmt.long(2) = build_id stmt.string(3) = ml_platform stmt.string(4) = options stmt.date(5) = start stmt.date(6) = None }) } def stop_build(db: SQL.Database, build_uuid: String): Unit = db.execute_statement( Base.table.update(List(Base.stop), sql = Base.build_uuid.where_equal(build_uuid)), body = { stmt => stmt.date(1) = db.now() }) def clean_build(db: SQL.Database): Unit = { val remove = db.execute_query_statement( Base.table.select(List(Base.build_uuid), sql = SQL.where(Base.stop.defined)), List.from[String], res => res.string(Base.build_uuid)) remove_builds(db, remove) } /* sessions */ object Sessions { val name = Generic.name.make_primary_key val 
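/* Illustrative sketch (not part of the patch): how the lists of update
   operations produced by read_updates are replayed onto name-keyed data, in the
   style of pull_pending/pull_running/pull_results above.  Library.Update.Op is
   re-modelled here (as Op/Delete/Insert) so the example is self-contained. */
object Update_Replay_Demo {
  sealed trait Op[+A]
  case class Delete(name: String) extends Op[Nothing]
  case class Insert[A](value: A) extends Op[A]

  case class Entry(name: String, payload: String)

  // apply deletes and inserts to the current snapshot of the data
  def replay(data: Map[String, Entry], ops: List[Op[Entry]]): Map[String, Entry] =
    ops.foldLeft(data) {
      case (map, Delete(name)) => map - name
      case (map, Insert(entry)) => map + (entry.name -> entry)
    }

  def main(args: Array[String]): Unit = {
    val data = Map("HOL" -> Entry("HOL", "queued"))
    val ops: List[Op[Entry]] = List(Delete("HOL"), Insert(Entry("HOL-Library", "queued")))
    println(replay(data, ops))  // Map(HOL-Library -> Entry(HOL-Library,queued))
  }
}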
deps = SQL.Column.string("deps") val ancestors = SQL.Column.string("ancestors") val options = SQL.Column.string("options") val sources = SQL.Column.string("sources") val timeout = SQL.Column.long("timeout") val old_time = SQL.Column.long("old_time") val old_command_timings = SQL.Column.bytes("old_command_timings") val build_uuid = Generic.build_uuid val table = make_table( List(name, deps, ancestors, options, sources, timeout, old_time, old_command_timings, build_uuid), name = "sessions") lazy val table_index: Int = tables.index(table) } def read_sessions(db: SQL.Database, build_uuid: String = ""): List[String] = db.execute_query_statement( Sessions.table.select(List(Sessions.name), sql = if_proper(build_uuid, Sessions.build_uuid.where_equal(build_uuid))), List.from[String], res => res.string(Sessions.name)) def pull_sessions(db: SQL.Database, build_id: Long, state: State): Sessions = state.sessions.update( read_updates(db, Sessions.table, build_id, state.serial, { res => val name = res.string(Sessions.name) val deps = split_lines(res.string(Sessions.deps)) val ancestors = split_lines(res.string(Sessions.ancestors)) val options = res.string(Sessions.options) val sources_shasum = SHA1.fake_shasum(res.string(Sessions.sources)) val timeout = Time.ms(res.long(Sessions.timeout)) val old_time = Time.ms(res.long(Sessions.old_time)) val old_command_timings_blob = res.bytes(Sessions.old_command_timings) val build_uuid = res.string(Sessions.build_uuid) Build_Job.Session_Context(name, deps, ancestors, options, sources_shasum, timeout, old_time, old_command_timings_blob, build_uuid) }) ) def update_sessions( db: SQL.Database, sessions: Build_Process.Sessions, old_sessions: Build_Process.Sessions ): Library.Update = { val update = if (old_sessions.eq(sessions)) Library.Update.empty else Library.Update.make(old_sessions.data, sessions.data, kind = Sessions.table_index) if (update.deletes) { db.execute_statement( Sessions.table.delete(sql = Generic.sql_where(names = update.delete))) } if (update.inserts) { db.execute_batch_statement(Sessions.table.insert(), batch = for (name <- update.insert) yield { (stmt: SQL.Statement) => val session = sessions(name) stmt.string(1) = session.name stmt.string(2) = cat_lines(session.deps) stmt.string(3) = cat_lines(session.ancestors) stmt.string(4) = session.session_prefs stmt.string(5) = session.sources_shasum.toString stmt.long(6) = session.timeout.ms stmt.long(7) = session.old_time.ms stmt.bytes(8) = session.old_command_timings_blob stmt.string(9) = session.build_uuid }) } update } /* workers */ object Workers { val worker_uuid = Generic.worker_uuid.make_primary_key val build_uuid = Generic.build_uuid val start = SQL.Column.date("start") val stamp = SQL.Column.date("stamp") val stop = SQL.Column.date("stop") val serial = SQL.Column.long("serial") val table = make_table(List(worker_uuid, build_uuid, start, stamp, stop, serial), name = "workers") lazy val table_index: Int = tables.index(table) } def read_serial(db: SQL.Database): Long = db.execute_query_statementO[Long]( Workers.table.select(List(Workers.serial.max)), _.long(Workers.serial)).getOrElse(0L) def read_workers( db: SQL.Database, build_uuid: String = "", worker_uuid: String = "" ): List[Worker] = { db.execute_query_statement( Workers.table.select( sql = Generic.sql_where(build_uuid = build_uuid, worker_uuid = worker_uuid)), List.from[Worker], { res => Worker( worker_uuid = res.string(Workers.worker_uuid), build_uuid = res.string(Workers.build_uuid), start = res.date(Workers.start), stamp = 
res.date(Workers.stamp), stop = res.get_date(Workers.stop), serial = res.long(Workers.serial)) }) } def start_worker( db: SQL.Database, worker_uuid: String, build_uuid: String, serial: Long ): Unit = { def err(msg: String): Nothing = error("Cannot start worker " + worker_uuid + if_proper(msg, "\n" + msg)) val build_stop = db.execute_query_statementO( Base.table.select(List(Base.stop), sql = Base.build_uuid.where_equal(build_uuid)), res => res.get_date(Base.stop)) build_stop match { case Some(None) => case Some(Some(_)) => err("for already stopped build process " + build_uuid) case None => err("for unknown build process " + build_uuid) } db.execute_statement(Workers.table.insert(), body = { stmt => val now = db.now() stmt.string(1) = worker_uuid stmt.string(2) = build_uuid stmt.date(3) = now stmt.date(4) = now stmt.date(5) = None stmt.long(6) = serial }) } def stamp_worker( db: SQL.Database, worker_uuid: String, serial: Long, stop_now: Boolean = false ): Unit = { val sql = Workers.worker_uuid.where_equal(worker_uuid) val stop = db.execute_query_statementO( Workers.table.select(List(Workers.stop), sql = sql), _.get_date(Workers.stop)).flatten db.execute_statement( Workers.table.update(List(Workers.stamp, Workers.stop, Workers.serial), sql = sql), body = { stmt => val now = db.now() stmt.date(1) = now stmt.date(2) = if (stop_now) Some(now) else stop stmt.long(3) = serial }) } /* pending jobs */ object Pending { val name = Generic.name.make_primary_key val deps = SQL.Column.string("deps") val build_uuid = Generic.build_uuid val table = make_table(List(name, deps, build_uuid), name = "pending") lazy val table_index: Int = tables.index(table) } def pull_pending(db: SQL.Database, build_id: Long, state: State): State.Pending = Library.Update.data(state.pending, read_updates(db, Pending.table, build_id, state.serial, { res => val name = res.string(Pending.name) val deps = res.string(Pending.deps) val build_uuid = res.string(Pending.build_uuid) Task(name, split_lines(deps), build_uuid) }) ) def update_pending( db: SQL.Database, pending: State.Pending, old_pending: State.Pending ): Library.Update = { val update = Library.Update.make(old_pending, pending, kind = Pending.table_index) if (update.deletes) { db.execute_statement( Pending.table.delete(sql = Generic.sql_where(names = update.delete))) } if (update.inserts) { db.execute_batch_statement(Pending.table.insert(), batch = for (name <- update.insert) yield { (stmt: SQL.Statement) => val task = pending(name) stmt.string(1) = task.name stmt.string(2) = cat_lines(task.deps) stmt.string(3) = task.build_uuid }) } update } /* running jobs */ object Running { val name = Generic.name.make_primary_key val worker_uuid = Generic.worker_uuid val build_uuid = Generic.build_uuid val hostname = SQL.Column.string("hostname") val numa_node = SQL.Column.int("numa_node") val rel_cpus = SQL.Column.string("rel_cpus") val start_date = SQL.Column.date("start_date") val table = make_table( List(name, worker_uuid, build_uuid, hostname, numa_node, rel_cpus, start_date), name = "running") lazy val table_index: Int = tables.index(table) } def pull_running(db: SQL.Database, build_id: Long, state: State): State.Running = Library.Update.data(state.running, read_updates(db, Running.table, build_id, state.serial, { res => val name = res.string(Running.name) val worker_uuid = res.string(Running.worker_uuid) val build_uuid = res.string(Running.build_uuid) val hostname = res.string(Running.hostname) val numa_node = res.get_int(Running.numa_node) val rel_cpus = 
res.string(Running.rel_cpus) val start_date = res.date(Running.start_date) val node_info = Host.Node_Info(hostname, numa_node, Host.Range.from(rel_cpus)) Job(name, worker_uuid, build_uuid, node_info, start_date, None) }) ) def update_running( db: SQL.Database, running: State.Running, old_running: State.Running ): Library.Update = { val update = Library.Update.make(old_running, running, kind = Running.table_index) if (update.deletes) { db.execute_statement( Running.table.delete(sql = Generic.sql_where(names = update.delete))) } if (update.inserts) { db.execute_batch_statement(Running.table.insert(), batch = for (name <- update.insert) yield { (stmt: SQL.Statement) => val job = running(name) stmt.string(1) = job.name stmt.string(2) = job.worker_uuid stmt.string(3) = job.build_uuid stmt.string(4) = job.node_info.hostname stmt.int(5) = job.node_info.numa_node stmt.string(6) = Host.Range(job.node_info.rel_cpus) stmt.date(7) = job.start_date }) } update } /* job results */ object Results { val name = Generic.name.make_primary_key val worker_uuid = Generic.worker_uuid val build_uuid = Generic.build_uuid val hostname = SQL.Column.string("hostname") val numa_node = SQL.Column.int("numa_node") val rel_cpus = SQL.Column.string("rel_cpus") val start_date = SQL.Column.date("start_date") val rc = SQL.Column.int("rc") val out = SQL.Column.string("out") val err = SQL.Column.string("err") val timing_elapsed = SQL.Column.long("timing_elapsed") val timing_cpu = SQL.Column.long("timing_cpu") val timing_gc = SQL.Column.long("timing_gc") val output_shasum = SQL.Column.string("output_shasum") val current = SQL.Column.bool("current") val table = make_table( List(name, worker_uuid, build_uuid, hostname, numa_node, rel_cpus, start_date, rc, out, err, timing_elapsed, timing_cpu, timing_gc, output_shasum, current), name = "results") lazy val table_index: Int = tables.index(table) } def pull_results(db: SQL.Database, build_id: Long, state: State): State.Results = Library.Update.data(state.results, read_updates(db, Results.table, build_id, state.serial, { res => val name = res.string(Results.name) val worker_uuid = res.string(Results.worker_uuid) val build_uuid = res.string(Results.build_uuid) val hostname = res.string(Results.hostname) val numa_node = res.get_int(Results.numa_node) val rel_cpus = res.string(Results.rel_cpus) val node_info = Host.Node_Info(hostname, numa_node, Host.Range.from(rel_cpus)) val start_date = res.date(Results.start_date) val rc = res.int(Results.rc) val out = res.string(Results.out) val err = res.string(Results.err) val timing = res.timing( Results.timing_elapsed, Results.timing_cpu, Results.timing_gc) val process_result = Process_Result(rc, out_lines = split_lines(out), err_lines = split_lines(err), timing = timing) val output_shasum = SHA1.fake_shasum(res.string(Results.output_shasum)) val current = res.bool(Results.current) Result(name, worker_uuid, build_uuid, node_info, start_date, process_result, output_shasum, current) }) ) def update_results( db: SQL.Database, results: State.Results, old_results: State.Results ): Library.Update = { val update = Library.Update.make(old_results, results, kind = Results.table_index) if (update.deletes) { db.execute_statement( Results.table.delete(sql = Generic.sql_where(names = update.delete))) } if (update.inserts) { db.execute_batch_statement(Results.table.insert(), batch = for (name <- update.insert) yield { (stmt: SQL.Statement) => val result = results(name) val process_result = result.process_result stmt.string(1) = result.name stmt.string(2) = 
result.worker_uuid stmt.string(3) = result.build_uuid stmt.string(4) = result.node_info.hostname stmt.int(5) = result.node_info.numa_node stmt.string(6) = Host.Range(result.node_info.rel_cpus) stmt.date(7) = result.start_date stmt.int(8) = process_result.rc stmt.string(9) = cat_lines(process_result.out_lines) stmt.string(10) = cat_lines(process_result.err_lines) stmt.long(11) = process_result.timing.elapsed.ms stmt.long(12) = process_result.timing.cpu.ms stmt.long(13) = process_result.timing.gc.ms stmt.string(14) = result.output_shasum.toString stmt.bool(15) = result.current }) } update } /* collective operations */ def pull_state(db: SQL.Database, build_id: Long, worker_uuid: String, state: State): State = { val serial_db = read_serial(db) if (serial_db == state.serial) state else { val serial = serial_db max state.serial stamp_worker(db, worker_uuid, serial) val sessions = pull_sessions(db, build_id, state) val pending = pull_pending(db, build_id, state) val running = pull_running(db, build_id, state) val results = pull_results(db, build_id, state) state.copy(serial = serial, sessions = sessions, pending = pending, running = running, results = results) } } def push_state( db: SQL.Database, build_id: Long, worker_uuid: String, state: State, old_state: State ): State = { val updates = List( update_sessions(db, state.sessions, old_state.sessions), update_pending(db, state.pending, old_state.pending), update_running(db, state.running, old_state.running), update_results(db, state.results, old_state.results) ).filter(_.defined) if (updates.nonEmpty) { val serial = state.next_serial write_updates(db, build_id, serial, updates) stamp_worker(db, worker_uuid, serial) state.copy(serial = serial) } else state } } def read_builds(db: SQL.Database): List[Build] = private_data.transaction_lock(db, create = true, label = "Build_Process.read_builds") { private_data.read_builds(db) } def init_build( db: SQL.Database, build_context: isabelle.Build.Context, build_start: Date ): Long = private_data.transaction_lock(db, create = true, label = "Build_Process.init_build") { db.listen(private_data.channel) val build_uuid = build_context.build_uuid val build_id = private_data.get_build_id(db, build_uuid) if (build_context.master) { private_data.start_build(db, build_id, build_uuid, build_context.ml_platform, build_context.sessions_structure.session_prefs, build_start) } build_id } } /** main process **/ class Build_Process( protected final val build_context: Build.Context, protected final val build_progress: Progress, protected final val server: SSH.Server ) extends AutoCloseable { /* context */ protected final val store: Store = build_context.store protected final val build_options: Options = store.options protected final val build_deps: isabelle.Sessions.Deps = build_context.deps protected final val hostname: String = build_context.hostname protected final val build_uuid: String = build_context.build_uuid private var warning_seen = Set.empty[String] protected def warning(msg: String): Unit = synchronized { if (!warning_seen(msg)) { build_progress.echo_warning(msg) warning_seen += msg } } /* global resources with common close() operation */ protected val _database_server: Option[SQL.Database] = try { store.maybe_open_database_server(server = server) } catch { case exn: Throwable => close(); throw exn } protected val _heaps_database: Option[SQL.Database] = try { store.maybe_open_heaps_database(_database_server, server = server) } catch { case exn: Throwable => close(); throw exn } protected val _build_database: 
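/* Illustrative sketch (not part of the patch): the serial-based synchronization
   behind pull_state/push_state, reduced to an in-memory "database" shared by
   two workers.  The real code runs inside a SQL transaction_lock and pushes
   per-table update logs; here a plain object lock and whole-state copies stand
   in for that, and all names in this demo are invented. */
object Serial_Sync_Demo {
  final case class State(serial: Long = 0, results: Map[String, Int] = Map.empty) {
    def next_serial: Long = serial + 1
  }

  object Db {  // shared, persistent side
    private var state = State()
    def transaction[A](body: State => (State, A)): A =
      synchronized { val (state1, a) = body(state); state = state1; a }
  }

  // pull if the database moved on, run the body on fresh state, push the result
  def synchronized_database(local: State)(body: State => State): State =
    Db.transaction { db_state =>
      val pulled = if (db_state.serial == local.serial) local else db_state
      val updated = body(pulled)
      val pushed =
        if (updated == pulled) pulled
        else updated.copy(serial = updated.next_serial)
      (pushed, pushed)
    }

  def main(args: Array[String]): Unit = {
    var worker1 = State()
    var worker2 = State()
    worker1 = synchronized_database(worker1)(s => s.copy(results = s.results + ("HOL" -> 0)))
    worker2 = synchronized_database(worker2)(identity)  // picks up HOL from the database
    println(worker2.results.keySet)  // Set(HOL)
  }
}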
Option[SQL.Database] = try { for (db <- store.maybe_open_build_database(server = server)) yield { if (!db.is_postgresql) { error("Distributed build requires PostgreSQL (option build_database_server)") } val store_tables = db.is_postgresql Build_Process.private_data.transaction_lock(db, create = true, label = "Build_Process.build_database" ) { Build_Process.private_data.clean_build(db) if (store_tables) Store.private_data.tables.lock(db, create = true) } if (build_context.master) { db.vacuum(Build_Process.private_data.tables.list) if (store_tables) db.vacuum(Store.private_data.tables.list) } db } } catch { case exn: Throwable => close(); throw exn } protected def build_receive(filter: SQL.Notification => Boolean): List[SQL.Notification] = _build_database.flatMap(_.receive(filter)).getOrElse(Nil) protected def build_send(notification: SQL.Notification): Unit = _build_database.foreach(_.send(notification)) protected def build_cluster: Boolean = _build_database match { case Some(db) => db.is_postgresql case None => false } protected val build_delay: Time = build_options.seconds( if (!build_cluster) "build_delay" else if (build_context.master) "build_delay_master" else "build_delay_worker") protected val build_expire: Int = if (!build_cluster || build_context.master) 1 else build_options.int("build_cluster_expire").max(1) protected val _host_database: SQL.Database = try { store.open_build_database(path = Host.private_data.database, server = server) } catch { case exn: Throwable => close(); throw exn } protected val (progress, worker_uuid) = synchronized { if (_build_database.isEmpty) (build_progress, UUID.random_string()) else { try { val db = store.open_build_database(Database_Progress.private_data.database, server = server) val progress = new Database_Progress(db, build_progress, input_messages = build_context.master, hostname = hostname, context_uuid = build_uuid, kind = "build_process", timeout = Some(build_delay), tick_expire = build_expire) (progress, progress.agent_uuid) } catch { case exn: Throwable => close(); throw exn } } } protected val log: Logger = Logger.make_system_log(progress, build_options) protected val build_start: Date = build_context.build_start getOrElse progress.now() protected val build_id: Long = _build_database match { case None => 0L case Some(db) => Build_Process.init_build(db, build_context, build_start) } protected def open_build_cluster(): Build_Cluster = Build_Cluster.make(build_context, progress = build_progress).open() protected val _build_cluster: Build_Cluster = try { if (build_context.master && _build_database.isDefined) open_build_cluster() else Build_Cluster.none } catch { case exn: Throwable => close(); throw exn } def close(): Unit = synchronized { Option(_database_server).flatten.foreach(_.close()) Option(_heaps_database).flatten.foreach(_.close()) Option(_build_database).flatten.foreach(_.close()) Option(_host_database).foreach(_.close()) Option(_build_cluster).foreach(_.close()) progress match { case db_progress: Database_Progress => db_progress.close() case _ => } } /* global state: internal var vs. 
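/* Illustrative sketch (not part of the patch): the constructor-time resource
   pattern used above, where every `protected val _x = try ... catch { case exn:
   Throwable => close(); throw exn }` ensures that a failure while opening a
   later resource still closes the earlier ones, and close() tolerates fields
   that were never initialized (hence the Option(...) wrappers).  The classes in
   this demo are invented stand-ins for the database handles. */
object Resource_Init_Demo {
  final class Handle(val name: String) extends AutoCloseable {
    def close(): Unit = println("close " + name)
  }

  final class Process(fail_second: Boolean) extends AutoCloseable {
    val first: Handle =
      try new Handle("first") catch { case exn: Throwable => close(); throw exn }
    val second: Handle =
      try {
        if (fail_second) sys.error("cannot open second resource")
        new Handle("second")
      }
      catch { case exn: Throwable => close(); throw exn }

    def close(): Unit = {
      Option(first).foreach(_.close())   // still null if construction failed earlier
      Option(second).foreach(_.close())
    }
  }

  def main(args: Array[String]): Unit = {
    try new Process(fail_second = true)
    catch { case _: RuntimeException => println("construction failed, first was closed") }
  }
}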
external database */ protected var _state: Build_Process.State = Build_Process.State() protected def synchronized_database[A](label: String)(body: => A): A = synchronized { _build_database match { case None => body case Some(db) => Build_Process.private_data.transaction_lock(db, label = label) { val old_state = Build_Process.private_data.pull_state(db, build_id, worker_uuid, _state) _state = old_state val res = body _state = Build_Process.private_data.push_state( db, build_id, worker_uuid, _state, old_state) res } } } /* policy operations */ protected def next_jobs(state: Build_Process.State): List[String] = { val limit = { if (progress.stopped) { if (build_context.master) Int.MaxValue else 0 } else build_context.jobs - state.build_running.length } if (limit > 0) state.next_ready.sortBy(_.name)(state.sessions.ordering).take(limit).map(_.name) else Nil } protected def next_node_info(state: Build_Process.State, session_name: String): Host.Node_Info = { - def used_nodes: Set[Int] = + val available_nodes = build_context.numa_nodes + val used_nodes = Set.from(for (job <- state.running.valuesIterator; i <- job.node_info.numa_node) yield i) - val numa_node = Host.next_numa_node(_host_database, hostname, state.numa_nodes, used_nodes) + val numa_node = Host.next_numa_node(_host_database, hostname, available_nodes, used_nodes) Host.Node_Info(hostname, numa_node, Nil) } protected def start_session( state: Build_Process.State, session_name: String, ancestor_results: List[Build_Process.Result] ): Build_Process.State = { val sources_shasum = state.sessions(session_name).sources_shasum - - val input_shasum = - if (ancestor_results.isEmpty) ML_Process.bootstrap_shasum() - else SHA1.flat_shasum(ancestor_results.map(_.output_shasum)) - - val store_heap = - build_context.build_heap || Sessions.is_pure(session_name) || - state.sessions.iterator.exists(_.ancestors.contains(session_name)) - + val input_shasum = ML_Process.make_shasum(ancestor_results.map(_.output_shasum)) + val store_heap = build_context.store_heap || state.sessions.store_heap(session_name) val (current, output_shasum) = store.check_output(_database_server, session_name, session_options = build_context.sessions_structure(session_name).options, sources_shasum = sources_shasum, input_shasum = input_shasum, fresh_build = build_context.fresh_build, store_heap = store_heap) val finished = current && ancestor_results.forall(_.current) val skipped = build_context.no_build val cancelled = progress.stopped || !ancestor_results.forall(_.ok) if (!skipped && !cancelled) { for (db <- _database_server orElse _heaps_database) { val hierarchy = (session_name :: ancestor_results.map(_.name)) .map(store.output_session(_, store_heap = true)) ML_Heap.restore(db, hierarchy, cache = store.cache.compress) } } val result_name = (session_name, worker_uuid, build_uuid) val start = progress.now() if (finished) { state .remove_pending(session_name) .make_result(result_name, Process_Result.ok, output_shasum, start, current = true) } else if (skipped) { progress.echo("Skipping " + session_name + " ...", verbose = true) state. remove_pending(session_name). 
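/* Illustrative sketch (not part of the patch): the NUMA selection in
   next_node_info above.  Host.next_numa_node is defined elsewhere and not shown
   in this patch, so the function below is only a plausible stand-in (an
   assumption, not the real implementation): pick the available node with the
   fewest running jobs, or None when no NUMA nodes are configured. */
object Numa_Pick_Demo {
  // stand-in for Host.next_numa_node (hypothetical)
  def next_numa_node(available: List[Int], used: List[Int]): Option[Int] =
    if (available.isEmpty) None
    else {
      val load = used.groupBy(identity).view.mapValues(_.size).toMap
      Some(available.minBy(n => load.getOrElse(n, 0)))
    }

  def main(args: Array[String]): Unit = {
    val available = List(0, 1)
    val used = List(0, 0, 1)                  // NUMA nodes of currently running jobs
    println(next_numa_node(available, used))  // Some(1): node 1 is less loaded
  }
}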
make_result(result_name, Process_Result.error, output_shasum, start) } else if (cancelled) { progress.echo(session_name + " CANCELLED") state .remove_pending(session_name) .make_result(result_name, Process_Result.undefined, output_shasum, start) } else { val build_log_verbose = build_options.bool("build_log_verbose") val start_time = start - build_start val start_time_msg = build_log_verbose val node_info = next_node_info(state, session_name) val node_info_msg = node_info.numa || build_log_verbose progress.echo( (if (store_heap) "Building " else "Running ") + session_name + if_proper(start_time_msg || node_info_msg, " (" + if_proper(start_time_msg, "started " + start_time.message_hms) + if_proper(start_time_msg && node_info_msg, " ") + if_proper(node_info_msg, "on " + node_info.toString) + ")") + " ...") val session = state.sessions(session_name) val background = build_deps.background(session_name) val build = Build_Job.start_session(build_context, session, progress, log, server, background, sources_shasum, input_shasum, node_info, store_heap) state.add_running( Build_Process.Job(session_name, worker_uuid, build_uuid, node_info, start, Some(build))) } } /* build process roles */ final def is_session_name(job_name: String): Boolean = !Long_Name.is_qualified(job_name) protected final def stop_build(): Unit = synchronized_database("Build_Process.stop_build") { for (db <- _build_database) { Build_Process.private_data.stop_build(db, build_uuid) } } protected final def start_worker(): Unit = synchronized_database("Build_Process.start_worker") { for (db <- _build_database) { _state = _state.copy(serial = _state.next_serial) Build_Process.private_data.start_worker(db, worker_uuid, build_uuid, _state.serial) } } protected final def stop_worker(): Unit = synchronized_database("Build_Process.stop_worker") { for (db <- _build_database) { Build_Process.private_data.stamp_worker(db, worker_uuid, _state.serial, stop_now = true) } } /* prepare */ def prepare(): Unit = { for (name <- build_context.clean_sessions) { store.clean_output(_database_server orElse _heaps_database, name, progress = progress) } } /* run */ protected def finished(): Boolean = synchronized { if (!build_context.master && progress.stopped) _state.build_running.isEmpty else _state.pending.isEmpty } private var _build_tick: Long = 0L protected def build_action(): Boolean = Isabelle_Thread.interrupt_handler(_ => progress.stop()) { val received = build_receive(n => n.channel == Build_Process.private_data.channel) val ready = received.contains(Build_Process.private_data.channel_ready) val reactive = ready && synchronized { !_state.busy_running(build_context.jobs) } val finished = synchronized { _state.finished_running() } def sleep: Boolean = { build_delay.sleep() val expired = synchronized { _build_tick += 1; _build_tick % build_expire == 0 } expired || reactive || progress.stopped } finished || sleep } - protected def init_unsynchronized(): Unit = { + protected def init_unsynchronized(): Unit = if (build_context.master) { val sessions1 = _state.sessions.init(build_context, _database_server, progress = build_progress) val pending1 = sessions1.iterator.foldLeft(_state.pending) { case (map, session) => if (map.isDefinedAt(session.name)) map else map + Build_Process.Task.entry(session, build_context) } _state = _state.copy(sessions = sessions1, pending = pending1) } - val numa_nodes = Host.numa_nodes(enabled = build_context.numa_shuffling) - _state = _state.copy(numa_nodes = numa_nodes) - } - protected def main_unsynchronized(): Unit = { for 
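/* Illustrative sketch (not part of the patch): the wake-up policy of
   build_action above, reduced to a pure decision function.  The real code also
   sleeps for build_delay and listens on the PostgreSQL notification channel;
   here those effects are represented by plain booleans, and all names in this
   demo are invented. */
object Build_Action_Demo {
  final case class Tick(count: Long, expire: Int) {
    def next: Tick = copy(count = count + 1)
    def expired: Boolean = count % expire == 0
  }

  // decide whether the main loop should run another synchronized round
  def wake_up(
    finished_running: Boolean,   // some Build_Job just terminated
    ready_notification: Boolean, // "ready" arrived on the build channel
    busy: Boolean,               // all job slots are taken
    stopped: Boolean,            // progress.stopped
    tick: Tick
  ): Boolean =
    finished_running || (ready_notification && !busy) || tick.expired || stopped

  def main(args: Array[String]): Unit = {
    val tick = Tick(count = 1, expire = 4).next   // count = 2, not yet expired
    println(wake_up(finished_running = false, ready_notification = true,
      busy = false, stopped = false, tick = tick))  // true: reactive wake-up
  }
}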
(job <- _state.running.valuesIterator; build <- job.build if build.is_finished) { val result = build.join val result_name = (job.name, worker_uuid, build_uuid) _state = _state. remove_pending(job.name). remove_running(job.name). make_result(result_name, result.process_result, result.output_shasum, job.start_date, node_info = job.node_info) } for (name <- next_jobs(_state)) { if (is_session_name(name)) { if (build_context.sessions_structure.defined(name)) { _state.ancestor_results(name) match { case Some(results) => _state = start_session(_state, name, results) case None => warning("Bad build job " + quote(name) + ": no ancestor results") } } else warning("Bad build job " + quote(name) + ": no session info") } else warning("Bad build job " + quote(name)) } } def run(): Build.Results = { val vacuous = synchronized_database("Build_Process.init") { _build_cluster.init() init_unsynchronized() build_context.master && _state.pending.isEmpty } if (vacuous) { progress.echo_warning("Nothing to build") if (build_context.master) stop_build() Build.Results(build_context) } else { start_worker() _build_cluster.start() try { while (!finished()) { synchronized_database("Build_Process.main") { if (progress.stopped) _state.build_running.foreach(_.cancel()) main_unsynchronized() if (build_context.master && _state.exists_ready) { build_send(Build_Process.private_data.channel_ready) } } while(!build_action()) {} } } finally { _build_cluster.stop() stop_worker() if (build_context.master) stop_build() } synchronized_database("Build_Process.result") { val results = for ((name, result) <- _state.results) yield name -> result.process_result Build.Results(build_context, results = results, other_rc = _build_cluster.rc) } } } /* snapshot */ def snapshot(): Build_Process.Snapshot = synchronized_database("Build_Process.snapshot") { val (builds, workers) = _build_database match { case None => (Nil, Nil) case Some(db) => (Build_Process.private_data.read_builds(db), Build_Process.private_data.read_workers(db)) } Build_Process.Snapshot( builds = builds, workers = workers, sessions = _state.sessions, pending = _state.pending, running = _state.running, results = _state.results) } /* toString */ override def toString: String = "Build_Process(worker_uuid = " + quote(worker_uuid) + ", build_uuid = " + quote(build_uuid) + if_proper(build_context.master, ", master = true") + ")" } diff --git a/src/Pure/Build/build_schedule.scala b/src/Pure/Build/build_schedule.scala --- a/src/Pure/Build/build_schedule.scala +++ b/src/Pure/Build/build_schedule.scala @@ -1,1778 +1,1769 @@ /* Title: Pure/Build/build_schedule.scala Author: Fabian Huch, TU Muenchen Build schedule generated by Heuristic methods, allowing for more efficient builds. 
*/ package isabelle import Host.Node_Info import scala.annotation.tailrec import scala.collection.mutable import scala.Ordering.Implicits.seqOrdering object Build_Schedule { /* organized historic timing information (extracted from build logs) */ case class Result(job_name: String, hostname: String, threads: Int, timing: Timing) { def elapsed: Time = timing.elapsed def proper_cpu: Option[Time] = timing.cpu.proper_ms.map(Time.ms) } object Timing_Data { def median_timing(obs: List[Timing]): Timing = obs.sortBy(_.elapsed.ms).apply(obs.length / 2) def median_time(obs: List[Time]): Time = obs.sortBy(_.ms).apply(obs.length / 2) def mean_time(obs: Iterable[Time]): Time = Time.ms(obs.map(_.ms).sum / obs.size) private def dummy_entries(host: Host, host_factor: Double) = { val baseline = Time.minutes(5).scale(host_factor) val gc = Time.seconds(10).scale(host_factor) List( Result("dummy", host.name, 1, Timing(baseline, baseline, gc)), Result("dummy", host.name, 8, Timing(baseline.scale(0.2), baseline, gc))) } def make( host_infos: Host_Infos, build_history: List[(Build_Log.Meta_Info, Build_Log.Build_Info)], session_structure: Sessions.Structure, ): Timing_Data = { val hosts = host_infos.hosts val measurements = for { (meta_info, build_info) <- build_history build_host = meta_info.get_build_host (job_name, session_info) <- build_info.sessions.toList if build_info.finished_sessions.contains(job_name) hostname <- session_info.hostname.orElse(build_host).toList host <- hosts.find(_.name == hostname).toList threads = session_info.threads.getOrElse(host.max_threads) } yield (job_name, hostname, threads) -> session_info.timing val entries = if (measurements.isEmpty) { val default_host = host_infos.hosts.sorted(host_infos.host_speeds).last host_infos.hosts.flatMap(host => dummy_entries(host, host_infos.host_factor(default_host, host))) } else measurements.groupMap(_._1)(_._2).toList.map { case ((job_name, hostname, threads), timings) => Result(job_name, hostname, threads, median_timing(timings)) } new Timing_Data(new Facet(entries), host_infos, session_structure) } def load( host_infos: Host_Infos, log_database: SQL.Database, sessions_structure: Sessions.Structure ): Timing_Data = { val build_history = for { log_name <- log_database.execute_query_statement( Build_Log.private_data.meta_info_table.select(List(Build_Log.Column.log_name)), List.from[String], res => res.string(Build_Log.Column.log_name)) meta_info <- Build_Log.private_data.read_meta_info(log_database, log_name) build_info = Build_Log.private_data.read_build_info(log_database, log_name) } yield (meta_info, build_info) make(host_infos, build_history, sessions_structure) } /* data facets */ object Facet { def unapply(facet: Facet): Option[List[Result]] = Some(facet.results) } class Facet private[Timing_Data](val results: List[Result]) { require(results.nonEmpty) def is_empty: Boolean = results.isEmpty def size: Int = results.length lazy val by_job: Map[String, Facet] = results.groupBy(_.job_name).view.mapValues(new Facet(_)).toMap lazy val by_threads: Map[Int, Facet] = results.groupBy(_.threads).view.mapValues(new Facet(_)).toMap lazy val by_hostname: Map[String, Facet] = results.groupBy(_.hostname).view.mapValues(new Facet(_)).toMap def median_time: Time = Timing_Data.median_time(results.map(_.elapsed)) def best_result: Result = results.minBy(_.elapsed.ms) } } class Timing_Data private( facet: Timing_Data.Facet, val host_infos: Host_Infos, val sessions_structure: Sessions.Structure ) { private def inflection_point(last_mono: Int, next: Int): Int = 
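/* Illustrative sketch (not part of the patch): the elementary statistics used
   by Timing_Data above.  Like median_time/mean_time, the median is the element
   at position length/2 of the sorted list (the upper median for even sizes) and
   the mean is integer division over milliseconds.  The values are invented. */
object Timing_Stats_Demo {
  def median_ms(obs: List[Long]): Long = obs.sorted.apply(obs.length / 2)
  def mean_ms(obs: List[Long]): Long = obs.sum / obs.size

  def main(args: Array[String]): Unit = {
    val elapsed = List(120000L, 95000L, 640000L, 110000L)  // four runs, in ms
    println(median_ms(elapsed))  // 120000: robust against the 640s outlier
    println(mean_ms(elapsed))    // 241250: dragged up by the outlier
  }
}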
last_mono + ((next - last_mono) / 2) def best_threads(job_name: String, max_threads: Int): Int = { val worse_threads = facet.by_job.get(job_name).toList.flatMap(_.by_hostname).flatMap { case (hostname, facet) => val best_threads = facet.best_result.threads facet.by_threads.keys.toList.sorted.find(_ > best_threads).map( inflection_point(best_threads, _)) } (max_threads :: worse_threads).min } private def hostname_factor(from: String, to: String): Double = host_infos.host_factor(host_infos.the_host(from), host_infos.the_host(to)) private def approximate_threads(entries_unsorted: List[(Int, Time)], threads: Int): Time = { val entries = entries_unsorted.sortBy(_._1) def sorted_prefix[A](xs: List[A], f: A => Long): List[A] = xs match { case x1 :: x2 :: xs => if (f(x1) <= f(x2)) x1 :: sorted_prefix(x2 :: xs, f) else x1 :: Nil case xs => xs } def linear(p0: (Int, Time), p1: (Int, Time)): Time = { val a = (p1._2 - p0._2).scale(1.0 / (p1._1 - p0._1)) val b = p0._2 - a.scale(p0._1) (a.scale(threads) + b) max Time.zero } val mono_prefix = sorted_prefix(entries, e => -e._2.ms) val is_mono = entries == mono_prefix val in_prefix = mono_prefix.length > 1 && threads <= mono_prefix.last._1 val in_inflection = !is_mono && mono_prefix.length > 1 && threads < entries.drop(mono_prefix.length).head._1 if (is_mono || in_prefix || in_inflection) { // Model with Amdahl's law val t_p = Timing_Data.median_time(for { (n, t0) <- mono_prefix (m, t1) <- mono_prefix if m != n } yield (t0 - t1).scale(n.toDouble * m / (m - n))) val t_c = Timing_Data.median_time(for ((n, t) <- mono_prefix) yield t - t_p.scale(1.0 / n)) def model(threads: Int): Time = (t_c + t_p.scale(1.0 / threads)) max Time.zero if (is_mono || in_prefix) model(threads) else { val post_inflection = entries.drop(mono_prefix.length).head val inflection_threads = inflection_point(mono_prefix.last._1, post_inflection._1) if (threads <= inflection_threads) model(threads) else linear((inflection_threads, model(inflection_threads)), post_inflection) } } else { // Piecewise linear val (p0, p1) = if (entries.head._1 < threads && threads < entries.last._1) { val split = entries.partition(_._1 < threads) (split._1.last, split._2.head) } else { val piece = if (threads < entries.head._1) entries.take(2) else entries.takeRight(2) (piece.head, piece.last) } linear(p0, p1) } } private def unify_hosts(job_name: String, on_host: String): List[(Int, Time)] = { def unify(hostname: String, facet: Timing_Data.Facet) = facet.median_time.scale(hostname_factor(hostname, on_host)) for { facet <- facet.by_job.get(job_name).toList (threads, facet) <- facet.by_threads entries = facet.by_hostname.toList.map(unify) } yield threads -> Timing_Data.mean_time(entries) } def estimate_threads(job_name: String, hostname: String, threads: Int): Option[Time] = { def try_approximate(facet: Timing_Data.Facet): Option[Time] = { val entries = facet.by_threads.toList match { case List((i, Timing_Data.Facet(List(result)))) if i != 1 => (i, facet.median_time) :: result.proper_cpu.map(1 -> _).toList case entries => entries.map((threads, facet) => threads -> facet.median_time) } if (entries.size < 2) None else Some(approximate_threads(entries, threads)) } for { facet <- facet.by_job.get(job_name) facet <- facet.by_hostname.get(hostname) time <- facet.by_threads.get(threads).map(_.median_time).orElse(try_approximate(facet)) } yield time } def global_threads_factor(from: Int, to: Int): Double = { def median(xs: Iterable[Double]): Double = xs.toList.sorted.apply(xs.size / 2) val estimates = for { (hostname, 
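/* Illustrative sketch (not part of the patch): the Amdahl-style model behind
   approximate_threads above.  For observations t(n) = t_c + t_p/n, every pair
   of data points yields t_p = (t(n) - t(m)) * n*m/(m - n); the constant part is
   then t_c = t(n) - t_p/n, and both are taken as medians to damp noise.  The
   demo uses Double seconds instead of Time and synthetic data. */
object Amdahl_Demo {
  def median(xs: List[Double]): Double = xs.sorted.apply(xs.length / 2)

  // entries: (threads, elapsed seconds)
  def model(entries: List[(Int, Double)]): Int => Double = {
    val t_p = median(
      for { (n, t0) <- entries; (m, t1) <- entries if m != n }
        yield (t0 - t1) * (n.toDouble * m / (m - n)))
    val t_c = median(for ((n, t) <- entries) yield t - t_p / n)
    threads => (t_c + t_p / threads) max 0.0
  }

  def main(args: Array[String]): Unit = {
    // synthetic data generated from t_c = 100s, t_p = 600s
    val observed = List(1 -> 700.0, 2 -> 400.0, 4 -> 250.0)
    val estimate = model(observed)
    println(estimate(8))   // 175.0: extrapolated run time with 8 threads
  }
}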
facet) <- facet.by_hostname job_name <- facet.by_job.keys from_time <- estimate_threads(job_name, hostname, from) to_time <- estimate_threads(job_name, hostname, to) } yield from_time.ms.toDouble / to_time.ms if (estimates.nonEmpty) median(estimates) else { // unify hosts val estimates = for { (job_name, facet) <- facet.by_job hostname = facet.by_hostname.keys.head entries = unify_hosts(job_name, hostname) if entries.length > 1 } yield approximate_threads(entries, from).ms.toDouble / approximate_threads(entries, to).ms if (estimates.nonEmpty) median(estimates) else from.toDouble / to.toDouble } } private var cache: Map[(String, String, Int), Time] = Map.empty /* approximation factors -- penalize estimations with less information */ val FACTOR_NO_THREADS_GLOBAL_CURVE = 2.5 val FACTOR_NO_THREADS_UNIFY_MACHINES = 1.7 val FACTOR_NO_THREADS_OTHER_MACHINE = 1.5 val FACTOR_NO_THREADS_SAME_MACHINE = 1.4 val FACTOR_THREADS_OTHER_MACHINE = 1.2 def estimate(job_name: String, hostname: String, threads: Int): Time = { def estimate: Time = facet.by_job.get(job_name) match { case None => // no data for job, use timeout as estimation for single-threaded job on worst host val default_time = sessions_structure.get(job_name).map(_.timeout).getOrElse(Time.zero) if (default_time > Time.zero) { val default_host = host_infos.hosts.sorted(host_infos.host_speeds).head default_time .scale(global_threads_factor(1, threads)) .scale(hostname_factor(default_host.name, hostname)) } else { // no timeout, take average of other jobs for given threads val job_estimates = facet.by_job.keys.flatMap(estimate_threads(_, hostname, threads)) if (job_estimates.nonEmpty) Timing_Data.mean_time(job_estimates) else { // no other job to estimate from, use global curve to approximate any other job val (threads1, facet1) = facet.by_threads.head facet1.median_time.scale(global_threads_factor(threads1, threads)) } } case Some(facet) => facet.by_threads.get(threads) match { case None => // interpolate threads estimate_threads(job_name, hostname, threads).map(_.scale( FACTOR_NO_THREADS_SAME_MACHINE)).getOrElse { // per machine, try to approximate config for threads val approximated = for { hostname1 <- facet.by_hostname.keys estimate <- estimate_threads(job_name, hostname1, threads) factor = hostname_factor(hostname1, hostname) } yield estimate.scale(factor) if (approximated.nonEmpty) Timing_Data.mean_time(approximated).scale(FACTOR_NO_THREADS_OTHER_MACHINE) else { // no single machine where config can be approximated, unify data points val unified_entries = unify_hosts(job_name, hostname) if (unified_entries.length > 1) approximate_threads(unified_entries, threads).scale( FACTOR_NO_THREADS_UNIFY_MACHINES) else { // only single data point, use global curve to approximate val (job_threads, job_time) = unified_entries.head job_time.scale(global_threads_factor(job_threads, threads)).scale( FACTOR_NO_THREADS_GLOBAL_CURVE) } } } case Some(facet) => // time for job/thread exists, interpolate machine if necessary facet.by_hostname.get(hostname).map(_.median_time).getOrElse { Timing_Data.mean_time( facet.by_hostname.toList.map((hostname1, facet) => facet.median_time.scale(hostname_factor(hostname1, hostname)))).scale( FACTOR_THREADS_OTHER_MACHINE) } } } cache.get(job_name, hostname, threads) match { case Some(time) => time case None => val time = estimate cache = cache + ((job_name, hostname, threads) -> time) time } } } /* host information */ object Host { def load(options: Options, build_host: Build_Cluster.Host, host_db: SQL.Database): Host = {
val name = build_host.name val info = isabelle.Host.read_info(host_db, name).getOrElse(error("No info for host " + quote(name))) val max_threads = (options ++ build_host.options).threads(default = info.num_cpus) val score = info.benchmark_score.getOrElse(error("No benchmark for " + quote(name))) Host( name = name, num_cpus = info.num_cpus, max_jobs = build_host.jobs, max_threads = max_threads, numa = build_host.numa, numa_nodes = info.numa_nodes, benchmark_score = score, options = build_host.options) } } case class Host( name: String, num_cpus: Int, max_jobs: Int, max_threads: Int, benchmark_score: Double, numa: Boolean = false, numa_nodes: List[Int] = Nil, options: List[Options.Spec] = Nil) object Host_Infos { def load( options: Options, build_hosts: List[Build_Cluster.Host], host_db: SQL.Database ): Host_Infos = new Host_Infos(build_hosts.map(Host.load(options, _, host_db))) } class Host_Infos private(val hosts: List[Host]) { require(hosts.nonEmpty) private val by_hostname = hosts.map(host => host.name -> host).toMap def host_factor(from: Host, to: Host): Double = from.benchmark_score / to.benchmark_score val host_speeds: Ordering[Host] = Ordering.fromLessThan((host1, host2) => host_factor(host1, host2) < 1) def the_host(hostname: String): Host = by_hostname.getOrElse(hostname, error("Unknown host " + quote(hostname))) def the_host(node_info: Node_Info): Host = the_host(node_info.hostname) def num_threads(node_info: Node_Info): Int = if (node_info.rel_cpus.nonEmpty) node_info.rel_cpus.length else the_host(node_info).max_threads def available(state: Build_Process.State): Resources = { val allocated = state.running.values.map(_.node_info).groupMapReduce(_.hostname)(List(_))(_ ::: _) new Resources(this, allocated) } } /* offline tracking of job configurations and resource allocations */ case class Config(job_name: String, node_info: Node_Info) { def job_of(start_time: Time): Build_Process.Job = Build_Process.Job(job_name, "", "", node_info, Date(start_time), None) } class Resources( val host_infos: Host_Infos, allocated_nodes: Map[String, List[Node_Info]] ) { def unused_nodes(host: Host, threads: Int): List[Node_Info] = if (!available(host, threads)) Nil else { val node = next_node(host, threads) node :: allocate(node).unused_nodes(host, threads) } def unused_nodes(threads: Int): List[Node_Info] = host_infos.hosts.flatMap(unused_nodes(_, threads)) def allocated(host: Host): List[Node_Info] = allocated_nodes.getOrElse(host.name, Nil) def allocate(node_info: Node_Info): Resources = { val host = host_infos.the_host(node_info) new Resources(host_infos, allocated_nodes + (host.name -> (node_info :: allocated(host)))) } def try_allocate_tasks( hosts: List[(Host, Int)], tasks: List[(Build_Process.Task, Int, Int)], ): (List[Config], Resources) = tasks match { case Nil => (Nil, this) case (task, min_threads, max_threads) :: tasks => val (config, resources) = hosts.find((host, _) => available(host, min_threads)) match { case Some((host, host_max_threads)) => val free_threads = host.max_threads - ((host.max_jobs - 1) * host_max_threads) val node_info = next_node(host, (min_threads max free_threads) min max_threads) (Some(Config(task.name, node_info)), allocate(node_info)) case None => (None, this) } val (configs, resources1) = resources.try_allocate_tasks(hosts, tasks) (configs ++ config, resources1) } def next_node(host: Host, threads: Int): Node_Info = { val numa_node_num_cpus = host.num_cpus / (host.numa_nodes.length max 1) def explicit_cpus(node_info: Node_Info): List[Int] = if 
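/* Illustrative sketch (not part of the patch): the greedy allocation idea of
   Resources.unused_nodes/try_allocate_tasks above, reduced to hosts with a
   plain max_jobs capacity.  The real code additionally tracks threads, NUMA
   nodes and relative CPUs per node; the names in this demo are invented. */
object Greedy_Allocation_Demo {
  final case class Host(name: String, max_jobs: Int)

  // assign each task to the first host that still has a free job slot
  def allocate(
    hosts: List[Host],
    tasks: List[String],
    used: Map[String, Int] = Map.empty
  ): List[(String, String)] =
    tasks match {
      case Nil => Nil
      case task :: rest =>
        hosts.find(h => used.getOrElse(h.name, 0) < h.max_jobs) match {
          case None => Nil  // no capacity left, remaining tasks stay pending
          case Some(host) =>
            (task -> host.name) ::
              allocate(hosts, rest, used + (host.name -> (used.getOrElse(host.name, 0) + 1)))
        }
    }

  def main(args: Array[String]): Unit = {
    val hosts = List(Host("fast", max_jobs = 2), Host("slow", max_jobs = 1))
    println(allocate(hosts, List("HOL", "HOL-Library", "HOL-Analysis", "ZF")))
    // List((HOL,fast), (HOL-Library,fast), (HOL-Analysis,slow)); ZF stays pending
  }
}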
(node_info.rel_cpus.nonEmpty) node_info.rel_cpus else (0 until numa_node_num_cpus).toList val used_nodes = allocated(host).groupMapReduce(_.numa_node)(explicit_cpus)(_ ::: _) val available_nodes = host.numa_nodes val numa_node = if (!host.numa) None else available_nodes.sortBy(n => used_nodes.getOrElse(Some(n), Nil).length).headOption val used_cpus = used_nodes.getOrElse(numa_node, Nil).toSet val available_cpus = (0 until numa_node_num_cpus).filterNot(used_cpus.contains).toList val rel_cpus = if (available_cpus.length >= threads) available_cpus.take(threads) else Nil Node_Info(host.name, numa_node, rel_cpus) } def available(host: Host, threads: Int): Boolean = { val used = allocated(host) if (used.length >= host.max_jobs) false else { - if (host.numa_nodes.length <= 1) + if (host.numa_nodes.length <= 1) { used.map(host_infos.num_threads).sum + threads <= host.max_threads + } else { def node_threads(n: Int): Int = used.filter(_.numa_node.contains(n)).map(host_infos.num_threads).sum host.numa_nodes.exists( node_threads(_) + threads <= host.num_cpus / host.numa_nodes.length) } } } } /* schedule generation */ object Schedule { case class Node(job_name: String, node_info: Node_Info, start: Date, duration: Time) { def end: Date = Date(start.time + duration) } type Graph = isabelle.Graph[String, Node] def init(build_uuid: String): Schedule = Schedule(build_uuid, "none", Date.now(), Graph.empty) /* file representation */ def write(value: Schedule, file: Path): Unit = { import XML.Encode._ def time: T[Time] = (time => long(time.ms)) def date: T[Date] = (date => time(date.time)) def node_info: T[Node_Info] = (node_info => triple(string, option(int), list(int))( (node_info.hostname, node_info.numa_node, node_info.rel_cpus))) def node: T[Node] = (node => pair(string, pair(node_info, pair(date, time)))( (node.job_name, (node.node_info, (node.start, node.duration))))) def schedule: T[Schedule] = (schedule => pair(string, pair(string, pair(date, pair(Graph.encode(string, node), long))))(( schedule.build_uuid, (schedule.generator, (schedule.start, (schedule.graph, schedule.serial)))))) File.write(file, YXML.string_of_body(schedule(value))) } def read(file: Path): Schedule = { import XML.Decode._ def time: T[Time] = { body => Time.ms(long(body)) } def date: T[Date] = { body => Date(time(body)) } def node_info: T[Node_Info] = { body => val (hostname, numa_node, rel_cpus) = triple(string, option(int), list(int))(body) Node_Info(hostname, numa_node, rel_cpus) } val node: T[Schedule.Node] = { body => val (job_name, (info, (start, duration))) = pair(string, pair(node_info, pair(date, time)))(body) Node(job_name, info, start, duration) } def schedule: T[Schedule] = { body => val (build_uuid, (generator, (start, (graph, serial)))) = pair(string, pair(string, (pair(date, pair(Graph.decode(string, node), long)))))(body) Schedule(build_uuid, generator, start, graph, serial) } schedule(YXML.parse_body(File.read(file))) } } case class Schedule( build_uuid: String, generator: String, start: Date, graph: Schedule.Graph, serial: Long = 0, ) { def next_serial: Long = Build_Process.State.inc_serial(serial) def end: Date = if (graph.is_empty) start else graph.maximals.map(graph.get_node).map(_.end).max(Date.Ordering) def duration: Time = end - start def durations: List[Time] = graph.keys.map(graph.get_node(_).end - start) def message: String = "Estimated " + duration.message_hms + " build time with " + generator def deviation(other: Schedule): Time = Time.ms((end - other.end).ms.abs) def num_built(state: 
Build_Process.State): Int = graph.keys.count(state.results.contains) def elapsed(): Time = Time.now() - start.time def is_empty: Boolean = graph.is_empty def is_outdated(options: Options, state: Build_Process.State): Boolean = if (is_empty) true else elapsed() > options.seconds("build_schedule_outdated_delay") def next(hostname: String, state: Build_Process.State): List[String] = { val now = Time.now() val next_nodes = for { task <- state.next_ready if graph.defined(task.name) node = graph.get_node(task.name) if hostname == node.node_info.hostname } yield node val (ready, other) = next_nodes.partition(node => graph.imm_preds(node.job_name).subsetOf(state.results.keySet)) val waiting = other.filter(_.start.time <= now) val running = state.running.values.toList.map(_.node_info).filter(_.hostname == hostname) def try_run(ready: List[Schedule.Node], next: Schedule.Node): List[Schedule.Node] = { val existing = ready.map(_.node_info) ::: running val is_distinct = existing.forall(_.rel_cpus.intersect(next.node_info.rel_cpus).isEmpty) if (existing.forall(_.rel_cpus.nonEmpty) && is_distinct) next :: ready else ready } waiting.foldLeft(ready)(try_run).map(_.job_name) } def exists_next(hostname: String, state: Build_Process.State): Boolean = next(hostname, state).nonEmpty def update(state: Build_Process.State): Schedule = { val start1 = Date.now() def shift_elapsed(graph: Schedule.Graph, name: String): Schedule.Graph = graph.map_node(name, { node => val elapsed = start1 - state.running(name).start_date node.copy(duration = (node.duration - elapsed).max(Time.zero)) }) def shift_starts(graph: Schedule.Graph, name: String): Schedule.Graph = graph.map_node(name, { node => val starts = start1 :: graph.imm_preds(node.job_name).toList.map(graph.get_node(_).end) node.copy(start = starts.max(Date.Ordering)) }) val graph0 = state.running.keys.foldLeft(graph.restrict(state.pending.isDefinedAt))(shift_elapsed) val graph1 = graph0.topological_order.foldLeft(graph0)(shift_starts) copy(start = start1, graph = graph1) } } case class State(build_state: Build_Process.State, current_time: Time, finished: Schedule) { def start(config: Config): State = copy(build_state = build_state.copy(running = build_state.running + (config.job_name -> config.job_of(current_time)))) def step(timing_data: Timing_Data): State = { val remaining = build_state.running.values.toList.map { job => val elapsed = current_time - job.start_date.time val threads = timing_data.host_infos.num_threads(job.node_info) val predicted = timing_data.estimate(job.name, job.node_info.hostname, threads) val remaining = if (elapsed > predicted) Time.zero else predicted - elapsed job -> remaining } if (remaining.isEmpty) error("Schedule step without running sessions") else { val (job, elapsed) = remaining.minBy(_._2.ms) val now = current_time + elapsed val node = Schedule.Node(job.name, job.node_info, job.start_date, now - job.start_date.time) val host_preds = for { name <- finished.graph.keys pred_node = finished.graph.get_node(name) if pred_node.node_info.hostname == job.node_info.hostname if pred_node.end.time <= node.start.time } yield name val build_preds = build_state.sessions.graph.imm_preds(job.name).filter(finished.graph.defined) val preds = build_preds ++ host_preds val graph = preds.foldLeft(finished.graph.new_node(job.name, node))(_.add_edge(_, job.name)) val build_state1 = build_state.remove_running(job.name).remove_pending(job.name) State(build_state1, now, finished.copy(graph = graph)) } } def is_finished: Boolean = build_state.pending.isEmpty && 
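/* Illustrative sketch (not part of the patch): the discrete-event step of
   Build_Schedule.State.step above.  The simulation always advances the clock to
   the completion of the running job with the smallest predicted remaining time,
   records it as finished, and repeats; the real step additionally starts
   successor jobs and records host/dependency edges.  Names here are invented. */
object Simulation_Demo {
  final case class Running(name: String, remaining_ms: Long)

  @annotation.tailrec
  def simulate(
    now: Long,
    running: List[Running],
    finished: List[(String, Long)] = Nil
  ): List[(String, Long)] =
    running match {
      case Nil => finished.reverse
      case _ =>
        val next = running.minBy(_.remaining_ms)
        val now1 = now + next.remaining_ms
        val running1 =
          running.filterNot(_.name == next.name)
            .map(job => job.copy(remaining_ms = job.remaining_ms - next.remaining_ms))
        simulate(now1, running1, (next.name -> now1) :: finished)
    }

  def main(args: Array[String]): Unit = {
    val jobs = List(Running("HOL", 300000L), Running("ZF", 120000L))
    println(simulate(now = 0L, running = jobs))
    // List((ZF,120000), (HOL,300000)): ZF ends first, HOL ends at its full time
  }
}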
build_state.running.isEmpty } trait Scheduler { def schedule(build_state: Build_Process.State): Schedule } trait Priority_Rule { def select_next(state: Build_Process.State): List[Config] } case class Generation_Scheme( priority_rule: Priority_Rule, timing_data: Timing_Data, build_uuid: String ) extends Scheduler { def schedule(build_state: Build_Process.State): Schedule = { @tailrec def simulate(state: State): State = if (state.is_finished) state else { val state1 = priority_rule .select_next(state.build_state) .foldLeft(state)(_.start(_)) .step(timing_data) simulate(state1) } val start = Date.now() val name = "generation scheme (" + priority_rule + ")" val end_state = simulate(State(build_state, start.time, Schedule(build_uuid, name, start, Graph.empty))) end_state.finished } } case class Optimizer(schedulers: List[Scheduler], schedules: List[Schedule]) extends Scheduler { require(schedulers.nonEmpty) def schedule(state: Build_Process.State): Schedule = { def main(scheduler: Scheduler): Schedule = scheduler.schedule(state) (Par_List.map(main, schedulers) ::: schedules.map(_.update(state))).minBy(schedule => schedule.durations.map(_.ms).sorted.reverse) } } /* priority rules */ class Default_Heuristic(host_infos: Host_Infos) extends Priority_Rule { override def toString: String = "default heuristic" def next_jobs(resources: Resources, sorted_jobs: List[String], host: Host): List[Config] = sorted_jobs.zip(resources.unused_nodes(host, host.max_threads)).map(Config(_, _)) def select_next(state: Build_Process.State): List[Config] = { val sorted_jobs = state.next_ready.sortBy(_.name)(state.sessions.ordering).map(_.name) val resources = host_infos.available(state) host_infos.hosts.foldLeft((sorted_jobs, List.empty[Config])) { case ((jobs, res), host) => val configs = next_jobs(resources, jobs, host) val config_jobs = configs.map(_.job_name).toSet (jobs.filterNot(config_jobs.contains), configs ::: res) }._2 } } object Path_Time_Heuristic { sealed trait Critical_Criterion case class Absolute_Time(time: Time) extends Critical_Criterion { override def toString: String = "absolute time (" + time.message_hms + ")" } case class Relative_Time(factor: Double) extends Critical_Criterion { override def toString: String = "relative time (" + factor + ")" } sealed trait Parallel_Strategy case class Fixed_Thread(threads: Int) extends Parallel_Strategy { override def toString: String = "fixed threads (" + threads + ")" } case class Time_Based_Threads(f: Time => Int) extends Parallel_Strategy { override def toString: String = "time based threads" } sealed trait Host_Criterion case object Critical_Nodes extends Host_Criterion { override def toString: String = "per critical node" } case class Fixed_Fraction(fraction: Double) extends Host_Criterion { override def toString: String = "fixed fraction (" + fraction + ")" } case class Host_Speed(min_factor: Double) extends Host_Criterion { override def toString: String = "host speed (" + min_factor + ")" } } class Path_Time_Heuristic( is_critical: Path_Time_Heuristic.Critical_Criterion, parallel_threads: Path_Time_Heuristic.Parallel_Strategy, host_criterion: Path_Time_Heuristic.Host_Criterion, timing_data: Timing_Data, sessions_structure: Sessions.Structure, max_threads_limit: Int = 8 ) extends Priority_Rule { import Path_Time_Heuristic.* override def toString: Node = { val params = List( "critical: " + is_critical, "parallel: " + parallel_threads, "fast hosts: " + host_criterion) "path time heuristic (" + params.mkString(", ") + ")" } /* pre-computed properties for 
efficient heuristic */ val host_infos: Host_Infos = timing_data.host_infos val ordered_hosts: List[Host] = host_infos.hosts.sorted(host_infos.host_speeds) val max_threads: Int = host_infos.hosts.map(_.max_threads).max min max_threads_limit type Node = String val build_graph: Graph[Node, Sessions.Info] = sessions_structure.build_graph val minimals: List[Node] = build_graph.minimals val maximals: List[Node] = build_graph.maximals val best_threads: Map[Node, Int] = build_graph.keys.map(node => node -> timing_data.best_threads(node, max_threads)).toMap def best_time(node: Node): Time = { val host = ordered_hosts.last val threads = best_threads(node) min host.max_threads timing_data.estimate(node, host.name, threads) } val best_times: Map[Node, Time] = build_graph.keys.map(node => node -> best_time(node)).toMap val succs_max_time_ms: Map[Node, Long] = build_graph.node_height(best_times(_).ms) def max_time(node: Node): Time = Time.ms(succs_max_time_ms(node)) + best_times(node) def max_time(task: Build_Process.Task): Time = max_time(task.name) def path_times(minimals: List[Node]): Map[Node, Time] = { def time_ms(node: Node): Long = best_times(node).ms val path_times_ms = build_graph.reachable_length(time_ms, build_graph.imm_succs, minimals) path_times_ms.view.mapValues(Time.ms).toMap } def path_max_times(minimals: List[Node]): Map[Node, Time] = path_times(minimals).toList.map((node, time) => node -> (time + max_time(node))).toMap val node_degrees: Map[Node, Int] = build_graph.keys.map(node => node -> build_graph.imm_succs(node).size).toMap def parallel_paths( running: List[(Node, Time)], nodes: Set[Node] = build_graph.keys.toSet, max: Int = Int.MaxValue ): Int = if (nodes.nonEmpty && nodes.map(node_degrees.apply).max > max) max else { def start(node: Node): (Node, Time) = node -> best_times(node) def pass_time(elapsed: Time)(node: Node, time: Time): (Node, Time) = node -> (time - elapsed) def parallel_paths(running: Map[Node, Time]): (Int, Map[Node, Time]) = if (running.size >= max) (max, running) else if (running.isEmpty) (0, running) else { def get_next(node: Node): List[Node] = build_graph.imm_succs(node).intersect(nodes).filter( build_graph.imm_preds(_).intersect(running.keySet) == Set(node)).toList val (next, elapsed) = running.minBy(_._2.ms) val (remaining, finished) = running.toList.map(pass_time(elapsed)).partition(_._2 > Time.zero) val running1 = remaining.map(pass_time(elapsed)).toMap ++ finished.map(_._1).flatMap(get_next).map(start) val (res, running2) = parallel_paths(running1) (res max running.size, running2) } parallel_paths(running.toMap)._1 } def select_next(state: Build_Process.State): List[Config] = { val resources = host_infos.available(state) def best_threads(task: Build_Process.Task): Int = this.best_threads(task.name) val rev_ordered_hosts = ordered_hosts.reverse.map(_ -> max_threads) val available_nodes = host_infos.available(state.copy(running = Map.empty)) .unused_nodes(max_threads) .sortBy(node => host_infos.the_host(node))(host_infos.host_speeds).reverse def remaining_time(node: Node): (Node, Time) = state.running.get(node) match { case None => node -> best_times(node) case Some(job) => val estimate = timing_data.estimate(job.name, job.node_info.hostname, host_infos.num_threads(job.node_info)) node -> ((Time.now() - job.start_date.time + estimate) max Time.zero) } val next_sorted = state.next_ready.sortBy(max_time(_).ms).reverse val is_parallelizable = available_nodes.length >= parallel_paths( state.ready.map(_.name).map(remaining_time), max = available_nodes.length + 
1) if (is_parallelizable) { val all_tasks = next_sorted.map(task => (task, best_threads(task), best_threads(task))) resources.try_allocate_tasks(rev_ordered_hosts, all_tasks)._1 } else { def is_critical(time: Time): Boolean = this.is_critical match { case Absolute_Time(threshold) => time > threshold case Relative_Time(factor) => time > minimals.map(max_time).maxBy(_.ms).scale(factor) } val critical_minimals = state.ready.filter(task => is_critical(max_time(task))).map(_.name) val critical_nodes = path_max_times(critical_minimals).filter((_, time) => is_critical(time)).keySet val (critical, other) = next_sorted.partition(task => critical_nodes.contains(task.name)) val critical_tasks = critical.map(task => (task, best_threads(task), best_threads(task))) def parallel_threads(task: Build_Process.Task): Int = this.parallel_threads match { case Fixed_Thread(threads) => threads case Time_Based_Threads(f) => f(best_times(task.name)) } val other_tasks = other.map(task => (task, parallel_threads(task), best_threads(task))) val max_critical_parallel = parallel_paths(critical_minimals.map(remaining_time), critical_nodes) val max_critical_hosts = available_nodes.take(max_critical_parallel).map(_.hostname).distinct.length val split = this.host_criterion match { case Critical_Nodes => max_critical_hosts case Fixed_Fraction(fraction) => ((rev_ordered_hosts.length * fraction).ceil.toInt max 1) min max_critical_hosts case Host_Speed(min_factor) => val best = rev_ordered_hosts.head._1.benchmark_score val num_fast = rev_ordered_hosts.count(_._1.benchmark_score >= best * min_factor) num_fast min max_critical_hosts } val (critical_hosts, other_hosts) = rev_ordered_hosts.splitAt(split) val (configs1, resources1) = resources.try_allocate_tasks(critical_hosts, critical_tasks) val (configs2, _) = resources1.try_allocate_tasks(other_hosts, other_tasks) configs1 ::: configs2 } } } /* master and slave processes for scheduled build */ class Scheduled_Build_Process( build_context: Build.Context, build_progress: Progress, server: SSH.Server, ) extends Build_Process(build_context, build_progress, server) { /* global state: internal var vs. 
external database */ protected var _schedule: Schedule = Schedule.init(build_uuid) override protected def synchronized_database[A](label: String)(body: => A): A = synchronized { _build_database match { case None => body case Some(db) => db.transaction_lock(Build_Schedule.private_data.all_tables, label = label) { val old_state = Build_Process.private_data.pull_state(db, build_id, worker_uuid, _state) val old_schedule = Build_Schedule.private_data.pull_schedule(db, _schedule) _state = old_state _schedule = old_schedule val res = body _state = Build_Process.private_data.push_state( db, build_id, worker_uuid, _state, old_state) _schedule = Build_Schedule.private_data.pull_schedule(db, _schedule, old_schedule) res } } } /* build process */ override def next_node_info(state: Build_Process.State, session_name: String): Node_Info = _schedule.graph.get_node(session_name).node_info override def next_jobs(state: Build_Process.State): List[String] = if (progress.stopped || _schedule.is_empty) Nil else _schedule.next(hostname, state) private var _build_tick: Long = 0L protected override def build_action(): Boolean = Isabelle_Thread.interrupt_handler(_ => progress.stop()) { val received = build_receive(n => n.channel == Build_Process.private_data.channel) val ready = received.contains(Build_Schedule.private_data.channel_ready(hostname)) val finished = synchronized { _state.finished_running() } def sleep: Boolean = { build_delay.sleep() val expired = synchronized { _build_tick += 1; _build_tick % build_expire == 0 } expired || ready || progress.stopped } finished || sleep } } abstract class Scheduler_Build_Process( build_context: Build.Context, build_progress: Progress, server: SSH.Server, ) extends Scheduled_Build_Process(build_context, build_progress, server) { require(build_context.master) for (db <- _build_database) { Build_Schedule.private_data.transaction_lock( db, create = true, label = "Scheduler_Build_Process.create" ) { Build_Schedule.private_data.clean_build_schedules(db) } db.vacuum(Build_Schedule.private_data.tables.list) } def init_scheduler(timing_data: Timing_Data): Scheduler /* global resources with common close() operation */ private final val _log_store: Build_Log.Store = Build_Log.store(build_options) private final val _log_database: SQL.Database = try { val db = _log_store.open_database(server = this.server) _log_store.init_database(db) db } catch { case exn: Throwable => close(); throw exn } override def close(): Unit = { Option(_log_database).foreach(_.close()) super.close() } /* previous results via build log */ override def open_build_cluster(): Build_Cluster = { val build_cluster = super.open_build_cluster() build_cluster.init() Build_Benchmark.benchmark_requirements(build_options) if (build_context.worker) { val benchmark_options = build_options.string("build_hostname") = hostname Build_Benchmark.run_benchmark(benchmark_options, progress) } build_cluster.benchmark() } private val timing_data: Timing_Data = { val cluster_hosts: List[Build_Cluster.Host] = if (!build_context.worker) build_context.build_hosts else { val local_build_host = Build_Cluster.Host( hostname, jobs = build_context.jobs, numa = build_context.numa_shuffling) local_build_host :: build_context.build_hosts } val host_infos = Host_Infos.load(build_options, cluster_hosts, _host_database) Timing_Data.load(host_infos, _log_database, build_context.sessions_structure) } private val scheduler = init_scheduler(timing_data) def write_build_log(results: Build.Results, state: Build_Process.State.Results): Unit = { val 
sessions = for { (session_name, result) <- state.toList if !result.current } yield { val info = build_context.sessions_structure(session_name) val entry = if (!results.cancelled(session_name)) { val status = if (result.ok) Build_Log.Session_Status.finished else Build_Log.Session_Status.failed Build_Log.Session_Entry( chapter = info.chapter, groups = info.groups, hostname = Some(result.node_info.hostname), threads = Some(timing_data.host_infos.num_threads(result.node_info)), start = Some(result.start_date - build_start), timing = result.process_result.timing, sources = Some(result.output_shasum.digest.toString), status = Some(status)) } else Build_Log.Session_Entry( chapter = info.chapter, groups = info.groups, status = Some(Build_Log.Session_Status.cancelled)) session_name -> entry } val settings = Build_Log.Settings.all_settings.map(_.name).map(name => name -> Isabelle_System.getenv(name)) val props = List( Build_Log.Prop.build_id.name -> build_context.build_uuid, Build_Log.Prop.build_engine.name -> build_context.engine.name, Build_Log.Prop.build_host.name -> hostname, Build_Log.Prop.build_start.name -> Build_Log.print_date(build_start)) val meta_info = Build_Log.Meta_Info(props, settings) val build_info = Build_Log.Build_Info(sessions.toMap) val log_name = Build_Log.log_filename(engine = build_context.engine.name, date = build_start) Build_Log.private_data.update_sessions( _log_database, _log_store.cache.compress, log_name.file_name, build_info) Build_Log.private_data.update_meta_info(_log_database, log_name.file_name, meta_info) } /* build process */ def is_current(state: Build_Process.State, session_name: String): Boolean = state.ancestor_results(session_name) match { case Some(ancestor_results) if ancestor_results.forall(_.current) => - val sources_shasum = state.sessions(session_name).sources_shasum - - val input_shasum = - if (ancestor_results.isEmpty) ML_Process.bootstrap_shasum() - else SHA1.flat_shasum(ancestor_results.map(_.output_shasum)) - - val store_heap = - build_context.build_heap || Sessions.is_pure(session_name) || - state.sessions.iterator.exists(_.ancestors.contains(session_name)) - store.check_output( _database_server, session_name, session_options = build_context.sessions_structure(session_name).options, - sources_shasum = sources_shasum, - input_shasum = input_shasum, + sources_shasum = state.sessions(session_name).sources_shasum, + input_shasum = ML_Process.make_shasum(ancestor_results.map(_.output_shasum)), fresh_build = build_context.fresh_build, - store_heap = store_heap)._1 + store_heap = build_context.store_heap || state.sessions.store_heap(session_name))._1 case _ => false } override def next_jobs(state: Build_Process.State): List[String] = if (progress.stopped) state.next_ready.map(_.name) else if (!_schedule.is_outdated(build_options, state)) _schedule.next(hostname, state) else { val current = state.next_ready.filter(task => is_current(state, task.name)) if (current.nonEmpty) current.map(_.name) else { val start = Time.now() val new_schedule = scheduler.schedule(state).update(state) val schedule = if (_schedule.is_empty) new_schedule else List(_schedule.update(state), new_schedule).minBy(_.end)(Date.Ordering) val elapsed = Time.now() - start val timing_msg = if (elapsed.is_relevant) " (took " + elapsed.message + ")" else "" progress.echo_if( _schedule.deviation(schedule).minutes > 1 && schedule.duration >= Time.seconds(1), schedule.message + timing_msg) _schedule = schedule _schedule.next(hostname, state) } } override def run(): Build.Results = { val 
vacuous = synchronized_database("Scheduler_Build_Process.init") { for (db <- _build_database) Build_Process.private_data.clean_build(db) init_unsynchronized() _state.pending.isEmpty } if (vacuous) { progress.echo_warning("Nothing to build") stop_build() Build.Results(build_context) } else { start_worker() _build_cluster.start() try { while (!finished()) { synchronized_database("Scheduler_Build_Process.main") { if (progress.stopped) _state.build_running.foreach(_.cancel()) main_unsynchronized() for { host <- build_context.build_hosts if _schedule.exists_next(host.name, _state) } build_send(Build_Schedule.private_data.channel_ready(host.name)) } while (!build_action()) {} } } finally { _build_cluster.stop() stop_worker() stop_build() } val results = synchronized_database("Scheduler_Build_Process.result") { val results = for ((name, result) <- _state.results) yield name -> result.process_result Build.Results(build_context, results = results, other_rc = _build_cluster.rc) } write_build_log(results, _state.results) results } } } /** SQL data model of build schedule, extending isabelle_build database */ object private_data extends SQL.Data("isabelle_build") { import Build_Process.private_data.{Base, Generic} /* tables */ override lazy val tables: SQL.Tables = SQL.Tables(Schedules.table, Nodes.table) lazy val all_tables: SQL.Tables = SQL.Tables.list(Build_Process.private_data.tables.list ::: tables.list) /* notifications */ def channel_ready(hostname: String): SQL.Notification = SQL.Notification(Build_Process.private_data.channel, payload = hostname) /* schedule */ object Schedules { val build_uuid = Generic.build_uuid.make_primary_key val generator = SQL.Column.string("generator") val start = SQL.Column.date("start") val serial = SQL.Column.long("serial") val table = make_table(List(build_uuid, generator, start, serial), name = "schedules") } def read_serial(db: SQL.Database, build_uuid: String = ""): Long = db.execute_query_statementO[Long]( Schedules.table.select(List(Schedules.serial.max), sql = SQL.where(if_proper(build_uuid, Schedules.build_uuid.equal(build_uuid)))), _.long(Schedules.serial)).getOrElse(0L) def read_scheduled_builds_domain(db: SQL.Database): Map[String, Unit] = db.execute_query_statement( Schedules.table.select(List(Schedules.build_uuid)), Map.from[String, Unit], res => res.string(Schedules.build_uuid) -> ()) def read_schedules(db: SQL.Database, build_uuid: String = ""): List[Schedule] = { val schedules = db.execute_query_statement(Schedules.table.select(sql = SQL.where(if_proper(build_uuid, Schedules.build_uuid.equal(build_uuid)))), List.from[Schedule], { res => val build_uuid = res.string(Schedules.build_uuid) val generator = res.string(Schedules.generator) val start = res.date(Schedules.start) val serial = res.long(Schedules.serial) Schedule(build_uuid, generator, start, Graph.empty, serial) }) for (schedule <- schedules.sortBy(_.start)(Date.Ordering)) yield { val nodes = private_data.read_nodes(db, build_uuid = schedule.build_uuid) schedule.copy(graph = Graph.make(nodes)) } } def write_schedule(db: SQL.Database, schedule: Schedule): Unit = { db.execute_statement( Schedules.table.delete(Schedules.build_uuid.where_equal(schedule.build_uuid))) db.execute_statement(Schedules.table.insert(), { stmt => stmt.string(1) = schedule.build_uuid stmt.string(2) = schedule.generator stmt.date(3) = schedule.start stmt.long(4) = schedule.serial }) update_nodes(db, schedule.build_uuid, schedule.graph.dest) } /* nodes */ object Nodes { val build_uuid = Generic.build_uuid.make_primary_key 
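// The "schedule_nodes" table keeps one row per scheduled session, keyed by
// build_uuid and name: successor edges in "succs" (one session name per line),
// the assigned host via "hostname", "numa_node" and "rel_cpus", plus the planned
// "start" date and "duration" in milliseconds.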
val name = Generic.name.make_primary_key val succs = SQL.Column.string("succs") val hostname = SQL.Column.string("hostname") val numa_node = SQL.Column.int("numa_node") val rel_cpus = SQL.Column.string("rel_cpus") val start = SQL.Column.date("start") val duration = SQL.Column.long("duration") val table = make_table( List(build_uuid, name, succs, hostname, numa_node, rel_cpus, start, duration), name = "schedule_nodes") } type Nodes = List[((String, Schedule.Node), List[String])] def read_nodes(db: SQL.Database, build_uuid: String = ""): Nodes = { db.execute_query_statement( Nodes.table.select(sql = SQL.where(if_proper(build_uuid, Nodes.build_uuid.equal(build_uuid)))), List.from[((String, Schedule.Node), List[String])], { res => val name = res.string(Nodes.name) val succs = split_lines(res.string(Nodes.succs)) val hostname = res.string(Nodes.hostname) val numa_node = res.get_int(Nodes.numa_node) val rel_cpus = res.string(Nodes.rel_cpus) val start = res.date(Nodes.start) val duration = Time.ms(res.long(Nodes.duration)) val node_info = Node_Info(hostname, numa_node, isabelle.Host.Range.from(rel_cpus)) ((name, Schedule.Node(name, node_info, start, duration)), succs) } ) } def update_nodes(db: SQL.Database, build_uuid: String, nodes: Nodes): Unit = { db.execute_statement(Nodes.table.delete(Nodes.build_uuid.where_equal(build_uuid))) db.execute_batch_statement(Nodes.table.insert(), batch = for (((name, node), succs) <- nodes) yield { (stmt: SQL.Statement) => stmt.string(1) = build_uuid stmt.string(2) = name stmt.string(3) = cat_lines(succs) stmt.string(4) = node.node_info.hostname stmt.int(5) = node.node_info.numa_node stmt.string(6) = isabelle.Host.Range(node.node_info.rel_cpus) stmt.date(7) = node.start stmt.long(8) = node.duration.ms }) } def pull_schedule(db: SQL.Database, old_schedule: Schedule): Build_Schedule.Schedule = { val serial_db = read_serial(db) if (serial_db == old_schedule.serial) old_schedule else { read_schedules(db, old_schedule.build_uuid) match { case Nil => old_schedule case schedules => Library.the_single(schedules) } } } def pull_schedule(db: SQL.Database, schedule: Schedule, old_schedule: Schedule): Schedule = { val changed = schedule.generator != old_schedule.generator || schedule.start != old_schedule.start || schedule.graph != old_schedule.graph val schedule1 = if (changed) schedule.copy(serial = old_schedule.next_serial) else schedule if (schedule1.serial != schedule.serial) write_schedule(db, schedule1) schedule1 } def remove_schedules(db: SQL.Database, remove: List[String]): Unit = if (remove.nonEmpty) { val sql = Generic.build_uuid.where_member(remove) db.execute_statement(SQL.MULTI(tables.map(_.delete(sql = sql)))) } def clean_build_schedules(db: SQL.Database): Unit = { val running_builds_domain = db.execute_query_statement( Base.table.select(List(Base.build_uuid), sql = SQL.where(Base.stop.undefined)), Map.from[String, Unit], res => res.string(Base.build_uuid) -> ()) val update = Library.Update.make(read_scheduled_builds_domain(db), running_builds_domain) remove_schedules(db, update.delete) } } class Build_Engine extends Build.Engine("build_schedule") { override def build_options(options: Options, build_cluster: Boolean = false): Options = { val options1 = super.build_options(options, build_cluster = build_cluster) if (build_cluster) options1 + "build_database_server" else options1 } def scheduler(timing_data: Timing_Data, context: Build.Context): Scheduler = { val sessions_structure = context.sessions_structure val is_criticals = List( 
Path_Time_Heuristic.Absolute_Time(Time.minutes(5)), Path_Time_Heuristic.Absolute_Time(Time.minutes(10)), Path_Time_Heuristic.Absolute_Time(Time.minutes(20)), Path_Time_Heuristic.Relative_Time(0.5)) val parallel_threads = List( Path_Time_Heuristic.Fixed_Thread(1), Path_Time_Heuristic.Time_Based_Threads({ case time if time < Time.minutes(1) => 1 case time if time < Time.minutes(5) => 4 case _ => 8 })) val machine_splits = List( Path_Time_Heuristic.Critical_Nodes, Path_Time_Heuristic.Fixed_Fraction(0.3), Path_Time_Heuristic.Host_Speed(0.9)) val path_time_heuristics = for { is_critical <- is_criticals parallel <- parallel_threads machine_split <- machine_splits } yield Path_Time_Heuristic(is_critical, parallel, machine_split, timing_data, sessions_structure) val default_heuristic = Default_Heuristic(timing_data.host_infos) val heuristics = default_heuristic :: path_time_heuristics val initial_schedule_file = context.build_options.string("build_schedule_initial") val initial = proper_string(initial_schedule_file).toList.map(initial_schedule_file => Schedule.read(Path.explode(initial_schedule_file)).copy(build_uuid = context.build_uuid)) Optimizer(heuristics.map(Generation_Scheme(_, timing_data, context.build_uuid)), initial) } override def open_build_process( context: Build.Context, progress: Progress, server: SSH.Server ): Build_Process = if (!context.master) new Scheduled_Build_Process(context, progress, server) else { val schedule_file = context.build_options.string("build_schedule") if (schedule_file.isEmpty) { new Scheduler_Build_Process(context, progress, server) { def init_scheduler(timing_data: Timing_Data): Scheduler = scheduler(timing_data, context) } } else { val finished_schedule = Schedule.read(Path.explode(schedule_file)).copy(build_uuid = context.build_uuid) new Scheduler_Build_Process(context, progress, server) { def init_scheduler(timing_data: Timing_Data): Scheduler = (build_state: Build_Process.State) => finished_schedule } } } } object Build_Engine extends Build_Engine /* build schedule */ def build_schedule( options: Options, build_hosts: List[Build_Cluster.Host] = Nil, selection: Sessions.Selection = Sessions.Selection.empty, progress: Progress = new Progress, afp_root: Option[Path] = None, dirs: List[Path] = Nil, select_dirs: List[Path] = Nil, infos: List[Sessions.Info] = Nil, numa_shuffling: Boolean = false, augment_options: String => List[Options.Spec] = _ => Nil, session_setup: (String, Session) => Unit = (_, _) => (), cache: Term.Cache = Term.Cache.make() ): Schedule = { Build.build_process(options, build_cluster = true, remove_builds = true) val store = Build_Engine.build_store(options, build_cluster = build_hosts.nonEmpty, cache = cache) val log_store = Build_Log.store(options, cache = cache) val build_options = store.options def main( server: SSH.Server, database_server: Option[SQL.Database], log_database: PostgreSQL.Database, host_database: SQL.Database ): Schedule = { val full_sessions = Sessions.load_structure(build_options, dirs = AFP.main_dirs(afp_root) ::: dirs, select_dirs = select_dirs, infos = infos, augment_options = augment_options) val build_deps = Sessions.deps(full_sessions.selection(selection), progress = progress, inlined_files = true).check_errors val build_context = Build.Context(store, build_deps, engine = Build_Engine, afp_root = afp_root, build_hosts = build_hosts, hostname = Build.hostname(build_options), numa_shuffling = numa_shuffling, session_setup = session_setup, master = true) val cluster_hosts = build_context.build_hosts val 
hosts_current = cluster_hosts.forall(host => isabelle.Host.read_info(host_database, host.name).isDefined) if (!hosts_current) { using(Build_Cluster.make(build_context, progress = progress).open())(_.init().benchmark()) } val host_infos = Host_Infos.load(build_options, cluster_hosts, host_database) val timing_data = Timing_Data.load(host_infos, log_database, full_sessions) val sessions = Build_Process.Sessions.empty.init(build_context, database_server, progress) val build_state = Build_Process.State(sessions = sessions, pending = Map.from(sessions.iterator.map(Build_Process.Task.entry(_, build_context)))) val scheduler = Build_Engine.scheduler(timing_data, build_context) def schedule_msg(res: Exn.Result[Schedule]): String = res match { case Exn.Res(schedule) => schedule.message case _ => "" } progress.echo("Building schedule...") Timing.timeit(scheduler.schedule(build_state), schedule_msg, output = progress.echo(_)) } using(store.open_server()) { server => using_optional(store.maybe_open_database_server(server = server)) { database_server => using(log_store.open_database(server = server)) { log_database => using(store.open_build_database( path = isabelle.Host.private_data.database, server = server)) { host_database => main(server, database_server, log_database, host_database) } } } } } def write_schedule_graphic(schedule: Schedule, output: Path): Unit = { import java.awt.geom.{GeneralPath, Rectangle2D} import java.awt.{BasicStroke, Color, Graphics2D} val line_height = isabelle.graphview.Metrics.default.height val char_width = isabelle.graphview.Metrics.default.char_width val padding = isabelle.graphview.Metrics.default.space_width val gap = isabelle.graphview.Metrics.default.gap val graph = schedule.graph def text_width(text: String): Double = text.length * char_width val generator_height = line_height + padding val hostname_height = generator_height + line_height + padding def time_height(time: Time): Double = time.seconds def date_height(date: Date): Double = time_height(date - schedule.start) val hosts = graph.iterator.map(_._2._1).toList.groupBy(_.node_info.hostname) def node_width(node: Schedule.Node): Double = 2 * padding + text_width(node.job_name) case class Range(start: Double, stop: Double) { def proper: List[Range] = if (start < stop) List(this) else Nil def width: Double = stop - start } val rel_node_ranges = hosts.toList.flatMap { (hostname, nodes) => val sorted = nodes.sortBy(node => (node.start.time.ms, node.end.time.ms, node.job_name)) sorted.foldLeft((List.empty[Schedule.Node], Map.empty[Schedule.Node, Range])) { case ((nodes, allocated), node) => val width = node_width(node) + padding val parallel = nodes.filter(_.end.time > node.start.time) val (last, slots) = parallel.sortBy(allocated(_).start).foldLeft((0D, List.empty[Range])) { case ((start, ranges), node1) => val node_range = allocated(node1) (node_range.stop, ranges ::: Range(start, node_range.start).proper) } val start = (Range(last, Double.MaxValue) :: slots.filter(_.width >= width)).minBy(_.width).start (node :: parallel, allocated + (node -> Range(start, start + width))) }._2 }.toMap def host_width(hostname: String) = 2 * padding + (hosts(hostname).map(rel_node_ranges(_).stop).max max text_width(hostname)) def graph_height(graph: Graph[String, Schedule.Node]): Double = date_height(graph.maximals.map(graph.get_node(_).end).maxBy(_.unix_epoch)) val height = (hostname_height + 2 * padding + graph_height(graph)).ceil.toInt val (last, host_starts) = hosts.keys.foldLeft((0D, Map.empty[String, Double])) { case 
((previous, starts), hostname) => (previous + gap + host_width(hostname), starts + (hostname -> previous)) } val width = (last - gap).ceil.toInt def node_start(node: Schedule.Node): Double = host_starts(node.node_info.hostname) + padding + rel_node_ranges(node).start def paint(gfx: Graphics2D): Unit = { gfx.setColor(Color.LIGHT_GRAY) gfx.fillRect(0, 0, width, height) gfx.setRenderingHints(isabelle.graphview.Metrics.rendering_hints) gfx.setFont(isabelle.graphview.Metrics.default.font) gfx.setStroke(new BasicStroke(1, BasicStroke.CAP_BUTT, BasicStroke.JOIN_ROUND)) draw_string(schedule.generator + ", build time: " + schedule.duration.message_hms, padding, 0) def draw_host(x: Double, hostname: String): Double = { val nodes = hosts(hostname).map(_.job_name).toSet val width = host_width(hostname) val height = 2 * padding + graph_height(graph.restrict(nodes.contains)) val padding1 = ((width - text_width(hostname)) / 2) max 0 val rect = new Rectangle2D.Double(x, hostname_height, width, height) gfx.setColor(Color.BLACK) gfx.draw(rect) gfx.setColor(Color.GRAY) gfx.fill(rect) draw_string(hostname, x + padding1, generator_height) x + gap + width } def draw_string(str: String, x: Double, y: Double): Unit = { gfx.setColor(Color.BLACK) gfx.drawString(str, x.toInt, (y + line_height).toInt) } def node_rect(node: Schedule.Node): Rectangle2D.Double = { val x = node_start(node) val y = hostname_height + padding + date_height(node.start) val width = node_width(node) val height = time_height(node.duration) new Rectangle2D.Double(x, y, width, height) } def draw_node(node: Schedule.Node): Rectangle2D.Double = { val rect = node_rect(node) gfx.setColor(Color.BLACK) gfx.draw(rect) gfx.setColor(Color.WHITE) gfx.fill(rect) def add_text(y: Double, text: String): Double = if (line_height > rect.height - y || text_width(text) + 2 * padding > rect.width) y else { val padding1 = padding min ((rect.height - (y + line_height)) / 2) draw_string(text, rect.x + padding, rect.y + y + padding1) y + padding1 + line_height } val node_info = node.node_info val duration_str = "(" + node.duration.message_hms + ")" val node_str = "on " + proper_string(node_info.toString.stripPrefix(node_info.hostname)).getOrElse("all") val start_str = "Start: " + (node.start - schedule.start).message_hms List(node.job_name, duration_str, node_str, start_str).foldLeft(0D)(add_text) rect } def draw_arrow(from: Schedule.Node, to: Rectangle2D.Double, curve: Double = 10): Unit = { val from_rect = node_rect(from) val path = new GeneralPath() path.moveTo(from_rect.getCenterX, from_rect.getMaxY) path.lineTo(to.getCenterX, to.getMinY) gfx.setColor(Color.BLUE) gfx.draw(path) } hosts.keys.foldLeft(0D)(draw_host) graph.topological_order.foreach { job_name => val node = graph.get_node(job_name) val rect = draw_node(node) for { pred <- graph.imm_preds(job_name).iterator pred_node = graph.get_node(pred) if node.node_info.hostname != pred_node.node_info.hostname } draw_arrow(pred_node, rect) } } val name = output.file_name if (File.is_png(name)) Graphics_File.write_png(output.file, paint, width, height) else if (File.is_pdf(name)) Graphics_File.write_pdf(output.file, paint, width, height) else error("Bad type of file: " + quote(name) + " (.png or .pdf expected)") } /* Isabelle tool wrapper */ val isabelle_tool = Isabelle_Tool("build_schedule", "generate build schedule", Scala_Project.here, { args => var afp_root: Option[Path] = None val base_sessions = new mutable.ListBuffer[String] val select_dirs = new mutable.ListBuffer[Path] val build_hosts = new 
mutable.ListBuffer[Build_Cluster.Host] var numa_shuffling = false var output_file: Option[Path] = None var requirements = false val exclude_session_groups = new mutable.ListBuffer[String] var all_sessions = false val dirs = new mutable.ListBuffer[Path] val session_groups = new mutable.ListBuffer[String] var options = Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS) var verbose = false val exclude_sessions = new mutable.ListBuffer[String] val getopts = Getopts(""" Usage: isabelle build_schedule [OPTIONS] [SESSIONS ...] Options are: -A ROOT include AFP with given root directory (":" for """ + AFP.BASE.implode + """) -B NAME include session NAME and all descendants -D DIR include session directory and select its sessions -H HOSTS additional cluster host specifications of the form NAMES:PARAMETERS (separated by commas) -N cyclic shuffling of NUMA CPU nodes (performance tuning) -O FILE output file (pdf or png for image, else yxml) -R refer to requirements of selected sessions -X NAME exclude sessions from group NAME and all descendants -a select all sessions -d DIR include session directory -g NAME select session group NAME -o OPTION override Isabelle system OPTION (via NAME=VAL or NAME) -v verbose -x NAME exclude session NAME and all descendants Generate build schedule, but do not run actual build. """, "A:" -> (arg => afp_root = Some(if (arg == ":") AFP.BASE else Path.explode(arg))), "B:" -> (arg => base_sessions += arg), "D:" -> (arg => select_dirs += Path.explode(arg)), "H:" -> (arg => build_hosts ++= Build_Cluster.Host.parse(Registry.global, arg)), "N" -> (_ => numa_shuffling = true), "O:" -> (arg => output_file = Some(Path.explode(arg))), "R" -> (_ => requirements = true), "X:" -> (arg => exclude_session_groups += arg), "a" -> (_ => all_sessions = true), "d:" -> (arg => dirs += Path.explode(arg)), "g:" -> (arg => session_groups += arg), "o:" -> (arg => options = options + arg), "v" -> (_ => verbose = true), "x:" -> (arg => exclude_sessions += arg)) val sessions = getopts(args) val progress = new Console_Progress(verbose = verbose) val schedule = build_schedule(options, selection = Sessions.Selection( requirements = requirements, all_sessions = all_sessions, base_sessions = base_sessions.toList, exclude_session_groups = exclude_session_groups.toList, exclude_sessions = exclude_sessions.toList, session_groups = session_groups.toList, sessions = sessions), progress = progress, afp_root = afp_root, dirs = dirs.toList, select_dirs = select_dirs.toList, numa_shuffling = isabelle.Host.numa_check(progress, numa_shuffling), build_hosts = build_hosts.toList) output_file match { case Some(output_file) if !schedule.is_empty => if (File.is_pdf(output_file.file_name) || File.is_png(output_file.file_name)) write_schedule_graphic(schedule, output_file) else Schedule.write(schedule, output_file) case _ => } }) } diff --git a/src/Pure/Build/store.scala b/src/Pure/Build/store.scala --- a/src/Pure/Build/store.scala +++ b/src/Pure/Build/store.scala @@ -1,580 +1,603 @@ /* Title: Pure/Build/store.scala Author: Makarius Persistent store for session content: within file-system and/or SQL database. 
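This covers heap images and log databases in the file-system output/input directories,
as well as the isabelle_session_info and isabelle_sources tables of the session database.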
*/ package isabelle import java.sql.SQLException object Store { def apply( options: Options, build_cluster: Boolean = false, cache: Term.Cache = Term.Cache.make() ): Store = new Store(options, build_cluster, cache) /* file names */ def heap(name: String): Path = Path.basic(name) def log(name: String): Path = Path.basic("log") + Path.basic(name) def log_db(name: String): Path = log(name).db def log_gz(name: String): Path = log(name).gz /* session */ final class Session private[Store]( val name: String, val heap: Option[Path], val log_db: Option[Path], dirs: List[Path] ) { def log_db_name: String = Store.log_db(name).implode def defined: Boolean = heap.isDefined || log_db.isDefined def the_heap: Path = heap getOrElse error("Missing heap image for session " + quote(name) + " -- expected in:\n" + cat_lines(dirs.map(dir => " " + File.standard_path(dir)))) def heap_digest(): Option[SHA1.Digest] = heap.flatMap(ML_Heap.read_file_digest) override def toString: String = name } /* session build info */ sealed case class Build_Info( sources: SHA1.Shasum, input_heaps: SHA1.Shasum, output_heap: SHA1.Shasum, return_code: Int, uuid: String ) { def ok: Boolean = return_code == 0 } /* session sources */ sealed case class Source_File( name: String, digest: SHA1.Digest, compressed: Boolean, body: Bytes, cache: Compress.Cache ) { override def toString: String = name def bytes: Bytes = if (compressed) body.uncompress(cache = cache) else body } object Sources { def load(session_base: Sessions.Base, cache: Compress.Cache = Compress.Cache.none): Sources = new Sources( session_base.session_sources.foldLeft(Map.empty) { case (sources, (path, digest)) => - def err(): Nothing = error("Incoherent digest for source file: " + path) + def err(): Nothing = error("Incoherent digest for source file: " + path.expand) val name = File.symbolic_path(path) sources.get(name) match { case Some(source_file) => if (source_file.digest == digest) sources else err() case None => val bytes = Bytes.read(path) if (bytes.sha1_digest == digest) { val (compressed, body) = bytes.maybe_compress(Compress.Options_Zstd(), cache = cache) val file = Source_File(name, digest, compressed, body, cache) sources + (name -> file) } else err() } }) } class Sources private(rep: Map[String, Source_File]) extends Iterable[Source_File] { override def toString: String = rep.values.toList.sortBy(_.name).mkString("Sources(", ", ", ")") override def iterator: Iterator[Source_File] = rep.valuesIterator def get(name: String): Option[Source_File] = rep.get(name) def apply(name: String): Source_File = get(name).getOrElse(error("Missing session sources entry " + quote(name))) } /* SQL data model */ object private_data extends SQL.Data() { override lazy val tables: SQL.Tables = SQL.Tables(Session_Info.table, Sources.table) object Session_Info { val session_name = SQL.Column.string("session_name").make_primary_key // Build_Log.Session_Info val session_timing = SQL.Column.bytes("session_timing") val command_timings = SQL.Column.bytes("command_timings") val theory_timings = SQL.Column.bytes("theory_timings") val ml_statistics = SQL.Column.bytes("ml_statistics") val task_statistics = SQL.Column.bytes("task_statistics") val errors = SQL.Column.bytes("errors") val build_log_columns = List(session_name, session_timing, command_timings, theory_timings, ml_statistics, task_statistics, errors) // Build_Info val sources = SQL.Column.string("sources") val input_heaps = SQL.Column.string("input_heaps") val output_heap = SQL.Column.string("output_heap") val return_code = 
SQL.Column.int("return_code") val uuid = SQL.Column.string("uuid") val build_columns = List(sources, input_heaps, output_heap, return_code, uuid) val table = SQL.Table("isabelle_session_info", build_log_columns ::: build_columns) } object Sources { val session_name = SQL.Column.string("session_name").make_primary_key val name = SQL.Column.string("name").make_primary_key val digest = SQL.Column.string("digest") val compressed = SQL.Column.bool("compressed") val body = SQL.Column.bytes("body") val table = SQL.Table("isabelle_sources", List(session_name, name, digest, compressed, body)) def where_equal(session_name: String, name: String = ""): SQL.Source = SQL.where_and( Sources.session_name.equal(session_name), if_proper(name, Sources.name.equal(name))) } def read_bytes(db: SQL.Database, name: String, column: SQL.Column): Bytes = db.execute_query_statementO[Bytes]( Session_Info.table.select(List(column), sql = Session_Info.session_name.where_equal(name)), res => res.bytes(column) ).getOrElse(Bytes.empty) def read_properties( db: SQL.Database, name: String, column: SQL.Column, cache: Term.Cache ): List[Properties.T] = Properties.uncompress(read_bytes(db, name, column), cache = cache) def read_session_timing(db: SQL.Database, name: String, cache: Term.Cache): Properties.T = Properties.decode(read_bytes(db, name, Session_Info.session_timing), cache = cache) def read_command_timings(db: SQL.Database, name: String): Bytes = read_bytes(db, name, Session_Info.command_timings) def read_theory_timings(db: SQL.Database, name: String, cache: Term.Cache): List[Properties.T] = read_properties(db, name, Session_Info.theory_timings, cache) def read_ml_statistics(db: SQL.Database, name: String, cache: Term.Cache): List[Properties.T] = read_properties(db, name, Session_Info.ml_statistics, cache) def read_task_statistics(db: SQL.Database, name: String, cache: Term.Cache): List[Properties.T] = read_properties(db, name, Session_Info.task_statistics, cache) def read_errors(db: SQL.Database, name: String, cache: Term.Cache): List[String] = Build_Log.uncompress_errors(read_bytes(db, name, Session_Info.errors), cache = cache) def read_build(db: SQL.Database, name: String): Option[Store.Build_Info] = db.execute_query_statementO[Store.Build_Info]( Session_Info.table.select(sql = Session_Info.session_name.where_equal(name)), { res => val uuid = try { Option(res.string(Session_Info.uuid)).getOrElse("") } catch { case _: SQLException => "" } Store.Build_Info( SHA1.fake_shasum(res.string(Session_Info.sources)), SHA1.fake_shasum(res.string(Session_Info.input_heaps)), SHA1.fake_shasum(res.string(Session_Info.output_heap)), res.int(Session_Info.return_code), uuid) }) def read_build_uuid(db: SQL.Database, name: String): String = db.execute_query_statementO[String]( Session_Info.table.select(List(Session_Info.uuid), sql = Session_Info.session_name.where_equal(name)), { res => try { Option(res.string(Session_Info.uuid)).getOrElse("") } catch { case _: SQLException => "" } }).getOrElse("") def write_session_info( db: SQL.Database, cache: Compress.Cache, session_name: String, build_log: Build_Log.Session_Info, build: Build_Info ): Unit = { db.execute_statement(Session_Info.table.insert(), body = { stmt => stmt.string(1) = session_name stmt.bytes(2) = Properties.encode(build_log.session_timing) stmt.bytes(3) = Properties.compress(build_log.command_timings, cache = cache) stmt.bytes(4) = Properties.compress(build_log.theory_timings, cache = cache) stmt.bytes(5) = Properties.compress(build_log.ml_statistics, cache = cache) 
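// Statement parameters follow the Session_Info column order: the Build_Log columns (1-7),
// then the Build_Info columns sources, input_heaps, output_heap, return_code, uuid (8-12).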
stmt.bytes(6) = Properties.compress(build_log.task_statistics, cache = cache) stmt.bytes(7) = Build_Log.compress_errors(build_log.errors, cache = cache) stmt.string(8) = build.sources.toString stmt.string(9) = build.input_heaps.toString stmt.string(10) = build.output_heap.toString stmt.int(11) = build.return_code stmt.string(12) = build.uuid }) } def write_sources( db: SQL.Database, session_name: String, source_files: Iterable[Source_File] ): Unit = { db.execute_batch_statement(Sources.table.insert(), batch = for (source_file <- source_files) yield { (stmt: SQL.Statement) => stmt.string(1) = session_name stmt.string(2) = source_file.name stmt.string(3) = source_file.digest.toString stmt.bool(4) = source_file.compressed stmt.bytes(5) = source_file.body }) } def read_sources( db: SQL.Database, session_name: String, name: String, cache: Compress.Cache ): List[Source_File] = { db.execute_query_statement( Sources.table.select( sql = Sources.where_equal(session_name, name = name) + SQL.order_by(List(Sources.name))), List.from[Source_File], { res => val res_name = res.string(Sources.name) val digest = SHA1.fake_digest(res.string(Sources.digest)) val compressed = res.bool(Sources.compressed) val body = res.bytes(Sources.body) Source_File(res_name, digest, compressed, body, cache) } ) } } def read_build_uuid(path: Path, session: String): String = try { using(SQLite.open_database(path))(private_data.read_build_uuid(_, session)) } catch { case _: SQLException => "" } } class Store private( val options: Options, val build_cluster: Boolean, val cache: Term.Cache ) { store => override def toString: String = "Store(output_dir = " + output_dir.absolute + ")" /* directories */ val system_output_dir: Path = Path.explode("$ISABELLE_HEAPS_SYSTEM/$ML_IDENTIFIER") val user_output_dir: Path = Path.explode("$ISABELLE_HEAPS/$ML_IDENTIFIER") def system_heaps: Boolean = options.bool("system_heaps") val output_dir: Path = if (system_heaps) system_output_dir else user_output_dir val input_dirs: List[Path] = if (system_heaps) List(system_output_dir) else List(user_output_dir, system_output_dir) val clean_dirs: List[Path] = if (system_heaps) List(user_output_dir, system_output_dir) else List(user_output_dir) def presentation_dir: Path = if (system_heaps) Path.explode("$ISABELLE_BROWSER_INFO_SYSTEM") else Path.explode("$ISABELLE_BROWSER_INFO") /* file names */ def output_heap(name: String): Path = output_dir + Store.heap(name) def output_log(name: String): Path = output_dir + Store.log(name) def output_log_db(name: String): Path = output_dir + Store.log_db(name) def output_log_gz(name: String): Path = output_dir + Store.log_gz(name) /* session */ def get_session(name: String): Store.Session = { val heap = input_dirs.view.map(_ + Store.heap(name)).find(_.is_file) val log_db = input_dirs.view.map(_ + Store.log_db(name)).find(_.is_file) new Store.Session(name, heap, log_db, input_dirs) } def output_session(name: String, store_heap: Boolean = false): Store.Session = { val heap = if (store_heap) Some(output_heap(name)) else None val log_db = if (!build_database_server) Some(output_log_db(name)) else None new Store.Session(name, heap, log_db, List(output_dir)) } /* heap */ def heap_shasum(database_server: Option[SQL.Database], name: String): SHA1.Shasum = { def get_database: Option[SHA1.Digest] = { for { db <- database_server digest <- ML_Heap.read_digests(db, List(name)).valuesIterator.nextOption() } yield digest } get_database orElse get_session(name).heap_digest() match { case Some(digest) => SHA1.shasum(digest, name) case 
None => SHA1.no_shasum } } /* databases for build process and session content */ def build_database_server: Boolean = options.bool("build_database_server") def build_database: Boolean = options.bool("build_database") def open_server(): SSH.Server = PostgreSQL.open_server(options, host = options.string("build_database_host"), port = options.int("build_database_port"), ssh_host = options.string("build_database_ssh_host"), ssh_port = options.int("build_database_ssh_port"), ssh_user = options.string("build_database_ssh_user")) def open_database_server(server: SSH.Server = SSH.no_server): PostgreSQL.Database = PostgreSQL.open_database_server(options, server = server, user = options.string("build_database_user"), password = options.string("build_database_password"), database = options.string("build_database_name"), host = options.string("build_database_host"), port = options.int("build_database_port"), ssh_host = options.string("build_database_ssh_host"), ssh_port = options.int("build_database_ssh_port"), ssh_user = options.string("build_database_ssh_user")) def maybe_open_database_server( server: SSH.Server = SSH.no_server, guard: Boolean = build_database_server ): Option[SQL.Database] = { if (guard) Some(open_database_server(server = server)) else None } def maybe_open_heaps_database( database_server: Option[SQL.Database], server: SSH.Server = SSH.no_server ): Option[SQL.Database] = { if (database_server.isDefined) None - else store.maybe_open_database_server(server = server, guard = build_cluster) + else maybe_open_database_server(server = server, guard = build_cluster) + } + + def maybe_using_heaps_database[A]( + database_server: Option[SQL.Database], + server: SSH.Server = SSH.no_server + )(f: SQL.Database => A): Option[A] = { + using_optional(maybe_open_heaps_database(database_server, server = server)) { + heaps_database => (database_server orElse heaps_database).map(f) + } + } + + def in_heaps_database( + sessions: List[Store.Session], + database_server: Option[SQL.Database], + server: SSH.Server = SSH.no_server, + progress: Progress = new Progress + ): Unit = { + if (sessions.nonEmpty) { + maybe_using_heaps_database(database_server, server = server) { db => + val slice = Space.MiB(options.real("build_database_slice")) + sessions.foreach(ML_Heap.store(db, _, slice, cache = cache.compress, progress = progress)) + } + } } def open_build_database(path: Path, server: SSH.Server = SSH.no_server): SQL.Database = if (build_database_server || build_cluster) open_database_server(server = server) else SQLite.open_database(path, restrict = true) def maybe_open_build_database( path: Path = Path.explode("$ISABELLE_HOME_USER/build.db"), server: SSH.Server = SSH.no_server ): Option[SQL.Database] = { if (build_database) Some(open_build_database(path, server = server)) else None } def try_open_database( name: String, output: Boolean = false, server: SSH.Server = SSH.no_server, server_mode: Boolean = build_database_server ): Option[SQL.Database] = { def check(db: SQL.Database): Option[SQL.Database] = if (output || session_info_exists(db)) Some(db) else { db.close(); None } if (server_mode) check(open_database_server(server = server)) else if (output) Some(SQLite.open_database(output_log_db(name))) else { (for { dir <- input_dirs.view path = dir + Store.log_db(name) if path.is_file db <- check(SQLite.open_database(path)) } yield db).headOption } } def error_database(name: String): Nothing = error("Missing build database for session " + quote(name)) def open_database( name: String, output: Boolean = false, 
server: SSH.Server = SSH.no_server ): SQL.Database = { try_open_database(name, output = output, server = server) getOrElse error_database(name) } def clean_output( database_server: Option[SQL.Database], name: String, session_init: Boolean = false, progress: Progress = new Progress ): Unit = { val relevant_db = database_server match { case Some(db) => ML_Heap.clean_entry(db, name) clean_session_info(db, name) case None => false } val del = for { dir <- clean_dirs file <- List(Store.heap(name), Store.log_db(name), Store.log(name), Store.log_gz(name)) path = dir + file if path.is_file } yield path.file.delete if (database_server.isEmpty && session_init) { using(open_database(name, output = true))(clean_session_info(_, name)) } if (relevant_db || del.nonEmpty) { if (del.forall(identity)) progress.echo("Cleaned " + name) else progress.echo(name + " FAILED to clean") } } def check_output( database_server: Option[SQL.Database], name: String, session_options: Options, sources_shasum: SHA1.Shasum, input_shasum: SHA1.Shasum, - fresh_build: Boolean, - store_heap: Boolean + fresh_build: Boolean = false, + store_heap: Boolean = false ): (Boolean, SHA1.Shasum) = { def no_check: (Boolean, SHA1.Shasum) = (false, SHA1.no_shasum) def check(db: SQL.Database): (Boolean, SHA1.Shasum) = read_build(db, name) match { case Some(build) => val output_shasum = heap_shasum(if (db.is_postgresql) Some(db) else None, name) val current = !fresh_build && build.ok && Sessions.eq_sources(session_options, build.sources, sources_shasum) && build.input_heaps == input_shasum && build.output_heap == output_shasum && !(store_heap && output_shasum.is_empty) (current, output_shasum) case None => no_check } database_server match { case Some(db) => if (session_info_exists(db)) check(db) else no_check case None => using_option(try_open_database(name))(check) getOrElse no_check } } /* session info */ def session_info_exists(db: SQL.Database): Boolean = Store.private_data.tables.forall(db.exists_table) def session_info_defined(db: SQL.Database, name: String): Boolean = db.execute_query_statementB( Store.private_data.Session_Info.table.select(List(Store.private_data.Session_Info.session_name), sql = Store.private_data.Session_Info.session_name.where_equal(name))) def clean_session_info(db: SQL.Database, name: String): Boolean = { Export.clean_session(db, name) Document_Build.clean_session(db, name) Store.private_data.transaction_lock(db, create = true, label = "Store.clean_session_info") { val already_defined = session_info_defined(db, name) db.execute_statement( SQL.multi( Store.private_data.Session_Info.table.delete( sql = Store.private_data.Session_Info.session_name.where_equal(name)), Store.private_data.Sources.table.delete( sql = Store.private_data.Sources.where_equal(name)))) already_defined } } def write_session_info( db: SQL.Database, session_name: String, sources: Store.Sources, build_log: Build_Log.Session_Info, build: Store.Build_Info ): Unit = { Store.private_data.transaction_lock(db, label = "Store.write_session_info") { for (source_files <- sources.iterator.toList.grouped(200)) { Store.private_data.write_sources(db, session_name, source_files) } Store.private_data.write_session_info(db, cache.compress, session_name, build_log, build) } } def read_session_timing(db: SQL.Database, session: String): Properties.T = Store.private_data.transaction_lock(db, label = "Store.read_session_timing") { Store.private_data.read_session_timing(db, session, cache) } def read_command_timings(db: SQL.Database, session: String): Bytes = 
Store.private_data.transaction_lock(db, label = "Store.read_command_timings") { Store.private_data.read_command_timings(db, session) } def read_theory_timings(db: SQL.Database, session: String): List[Properties.T] = Store.private_data.transaction_lock(db, label = "Store.read_theory_timings") { Store.private_data.read_theory_timings(db, session, cache) } def read_ml_statistics(db: SQL.Database, session: String): List[Properties.T] = Store.private_data.transaction_lock(db, label = "Store.read_ml_statistics") { Store.private_data.read_ml_statistics(db, session, cache) } def read_task_statistics(db: SQL.Database, session: String): List[Properties.T] = Store.private_data.transaction_lock(db, label = "Store.read_task_statistics") { Store.private_data.read_task_statistics(db, session, cache) } def read_theories(db: SQL.Database, session: String): List[String] = read_theory_timings(db, session).flatMap(Markup.Name.unapply) def read_errors(db: SQL.Database, session: String): List[String] = Store.private_data.transaction_lock(db, label = "Store.read_errors") { Store.private_data.read_errors(db, session, cache) } def read_build(db: SQL.Database, session: String): Option[Store.Build_Info] = Store.private_data.transaction_lock(db, label = "Store.read_build") { if (session_info_exists(db)) Store.private_data.read_build(db, session) else None } def read_sources(db: SQL.Database, session: String, name: String = ""): List[Store.Source_File] = Store.private_data.transaction_lock(db, label = "Store.read_sources") { Store.private_data.read_sources(db, session, name, cache.compress) } } diff --git a/src/Pure/ML/ml_heap.scala b/src/Pure/ML/ml_heap.scala --- a/src/Pure/ML/ml_heap.scala +++ b/src/Pure/ML/ml_heap.scala @@ -1,319 +1,314 @@ /* Title: Pure/ML/ml_heap.scala Author: Makarius ML heap operations. 
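Heap files carry a trailing "SHA1:" digest marker; database storage keeps the heap content
as compressed slices, together with its size, digest, and an optional log_db blob.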
*/ package isabelle object ML_Heap { /** heap file with SHA1 digest **/ private val sha1_prefix = "SHA1:" + private val sha1_length = sha1_prefix.length + SHA1.digest_length def read_file_digest(heap: Path): Option[SHA1.Digest] = { if (heap.is_file) { - val l = sha1_prefix.length - val m = l + SHA1.digest_length - val n = File.size(heap) - val bs = Bytes.read_file(heap, offset = n - m) - if (bs.length == m) { + val bs = Bytes.read_file(heap, offset = File.size(heap) - sha1_length) + if (bs.length == sha1_length) { val s = bs.text - if (s.startsWith(sha1_prefix)) Some(SHA1.fake_digest(s.substring(l))) + if (s.startsWith(sha1_prefix)) Some(SHA1.fake_digest(s.substring(sha1_prefix.length))) else None } else None } else None } def write_file_digest(heap: Path): SHA1.Digest = read_file_digest(heap) getOrElse { val digest = SHA1.digest(heap) File.append(heap, sha1_prefix + digest.toString) digest } /* SQL data model */ sealed case class Log_DB(uuid: String, content: Bytes) object private_data extends SQL.Data("isabelle_heaps") { override lazy val tables: SQL.Tables = SQL.Tables(Base.table, Slices.table) object Generic { val name = SQL.Column.string("name").make_primary_key } object Base { val name = Generic.name val heap_size = SQL.Column.long("heap_size") val heap_digest = SQL.Column.string("heap_digest") val uuid = SQL.Column.string("uuid") val log_db = SQL.Column.bytes("log_db") val table = make_table(List(name, heap_size, heap_digest, uuid, log_db)) } object Size { val name = Generic.name val heap = SQL.Column.string("heap") val log_db = SQL.Column.string("log_db") val table = make_table(List(name, heap, log_db), body = "SELECT name, pg_size_pretty(heap_size::bigint) as heap, " + " pg_size_pretty(length(log_db)::bigint) as log_db FROM " + Base.table.ident, name = "size") } object Slices { val name = Generic.name val slice = SQL.Column.int("slice").make_primary_key val content = SQL.Column.bytes("content") val table = make_table(List(name, slice, content), name = "slices") } object Slices_Size { val name = Generic.name val slice = Slices.slice val size = SQL.Column.string("size") val table = make_table(List(name, slice, size), body = "SELECT name, slice, pg_size_pretty(length(content)::bigint) as size FROM " + Slices.table.ident, name = "slices_size") } def read_digests(db: SQL.Database, names: Iterable[String]): Map[String, SHA1.Digest] = if (names.isEmpty) Map.empty else { db.execute_query_statement( Base.table.select(List(Base.name, Base.heap_digest), sql = Generic.name.where_member(names)), List.from[(String, String)], res => res.string(Base.name) -> res.string(Base.heap_digest) ).collect({ case (name, digest) if digest.nonEmpty => name -> SHA1.fake_digest(digest) }).toMap } def read_slices(db: SQL.Database, name: String): List[Bytes] = db.execute_query_statement( Slices.table.select(List(Slices.content), sql = Generic.name.where_equal(name) + SQL.order_by(List(Slices.slice))), List.from[Bytes], _.bytes(Slices.content)) def read_log_db(db: SQL.Database, name: String, old_uuid: String = ""): Option[Log_DB] = db.execute_query_statement( Base.table.select(List(Base.uuid, Base.log_db), sql = SQL.where_and( Generic.name.equal(name), if_proper(old_uuid, Base.uuid.ident + " <> " + SQL.string(old_uuid)))), List.from[(String, Bytes)], res => (res.string(Base.uuid), res.bytes(Base.log_db)) ).collectFirst( { case (uuid, content) if uuid.nonEmpty && !content.is_empty => Log_DB(uuid, content) }) def write_slice(db: SQL.Database, name: String, slice: Int, content: Bytes): Unit = 
db.execute_statement(Slices.table.insert(), body = { stmt => stmt.string(1) = name stmt.int(2) = slice stmt.bytes(3) = content }) def clean_entry(db: SQL.Database, name: String): Unit = { for (table <- List(Base.table, Slices.table)) { db.execute_statement(table.delete(sql = Base.name.where_equal(name))) } } def init_entry( db: SQL.Database, name: String, heap_size: Long, heap_digest: Option[SHA1.Digest], log_db: Option[Log_DB] ): Unit = { clean_entry(db, name) for (table <- List(Size.table, Slices_Size.table)) { db.create_view(table) } db.execute_statement(Base.table.insert(), body = { stmt => stmt.string(1) = name stmt.long(2) = heap_size stmt.string(3) = heap_digest.map(_.toString) stmt.string(4) = log_db.map(_.uuid) stmt.bytes(5) = log_db.map(_.content) }) } def update_entry( db: SQL.Database, name: String, heap_size: Long, heap_digest: Option[SHA1.Digest], log_db: Option[Log_DB] ): Unit = db.execute_statement( Base.table.update(List(Base.heap_size, Base.heap_digest, Base.uuid, Base.log_db), sql = Base.name.where_equal(name)), body = { stmt => stmt.long(1) = heap_size stmt.string(2) = heap_digest.map(_.toString) stmt.string(3) = log_db.map(_.uuid) stmt.bytes(4) = log_db.map(_.content) }) } def clean_entry(db: SQL.Database, session_name: String): Unit = private_data.transaction_lock(db, create = true, label = "ML_Heap.clean_entry") { private_data.clean_entry(db, session_name) } def read_digests(db: SQL.Database, names: Iterable[String]): Map[String, SHA1.Digest] = if (names.isEmpty) Map.empty else { private_data.transaction_lock(db, create = true, label = "ML_Heap.read_digests") { private_data.read_digests(db, names) } } def store( db: SQL.Database, session: Store.Session, slice: Space, cache: Compress.Cache = Compress.Cache.none, progress: Progress = new Progress ): Unit = { val log_db = for { path <- session.log_db uuid <- proper_string(Store.read_build_uuid(path, session.name)) } yield Log_DB(uuid, Bytes.read(path)) val heap_digest = session.heap.map(write_file_digest) val heap_size = session.heap match { - case Some(heap) => File.size(heap) - sha1_prefix.length - SHA1.digest_length + case Some(heap) => File.size(heap) - sha1_length case None => 0L } val slice_size = slice.bytes max Space.MiB(1).bytes val slices = (heap_size.toDouble / slice_size.toDouble).ceil.toInt val step = if (slices == 0) 0L else (heap_size.toDouble / slices.toDouble).ceil.toLong def slice_content(i: Int): Bytes = { val j = i + 1 val offset = step * i val limit = if (j < slices) step * j else heap_size Bytes.read_file(session.the_heap, offset = offset, limit = limit) .compress(cache = cache) } try { if (slices > 0) progress.echo("Storing " + session.name + " ...") // init entry: slice 0 + initial log_db { val (heap_size0, heap_digest0) = if (slices > 1) (0L, None) else (heap_size, heap_digest) val log_db0 = if (slices <= 1) log_db else None val content0 = if (slices > 0) Some(slice_content(0)) else None if (log_db0.isDefined) progress.echo("Storing " + session.log_db_name + " ...") private_data.transaction_lock(db, create = true, label = "ML_Heap.store") { private_data.init_entry(db, session.name, heap_size0, heap_digest0, log_db0) for (content <- content0) private_data.write_slice(db, session.name, 0, content) } } // update entry: slice 1 ... 
+ final log_db if (slices > 1) { for (i <- 1 until slices) { val content = slice_content(i) private_data.transaction_lock(db, label = "ML_Heap.store" + i) { private_data.write_slice(db, session.name, i, content) } } if (log_db.isDefined) progress.echo("Storing " + session.log_db_name + " ...") private_data.transaction_lock(db, label = "ML_Heap.store_update") { private_data.update_entry(db, session.name, heap_size, heap_digest, log_db) } } } catch { case exn: Throwable => private_data.transaction_lock(db, create = true, label = "ML_Heap.store_clean") { private_data.clean_entry(db, session.name) } throw exn } } def restore( db: SQL.Database, sessions: List[Store.Session], cache: Compress.Cache = Compress.Cache.none, progress: Progress = new Progress ): Unit = { if (sessions.exists(_.defined)) { private_data.transaction_lock(db, create = true, label = "ML_Heap.restore") { /* heap */ val defined_heaps = for (session <- sessions; heap <- session.heap) yield session.name -> heap val db_digests = private_data.read_digests(db, defined_heaps.map(_._1)) for ((session_name, heap) <- defined_heaps) { val file_digest = read_file_digest(heap) val db_digest = db_digests.get(session_name) if (db_digest.isDefined && db_digest != file_digest) { progress.echo("Restoring " + session_name + " ...") val base_dir = Isabelle_System.make_directory(heap.expand.dir) Isabelle_System.with_tmp_file(session_name + "_", base_dir = base_dir.file) { tmp => - Bytes.write(tmp, Bytes.empty) + tmp.file.delete() for (slice <- private_data.read_slices(db, session_name)) { Bytes.append(tmp, slice.uncompress(cache = cache)) } val digest = write_file_digest(tmp) - if (db_digest.get == digest) { - Isabelle_System.chmod("a+r", tmp) - Isabelle_System.move_file(tmp, heap) - } - else error("Incoherent content for session heap " + heap) + if (db_digest.get == digest) Isabelle_System.move_file(tmp, heap) + else error("Incoherent content for session heap " + heap.expand) } } } /* log_db */ for (session <- sessions; path <- session.log_db) { - val file_uuid = Store.read_build_uuid(path, session.name) - private_data.read_log_db(db, session.name, old_uuid = file_uuid) match { - case Some(log_db) if file_uuid.isEmpty => + val old_uuid = Store.read_build_uuid(path, session.name) + for (log_db <- private_data.read_log_db(db, session.name, old_uuid = old_uuid)) { + if (old_uuid.isEmpty) { progress.echo("Restoring " + session.log_db_name + " ...") Isabelle_System.make_directory(path.expand.dir) Bytes.write(path, log_db.content) - case Some(_) => error("Incoherent content for session database " + path) - case None => + } + else error("Incoherent content for session database " + path.expand) } } } } } } diff --git a/src/Pure/ML/ml_process.scala b/src/Pure/ML/ml_process.scala --- a/src/Pure/ML/ml_process.scala +++ b/src/Pure/ML/ml_process.scala @@ -1,189 +1,190 @@ /* Title: Pure/ML/ml_process.scala Author: Makarius The raw ML process. 
*/ package isabelle import java.util.{Map => JMap, HashMap} import java.io.{File => JFile} object ML_Process { - def bootstrap_shasum(): SHA1.Shasum = - SHA1.shasum_meta_info(SHA1.digest(Path.explode("$POLYML_EXE"))) + def make_shasum(ancestors: List[SHA1.Shasum]): SHA1.Shasum = + if (ancestors.isEmpty) SHA1.shasum_meta_info(SHA1.digest(Path.explode("$POLYML_EXE"))) + else SHA1.flat_shasum(ancestors) def session_heaps( store: Store, session_background: Sessions.Background, logic: String = "" ): List[Path] = { val logic_name = Isabelle_System.default_logic(logic) session_background.sessions_structure.selection(logic_name). build_requirements(List(logic_name)). map(name => store.get_session(name).the_heap) } def apply( options: Options, session_background: Sessions.Background, session_heaps: List[Path], use_prelude: List[String] = Nil, eval_main: String = "", args: List[String] = Nil, modes: List[String] = Nil, cwd: JFile = null, env: JMap[String, String] = Isabelle_System.settings(), redirect: Boolean = false, cleanup: () => Unit = () => () ): Bash.Process = { val ml_options = options.standard_ml() val eval_init = if (session_heaps.isEmpty) { List( """ fun chapter (_: string) = (); fun section (_: string) = (); fun subsection (_: string) = (); fun subsubsection (_: string) = (); fun paragraph (_: string) = (); fun subparagraph (_: string) = (); val ML_file = PolyML.use; """, if (Platform.is_windows) "fun exit 0 = OS.Process.exit OS.Process.success" + " | exit 1 = OS.Process.exit OS.Process.failure" + " | exit rc = OS.Process.exit (RunCall.unsafeCast (Word8.fromInt rc))" else "fun exit rc = Posix.Process.exit (Word8.fromInt rc)", "PolyML.Compiler.prompt1 := \"Poly/ML> \"", "PolyML.Compiler.prompt2 := \"Poly/ML# \"") } else { List( "(PolyML.SaveState.loadHierarchy " + ML_Syntax.print_list( ML_Syntax.print_string_bytes)(session_heaps.map(File.platform_path)) + "; PolyML.print_depth 0)") } val eval_modes = if (modes.isEmpty) Nil else List("Print_Mode.add_modes " + ML_Syntax.print_list(ML_Syntax.print_string_bytes)(modes)) // options val eval_options = if (session_heaps.isEmpty) Nil else List("Options.load_default ()") val isabelle_process_options = Isabelle_System.tmp_file("options") File.restrict(File.path(isabelle_process_options)) File.write(isabelle_process_options, YXML.string_of_body(ml_options.encode)) // session resources val eval_init_session = if (session_heaps.isEmpty) Nil else List("Resources.init_session_env ()") val init_session = Isabelle_System.tmp_file("init_session") File.restrict(File.path(init_session)) File.write(init_session, new Resources(session_background).init_session_yxml) // process val eval_process = proper_string(eval_main).getOrElse( if (session_heaps.isEmpty) { "PolyML.print_depth " + ML_Syntax.print_int(ml_options.int("ML_print_depth")) } else "Isabelle_Process.init ()") // ISABELLE_TMP val isabelle_tmp = Isabelle_System.tmp_dir("process") val ml_runtime_options = { val ml_options0 = Word.explode(Isabelle_System.getenv("ML_OPTIONS")) val ml_options1 = if (ml_options0.exists(_.containsSlice("gcthreads"))) ml_options0 else ml_options0 ::: List("--gcthreads", ml_options.threads().toString) val ml_options2 = if (!Platform.is_windows || ml_options0.exists(_.containsSlice("codepage"))) ml_options1 else ml_options1 ::: List("--codepage", "utf8") ml_options2 ::: List("--exportstats") } // bash val bash_args = ml_runtime_options ::: (eval_init ::: eval_modes ::: eval_options ::: eval_init_session).flatMap(List("--eval", _)) ::: use_prelude.flatMap(List("--use", _)) ::: 
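/* finally the main --eval entry point (eval_main, or the default computed above), followed by any remaining command-line arguments */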
List("--eval", eval_process) ::: args val bash_env = new HashMap(env) bash_env.put("ISABELLE_PROCESS_OPTIONS", File.standard_path(isabelle_process_options)) bash_env.put("ISABELLE_INIT_SESSION", File.standard_path(init_session)) bash_env.put("ISABELLE_TMP", File.standard_path(isabelle_tmp)) bash_env.put("POLYSTATSDIR", isabelle_tmp.getAbsolutePath) val process_policy = ml_options.string("process_policy") val process_prefix = if_proper(process_policy, process_policy + " ") Bash.process(process_prefix + "\"$POLYML_EXE\" -q " + Bash.strings(bash_args), cwd = cwd, env = bash_env, redirect = redirect, cleanup = { () => isabelle_process_options.delete init_session.delete Isabelle_System.rm_tree(isabelle_tmp) cleanup() }) } /* Isabelle tool wrapper */ val isabelle_tool = Isabelle_Tool("process", "raw ML process (batch mode)", Scala_Project.here, { args => var dirs: List[Path] = Nil var eval_args: List[String] = Nil var logic = Isabelle_System.getenv("ISABELLE_LOGIC") var modes: List[String] = Nil var options = Options.init() val getopts = Getopts(""" Usage: isabelle process [OPTIONS] Options are: -T THEORY load theory -d DIR include session directory -e ML_EXPR evaluate ML expression on startup -f ML_FILE evaluate ML file on startup -l NAME logic session name (default ISABELLE_LOGIC=""" + quote(logic) + """) -m MODE add print mode for output -o OPTION override Isabelle system OPTION (via NAME=VAL or NAME) Run the raw Isabelle ML process in batch mode. """, "T:" -> (arg => eval_args = eval_args ::: List("--eval", "use_thy " + ML_Syntax.print_string_bytes(arg))), "d:" -> (arg => dirs = dirs ::: List(Path.explode(arg))), "e:" -> (arg => eval_args = eval_args ::: List("--eval", arg)), "f:" -> (arg => eval_args = eval_args ::: List("--use", arg)), "l:" -> (arg => logic = arg), "m:" -> (arg => modes = arg :: modes), "o:" -> (arg => options = options + arg)) val more_args = getopts(args) if (args.isEmpty || more_args.nonEmpty) getopts.usage() val store = Store(options) val session_background = Sessions.background(options, logic, dirs = dirs).check_errors val session_heaps = ML_Process.session_heaps(store, session_background, logic = logic) val result = ML_Process(options, session_background, session_heaps, args = eval_args, modes = modes) .result( progress_stdout = Output.writeln(_, stdout = true), progress_stderr = Output.writeln(_)) sys.exit(result.rc) }) } diff --git a/src/Pure/Tools/update.scala b/src/Pure/Tools/update.scala --- a/src/Pure/Tools/update.scala +++ b/src/Pure/Tools/update.scala @@ -1,232 +1,232 @@ /* Title: Pure/Tools/update.scala Author: Makarius Update theory sources based on PIDE markup. 
*/ package isabelle object Update { val update_elements: Markup.Elements = Markup.Elements(Markup.UPDATE, Markup.LANGUAGE) def update_xml(options: Options, xml: XML.Body): XML.Body = { val update_path_cartouches = options.bool("update_path_cartouches") val update_cite = options.bool("update_cite") val cite_commands = Bibtex.cite_commands(options) def upd(lang: Markup.Language, ts: XML.Body): XML.Body = ts flatMap { case XML.Wrapped_Elem(markup, body1, body2) => val body = if (markup.name == Markup.UPDATE) body1 else body2 upd(lang, body) case XML.Elem(Markup.Language(lang1), body) => if (update_path_cartouches && lang1.is_path) { Token.read_embedded(Keyword.Keywords.empty, XML.content(body)) match { case Some(tok) => List(XML.Text(Symbol.cartouche(tok.content))) case None => upd(lang1, body) } } else if (update_cite && lang1.is_antiquotation) { List(XML.Text(Bibtex.update_cite_antiquotation(cite_commands, XML.content(body)))) } else upd(lang1, body) case XML.Elem(_, body) => upd(lang, body) case XML.Text(s) if update_cite && lang.is_document => List(XML.Text(Bibtex.update_cite_commands(s))) case t => List(t) } upd(Markup.Language.outer, xml) } def default_base_logic: String = Isabelle_System.getenv("ISABELLE_LOGIC") def update(options: Options, update_options: List[Options.Spec], selection: Sessions.Selection = Sessions.Selection.empty, base_logics: List[String] = Nil, progress: Progress = new Progress, build_heap: Boolean = false, clean_build: Boolean = false, dirs: List[Path] = Nil, select_dirs: List[Path] = Nil, numa_shuffling: Boolean = false, max_jobs: Option[Int] = None, fresh_build: Boolean = false, no_build: Boolean = false ): Build.Results = { /* excluded sessions */ val exclude: Set[String] = if (base_logics.isEmpty) Set.empty else { Sessions.load_structure(options, dirs = dirs, select_dirs = select_dirs) .selection(Sessions.Selection(sessions = base_logics)) .build_graph.domain } // test options ++ update_options def augment_options(name: String): List[Options.Spec] = if (exclude(name)) Nil else update_options /* build */ val build_options = options + "build_thorough" val build_results = Build.build(build_options, progress = progress, dirs = dirs, select_dirs = select_dirs, selection = selection, build_heap = build_heap, clean_build = clean_build, numa_shuffling = numa_shuffling, max_jobs = max_jobs, fresh_build = fresh_build, no_build = no_build, augment_options = augment_options) val store = build_results.store val sessions_structure = build_results.deps.sessions_structure /* update */ var seen_theory = Set.empty[String] using(Export.open_database_context(store)) { database_context => for { session <- sessions_structure.build_topological_order if build_results(session).ok && !exclude(session) } { progress.echo("Updating " + session + " ...") val session_options = sessions_structure(session).options val proper_session_theory = build_results.deps(session).proper_session_theories.map(_.theory).toSet using(database_context.open_session0(session)) { session_context => for { db <- session_context.session_db() theory <- store.read_theories(db, session) if proper_session_theory(theory) && !seen_theory(theory) } { seen_theory += theory val theory_context = session_context.theory(theory) for { theory_snapshot <- Build.read_theory(theory_context) node_name <- theory_snapshot.node_files snapshot = theory_snapshot.switch(node_name) if snapshot.node.source_wellformed } { progress.expose_interrupt() val xml = YXML.parse_body(YXML.string_of_body(snapshot.xml_markup(elements = 
update_elements))) val source1 = XML.content(update_xml(session_options, xml)) if (source1 != snapshot.node.source) { val path = Path.explode(node_name.node) progress.echo("File " + quote(File.standard_path(path))) File.write(path, source1) } } } } } } build_results } /* Isabelle tool wrapper */ val isabelle_tool = Isabelle_Tool("update", "update theory sources based on PIDE markup", Scala_Project.here, { args => var base_sessions: List[String] = Nil var select_dirs: List[Path] = Nil var numa_shuffling = false var requirements = false var exclude_session_groups: List[String] = Nil var all_sessions = false var build_heap = false var clean_build = false var dirs: List[Path] = Nil var fresh_build = false var session_groups: List[String] = Nil var max_jobs: Option[Int] = None var base_logics: List[String] = List(default_base_logic) var no_build = false var options = Options.init() var update_options: List[Options.Spec] = Nil var verbose = false var exclude_sessions: List[String] = Nil val getopts = Getopts(""" Usage: isabelle update [OPTIONS] [SESSIONS ...] Options are: -B NAME include session NAME and all descendants -D DIR include session directory and select its sessions -R refer to requirements of selected sessions -X NAME exclude sessions from group NAME and all descendants -a select all sessions -b build heap images -c clean build -d DIR include session directory -f fresh build -g NAME select session group NAME -j INT maximum number of parallel jobs (default 1) -l NAMES comma-separated list of base logics, to remain unchanged (default: """ + quote(default_base_logic) + """) -n no build -- take existing session build databases -o OPTION override Isabelle system OPTION (via NAME=VAL or NAME) -u OPT override "update" option for selected sessions -v verbose -x NAME exclude session NAME and all descendants Update theory sources based on PIDE markup produced by "isabelle build". 
""", "B:" -> (arg => base_sessions = base_sessions ::: List(arg)), "D:" -> (arg => select_dirs = select_dirs ::: List(Path.explode(arg))), "N" -> (_ => numa_shuffling = true), "R" -> (_ => requirements = true), "X:" -> (arg => exclude_session_groups = exclude_session_groups ::: List(arg)), "a" -> (_ => all_sessions = true), "b" -> (_ => build_heap = true), "c" -> (_ => clean_build = true), "d:" -> (arg => dirs = dirs ::: List(Path.explode(arg))), "f" -> (_ => fresh_build = true), "g:" -> (arg => session_groups = session_groups ::: List(arg)), "j:" -> (arg => max_jobs = Some(Value.Nat.parse(arg))), "l:" -> (arg => base_logics = space_explode(',', arg)), "n" -> (_ => no_build = true), "o:" -> (arg => options = options + arg), "u:" -> (arg => update_options = update_options ::: List(Options.Spec("update_" + arg))), "v" -> (_ => verbose = true), "x:" -> (arg => exclude_sessions = exclude_sessions ::: List(arg))) val sessions = getopts(args) val progress = new Console_Progress(verbose = verbose) val results = progress.interrupt_handler { update(options, update_options, selection = Sessions.Selection( requirements = requirements, all_sessions = all_sessions, base_sessions = base_sessions, exclude_session_groups = exclude_session_groups, exclude_sessions = exclude_sessions, session_groups = session_groups, sessions = sessions), base_logics = base_logics, progress = progress, - build_heap, - clean_build, + build_heap = build_heap, + clean_build = clean_build, dirs = dirs, select_dirs = select_dirs, numa_shuffling = Host.numa_check(progress, numa_shuffling), max_jobs = max_jobs, - fresh_build, + fresh_build = fresh_build, no_build = no_build) } sys.exit(results.rc) }) }