diff --git a/src/Pure/Admin/build_log.scala b/src/Pure/Admin/build_log.scala --- a/src/Pure/Admin/build_log.scala +++ b/src/Pure/Admin/build_log.scala @@ -1,1399 +1,1399 @@ /* Title: Pure/Admin/build_log.scala Author: Makarius Management of build log files and database storage. */ package isabelle import java.io.{File => JFile} import java.time.format.{DateTimeFormatter, DateTimeParseException} import java.util.Locale import scala.collection.immutable.SortedMap import scala.collection.mutable import scala.util.matching.Regex object Build_Log { /** content **/ /* properties */ object Prop { val build_tags = SQL.Column.string("build_tags") // lines val build_args = SQL.Column.string("build_args") // lines val build_group_id = SQL.Column.string("build_group_id") val build_id = SQL.Column.string("build_id") val build_engine = SQL.Column.string("build_engine") val build_host = SQL.Column.string("build_host") val build_start = SQL.Column.date("build_start") val build_end = SQL.Column.date("build_end") val isabelle_version = SQL.Column.string("isabelle_version") val afp_version = SQL.Column.string("afp_version") val all_props: List[SQL.Column] = List(build_tags, build_args, build_group_id, build_id, build_engine, build_host, build_start, build_end, isabelle_version, afp_version) } /* settings */ object Settings { val ISABELLE_BUILD_OPTIONS = SQL.Column.string("ISABELLE_BUILD_OPTIONS") val ML_PLATFORM = SQL.Column.string("ML_PLATFORM") val ML_HOME = SQL.Column.string("ML_HOME") val ML_SYSTEM = SQL.Column.string("ML_SYSTEM") val ML_OPTIONS = SQL.Column.string("ML_OPTIONS") val ml_settings = List(ML_PLATFORM, ML_HOME, ML_SYSTEM, ML_OPTIONS) val all_settings = ISABELLE_BUILD_OPTIONS :: ml_settings type Entry = (String, String) type T = List[Entry] object Entry { def unapply(s: String): Option[Entry] = for { (a, b) <- Properties.Eq.unapply(s) } yield (a, Library.perhaps_unquote(b)) def getenv(a: String): String = Properties.Eq(a, quote(Isabelle_System.getenv(a))) } def show(): String = cat_lines( List(Entry.getenv("ISABELLE_TOOL_JAVA_OPTIONS"), Entry.getenv(ISABELLE_BUILD_OPTIONS.name), "") ::: ml_settings.map(c => Entry.getenv(c.name))) } /* file names */ def log_date(date: Date): String = String.format(Locale.ROOT, "%s.%05d", DateTimeFormatter.ofPattern("yyyy-MM-dd").format(date.rep), java.lang.Long.valueOf((date - date.midnight).ms / 1000)) def log_subdir(date: Date): Path = Path.explode("log") + Path.explode(date.rep.getYear.toString) def log_filename(engine: String, date: Date, more: List[String] = Nil): Path = Path.explode((engine :: log_date(date) :: more).mkString("", "_", ".log")) /** log file **/ def print_date(date: Date): String = Log_File.Date_Format(date) object Log_File { /* log file */ val all_suffixes: List[String] = List(".log", ".log.gz", ".log.xz", ".gz", ".xz") def plain_name(name: String): String = { all_suffixes.find(name.endsWith) match { case Some(s) => Library.try_unsuffix(s, name).get case None => name } } def plain_name(file: JFile): String = plain_name(file.getName) def apply(name: String, lines: List[String], cache: XML.Cache = XML.Cache.none): Log_File = new Log_File(plain_name(name), lines.map(s => cache.string(Library.trim_line(s))), cache) def read(file: JFile, cache: XML.Cache = XML.Cache.none): Log_File = { val name = file.getName val text = if (File.is_gz(name)) File.read_gzip(file) else if (File.is_xz(name)) Bytes.read(file).uncompress_xz(cache = cache.compress).text else File.read(file) apply(name, Library.trim_split_lines(text), cache = cache) } /* log file 
collections */ val log_prefixes: List[String] = List(Build_History.log_prefix, Identify.log_prefix, Identify.log_prefix2, Isatest.log_prefix, AFP_Test.log_prefix) val log_suffixes: List[String] = List(".log", ".log.gz", ".log.xz") def is_log(file: JFile, prefixes: List[String] = log_prefixes, suffixes: List[String] = log_suffixes ): Boolean = { val name = file.getName prefixes.exists(name.startsWith) && suffixes.exists(name.endsWith) && name != "isatest.log" && name != "afp-test.log" && name != "main.log" } def find_files(starts: List[JFile]): List[JFile] = starts.flatMap(start => File.find_files(start, pred = is_log(_), follow_links = true)) .sortBy(plain_name) /* date format */ val Date_Format = { val fmts = Date.Formatter.variants( List("EEE MMM d HH:mm:ss O yyyy", "EEE MMM d HH:mm:ss VV yyyy"), List(Locale.ENGLISH, Locale.GERMAN)) ::: List( DateTimeFormatter.RFC_1123_DATE_TIME, Date.Formatter.pattern("EEE MMM d HH:mm:ss yyyy").withZone(Date.timezone_berlin)) def tune_timezone(s: String): String = s match { case "CET" | "MET" => "GMT+1" case "CEST" | "MEST" => "GMT+2" case "EST" => "Europe/Berlin" case _ => s } def tune_weekday(s: String): String = s match { case "Die" => "Di" case "Mit" => "Mi" case "Don" => "Do" case "Fre" => "Fr" case "Sam" => "Sa" case "Son" => "So" case _ => s } def tune(s: String): String = Word.implode( Word.explode(s) match { case a :: "M\uFFFDr" :: bs => tune_weekday(a) :: "Mär" :: bs.map(tune_timezone) case a :: bs => tune_weekday(a) :: bs.map(tune_timezone) case Nil => Nil } ) Date.Format.make(fmts, tune) } } class Log_File private( val name: String, val lines: List[String], val cache: XML.Cache ) { log_file => override def toString: String = name def text: String = cat_lines(lines) def err(msg: String): Nothing = error("Bad log file " + quote(name) + ": " + msg) /* date format */ object Strict_Date { def unapply(s: String): Some[Date] = try { Some(Log_File.Date_Format.parse(s)) } catch { case exn: DateTimeParseException => log_file.err(exn.getMessage) } } /* inlined text */ def filter(Marker: Protocol_Message.Marker): List[String] = for (case Marker(text) <- lines) yield text def find(Marker: Protocol_Message.Marker): Option[String] = lines.collectFirst({ case Marker(text) => text }) def find_match(regexes: List[Regex]): Option[String] = regexes match { case Nil => None case regex :: rest => lines.iterator.map(regex.unapplySeq(_)).find(res => res.isDefined && res.get.length == 1). 
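/* keep the first match that captured exactly one group */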
map(res => res.get.head) orElse find_match(rest) } /* settings */ def get_setting(name: String): Option[Settings.Entry] = lines.collectFirst({ case Settings.Entry(a, b) if a == name => a -> b }) def get_all_settings: Settings.T = for { c <- Settings.all_settings; entry <- get_setting(c.name) } yield entry /* properties (YXML) */ def parse_props(text: String): Properties.T = try { cache.props(XML.Decode.properties(YXML.parse_body(text, cache = cache))) } catch { case _: XML.Error => log_file.err("malformed properties") } def filter_props(marker: Protocol_Message.Marker): List[Properties.T] = for (text <- filter(marker) if YXML.detect(text)) yield parse_props(text) def find_props(marker: Protocol_Message.Marker): Option[Properties.T] = for (text <- find(marker) if YXML.detect(text)) yield parse_props(text) /* parse various formats */ def parse_meta_info(): Meta_Info = Build_Log.parse_meta_info(log_file) def parse_build_info(ml_statistics: Boolean = false): Build_Info = Build_Log.parse_build_info(log_file, ml_statistics) def parse_session_info( command_timings: Boolean = false, theory_timings: Boolean = false, ml_statistics: Boolean = false, task_statistics: Boolean = false): Session_Info = Build_Log.parse_session_info( log_file, command_timings, theory_timings, ml_statistics, task_statistics) } /** digested meta info: produced by Admin/build_other in log.xz file **/ object Meta_Info { val empty: Meta_Info = Meta_Info(Nil, Nil) } sealed case class Meta_Info(props: Properties.T, settings: Settings.T) { def is_empty: Boolean = props.isEmpty && settings.isEmpty def get(c: SQL.Column): Option[String] = Properties.get(props, c.name) orElse Properties.get(settings, c.name) def get_date(c: SQL.Column): Option[Date] = get(c).map(Log_File.Date_Format.parse) def get_build_host: Option[String] = get(Prop.build_host) def get_build_start: Option[Date] = get_date(Prop.build_start) def get_build_end: Option[Date] = get_date(Prop.build_end) } object Identify { val log_prefix = "isabelle_identify_" val log_prefix2 = "plain_identify_" def engine(log_file: Log_File): String = if (log_file.name.startsWith(log_prefix2)) "plain_identify" else "identify" def content(date: Date, isabelle_version: Option[String], afp_version: Option[String]): String = terminate_lines( List("isabelle_identify: " + Build_Log.print_date(date), "") ::: isabelle_version.map("Isabelle version: " + _).toList ::: afp_version.map("AFP version: " + _).toList) val Start = new Regex("""^isabelle_identify: (.+)$""") val No_End = new Regex("""$.""") val Isabelle_Version = List(new Regex("""^Isabelle version: (\S+)$""")) val AFP_Version = List(new Regex("""^AFP version: (\S+)$""")) } object Isatest { val log_prefix = "isatest-makeall-" val engine = "isatest" val Start = new Regex("""^------------------- starting test --- (.+) --- (.+)$""") val End = new Regex("""^------------------- test (?:successful|FAILED) --- (.+) --- .*$""") val Isabelle_Version = List(new Regex("""^Isabelle version: (\S+)$""")) } object AFP_Test { val log_prefix = "afp-test-devel-" val engine = "afp-test" val Start = new Regex("""^Start test(?: for .+)? at ([^,]+), (.*)$""") val Start_Old = new Regex("""^Start test(?: for .+)? 
at ([^,]+)$""") val End = new Regex("""^End test on (.+), .+, elapsed time:.*$""") val Isabelle_Version = List(new Regex("""^Isabelle version: .* -- hg id (\S+)$""")) val AFP_Version = List(new Regex("""^AFP version: .* -- hg id (\S+)$""")) val Bad_Init = new Regex("""^cp:.*: Disc quota exceeded$""") } object Jenkins { val log_prefix = "jenkins_" val engine = "jenkins" val Host = new Regex("""^Building remotely on (\S+) \((\S+)\).*$""") val Start = new Regex("""^(?:Started by an SCM change|Started from command line by admin|).*$""") val Start_Date = new Regex("""^Build started at (.+)$""") val No_End = new Regex("""$.""") val Isabelle_Version = List(new Regex("""^(?:Build for Isabelle id|Isabelle id) (\w+).*$"""), new Regex("""^ISABELLE_CI_REPO_ID="(\w+)".*$"""), new Regex("""^(\w{12}) tip.*$""")) val AFP_Version = List(new Regex("""^(?:Build for AFP id|AFP id) (\w+).*$"""), new Regex("""^ISABELLE_CI_AFP_ID="(\w+)".*$""")) val CONFIGURATION = "=== CONFIGURATION ===" val BUILD = "=== BUILD ===" } private def parse_meta_info(log_file: Log_File): Meta_Info = { def parse(engine: String, host: String, start: Date, End: Regex, Isabelle_Version: List[Regex], AFP_Version: List[Regex] ): Meta_Info = { val build_id = { val prefix = proper_string(host) orElse proper_string(engine) getOrElse "build" prefix + ":" + start.time.ms } val build_engine = if (engine == "") Nil else List(Prop.build_engine.name -> engine) val build_host = if (host == "") Nil else List(Prop.build_host.name -> host) val start_date = List(Prop.build_start.name -> print_date(start)) val end_date = log_file.lines.last match { case End(log_file.Strict_Date(end_date)) => List(Prop.build_end.name -> print_date(end_date)) case _ => Nil } val isabelle_version = log_file.find_match(Isabelle_Version).map(Prop.isabelle_version.name -> _) val afp_version = log_file.find_match(AFP_Version).map(Prop.afp_version.name -> _) Meta_Info((Prop.build_id.name -> build_id) :: build_engine ::: build_host ::: start_date ::: end_date ::: isabelle_version.toList ::: afp_version.toList, log_file.get_all_settings) } log_file.lines match { case line :: _ if Protocol.Meta_Info_Marker.test_yxml(line) => Meta_Info(log_file.find_props(Protocol.Meta_Info_Marker).get, log_file.get_all_settings) case Identify.Start(log_file.Strict_Date(start)) :: _ => parse(Identify.engine(log_file), "", start, Identify.No_End, Identify.Isabelle_Version, Identify.AFP_Version) case Isatest.Start(log_file.Strict_Date(start), host) :: _ => parse(Isatest.engine, host, start, Isatest.End, Isatest.Isabelle_Version, Nil) case AFP_Test.Start(log_file.Strict_Date(start), host) :: _ => parse(AFP_Test.engine, host, start, AFP_Test.End, AFP_Test.Isabelle_Version, AFP_Test.AFP_Version) case AFP_Test.Start_Old(log_file.Strict_Date(start)) :: _ => parse(AFP_Test.engine, "", start, AFP_Test.End, AFP_Test.Isabelle_Version, AFP_Test.AFP_Version) case line :: _ if line.startsWith("\u0000") => Meta_Info.empty case List(Isatest.End(_)) => Meta_Info.empty case _ :: AFP_Test.Bad_Init() :: _ => Meta_Info.empty case Nil => Meta_Info.empty case _ => log_file.err("cannot detect log file format") } } /** build info: toplevel output of isabelle build or Admin/build_other **/ val SESSION_NAME = "session_name" enum Session_Status { case existing, finished, failed, cancelled } sealed case class Session_Entry( chapter: String = "", groups: List[String] = Nil, hostname: Option[String] = None, threads: Option[Int] = None, start: Option[Time] = None, timing: Timing = Timing.zero, ml_timing: Timing = Timing.zero, 
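/* SHA1 digest of the session sources, as printed in "Sources" log lines */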
sources: Option[String] = None, heap_size: Option[Space] = None, status: Option[Session_Status] = None, errors: List[String] = Nil, theory_timings: Map[String, Timing] = Map.empty, ml_statistics: List[Properties.T] = Nil ) { def proper_groups: Option[String] = if (groups.isEmpty) None else Some(cat_lines(groups)) def finished: Boolean = status == Some(Session_Status.finished) def failed: Boolean = status == Some(Session_Status.failed) } object Build_Info { val sessions_dummy: Map[String, Session_Entry] = Map("" -> Session_Entry(theory_timings = Map("" -> Timing.zero))) } sealed case class Build_Info(sessions: Map[String, Session_Entry]) { def finished_sessions: List[String] = for ((a, b) <- sessions.toList if b.finished) yield a def failed_sessions: List[String] = for ((a, b) <- sessions.toList if b.failed) yield a } private def parse_build_info(log_file: Log_File, parse_ml_statistics: Boolean): Build_Info = { object Chapter_Name { def unapply(s: String): Some[(String, String)] = space_explode('/', s) match { case List(chapter, name) => Some((chapter, name)) case _ => Some(("", s)) } } val Session_No_Groups = new Regex("""^Session (\S+)$""") val Session_Groups = new Regex("""^Session (\S+) \((.*)\)$""") val Session_Finished1 = new Regex("""^Finished (\S+) \((\d+):(\d+):(\d+) elapsed time, (\d+):(\d+):(\d+) cpu time.*$""") val Session_Finished2 = new Regex("""^Finished ([^\s/]+) \((\d+):(\d+):(\d+) elapsed time.*$""") val Session_Timing = new Regex("""^Timing (\S+) \((\d+) threads, (\d+\.\d+)s elapsed time, (\d+\.\d+)s cpu time, (\d+\.\d+)s GC time.*$""") val Session_Started1 = new Regex("""^(?:Running|Building) (\S+) \.\.\.$""") val Session_Started2 = new Regex("""^(?:Running|Building) (\S+) \(?on ([^\s/]+)[^)]*\)? \.\.\.$""") val Session_Started3 = new Regex("""^(?:Running|Building) (\S+) \(started (\d+):(\d+):(\d+)\) \.\.\.$""") val Session_Started4 = new Regex("""^(?:Running|Building) (\S+) \(started (\d+):(\d+):(\d+) on ([^\s/]+)[^)]*\) \.\.\.$""") val Sources = new Regex("""^Sources (\S+) (\S{""" + SHA1.digest_length + """})$""") val Heap = new Regex("""^Heap (\S+) \((\d+) bytes\)$""") object Theory_Timing { def unapply(line: String): Option[(String, (String, Timing))] = Protocol.Theory_Timing_Marker.unapply(line.replace('~', '-')).map(log_file.parse_props) match { case Some((SESSION_NAME, session) :: props) => for (theory <- Markup.Name.unapply(props)) yield (session, theory -> Markup.Timing_Properties.get(props)) case _ => None } } var chapter = Map.empty[String, String] var groups = Map.empty[String, List[String]] var hostnames = Map.empty[String, String] var threads = Map.empty[String, Int] var timing = Map.empty[String, Timing] var ml_timing = Map.empty[String, Timing] var started = Map.empty[String, Option[Time]] var sources = Map.empty[String, String] var heap_sizes = Map.empty[String, Space] var theory_timings = Map.empty[String, Map[String, Timing]] var ml_statistics = Map.empty[String, List[Properties.T]] var errors = Map.empty[String, List[String]] def all_sessions: Set[String] = chapter.keySet ++ groups.keySet ++ threads.keySet ++ timing.keySet ++ ml_timing.keySet ++ started.keySet ++ sources.keySet ++ heap_sizes.keySet ++ theory_timings.keySet ++ ml_statistics.keySet for (line <- log_file.lines) { line match { case Session_No_Groups(Chapter_Name(chapt, name)) => chapter += (name -> chapt) groups += (name -> Nil) case Session_Groups(Chapter_Name(chapt, name), grps) => chapter += (name -> chapt) groups += (name -> Word.explode(grps)) case Session_Started1(name) => started 
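/* session started, but no start time was reported */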
+= (name -> None) case Session_Started2(name, hostname) => started += (name -> None) hostnames += (name -> hostname) case Session_Started3(name, Value.Int(t1), Value.Int(t2), Value.Int(t3)) => started += (name -> Some(Time.hms(t1, t2, t3))) case Session_Started4(name, Value.Int(t1), Value.Int(t2), Value.Int(t3), hostname) => started += (name -> Some(Time.hms(t1, t2, t3))) hostnames += (name -> hostname) case Session_Finished1(name, Value.Int(e1), Value.Int(e2), Value.Int(e3), Value.Int(c1), Value.Int(c2), Value.Int(c3)) => val elapsed = Time.hms(e1, e2, e3) val cpu = Time.hms(c1, c2, c3) timing += (name -> Timing(elapsed, cpu, Time.zero)) case Session_Finished2(name, Value.Int(e1), Value.Int(e2), Value.Int(e3)) => val elapsed = Time.hms(e1, e2, e3) timing += (name -> Timing(elapsed, Time.zero, Time.zero)) case Session_Timing(name, Value.Int(t), Value.Double(e), Value.Double(c), Value.Double(g)) => val elapsed = Time.seconds(e) val cpu = Time.seconds(c) val gc = Time.seconds(g) ml_timing += (name -> Timing(elapsed, cpu, gc)) threads += (name -> t) case Sources(name, s) => sources += (name -> s) case Heap(name, Value.Long(size)) => heap_sizes += (name -> Space.bytes(size)) case _ if Protocol.Theory_Timing_Marker.test_yxml(line) => line match { case Theory_Timing(name, theory_timing) => theory_timings += (name -> (theory_timings.getOrElse(name, Map.empty) + theory_timing)) case _ => log_file.err("malformed theory_timing " + quote(line)) } case _ if parse_ml_statistics && Protocol.ML_Statistics_Marker.test_yxml(line) => Protocol.ML_Statistics_Marker.unapply(line).map(log_file.parse_props) match { case Some((SESSION_NAME, name) :: props) => ml_statistics += (name -> (props :: ml_statistics.getOrElse(name, Nil))) case _ => log_file.err("malformed ML_statistics " + quote(line)) } case _ if Protocol.Error_Message_Marker.test_yxml(line) => Protocol.Error_Message_Marker.unapply(line).map(log_file.parse_props) match { case Some(List((SESSION_NAME, name), (Markup.CONTENT, msg))) => errors += (name -> (msg :: errors.getOrElse(name, Nil))) case _ => log_file.err("malformed error message " + quote(line)) } case _ => } } val sessions = Map( (for (name <- all_sessions.toList) yield { val status = if (timing.isDefinedAt(name) || ml_timing.isDefinedAt(name)) Session_Status.finished else if (started.isDefinedAt(name)) Session_Status.failed else Session_Status.existing val entry = Session_Entry( chapter = chapter.getOrElse(name, ""), groups = groups.getOrElse(name, Nil), hostname = hostnames.get(name), threads = threads.get(name), start = started.get(name).flatten, timing = timing.getOrElse(name, Timing.zero), ml_timing = ml_timing.getOrElse(name, Timing.zero), sources = sources.get(name), heap_size = heap_sizes.get(name), status = Some(status), errors = errors.getOrElse(name, Nil).reverse, theory_timings = theory_timings.getOrElse(name, Map.empty), ml_statistics = ml_statistics.getOrElse(name, Nil).reverse) (name -> entry) }):_*) Build_Info(sessions) } /** session info: produced by isabelle build as session database **/ sealed case class Session_Info( session_timing: Properties.T, command_timings: List[Properties.T], theory_timings: List[Properties.T], ml_statistics: List[Properties.T], task_statistics: List[Properties.T], errors: List[String] ) { def error(s: String): Session_Info = copy(errors = errors ::: List(s)) } private def parse_session_info( log_file: Log_File, command_timings: Boolean, theory_timings: Boolean, ml_statistics: Boolean, task_statistics: Boolean ): Session_Info = { Session_Info( 
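/* each component is parsed only when requested, to keep log digestion cheap */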
session_timing = log_file.find_props(Protocol.Session_Timing_Marker) getOrElse Nil, command_timings = if (command_timings) log_file.filter_props(Protocol.Command_Timing_Marker) else Nil, theory_timings = if (theory_timings) log_file.filter_props(Protocol.Theory_Timing_Marker) else Nil, ml_statistics = if (ml_statistics) log_file.filter_props(Protocol.ML_Statistics_Marker) else Nil, task_statistics = if (task_statistics) log_file.filter_props(Protocol.Task_Statistics_Marker) else Nil, errors = log_file.filter(Protocol.Error_Message_Marker)) } def compress_errors( errors: List[String], cache: Compress.Cache = Compress.Cache.none ): Option[Bytes] = if (errors.isEmpty) None else { Some(Bytes(YXML.string_of_body(XML.Encode.list(XML.Encode.string)(errors))). compress(cache = cache)) } def uncompress_errors(bytes: Bytes, cache: XML.Cache = XML.Cache.make()): List[String] = if (bytes.is_empty) Nil else { XML.Decode.list(YXML.string_of_body)( YXML.parse_body(bytes.uncompress(cache = cache.compress).text, cache = cache)) } /** persistent store **/ /* SQL data model */ object Column { val log_name = SQL.Column.string("log_name").make_primary_key val session_name = SQL.Column.string("session_name").make_primary_key val theory_name = SQL.Column.string("theory_name").make_primary_key val chapter = SQL.Column.string("chapter") val groups = SQL.Column.string("groups") val hostname = SQL.Column.string("hostname") val threads = SQL.Column.int("threads") val session_start = SQL.Column.long("session_start") val timing_elapsed = SQL.Column.long("timing_elapsed") val timing_cpu = SQL.Column.long("timing_cpu") val timing_gc = SQL.Column.long("timing_gc") val timing_factor = SQL.Column.double("timing_factor") val ml_timing_elapsed = SQL.Column.long("ml_timing_elapsed") val ml_timing_cpu = SQL.Column.long("ml_timing_cpu") val ml_timing_gc = SQL.Column.long("ml_timing_gc") val ml_timing_factor = SQL.Column.double("ml_timing_factor") val theory_timing_elapsed = SQL.Column.long("theory_timing_elapsed") val theory_timing_cpu = SQL.Column.long("theory_timing_cpu") val theory_timing_gc = SQL.Column.long("theory_timing_gc") val heap_size = SQL.Column.long("heap_size") val status = SQL.Column.string("status") val errors = SQL.Column.bytes("errors") val sources = SQL.Column.string("sources") val ml_statistics = SQL.Column.bytes("ml_statistics") val known = SQL.Column.bool("known") def pull_date(afp: Boolean = false): SQL.Column = if (afp) SQL.Column.date("afp_pull_date") else SQL.Column.date("pull_date") } object private_data extends SQL.Data("isabelle_build_log") { /* tables */ + override lazy val tables: SQL.Tables = + SQL.Tables( + meta_info_table, + sessions_table, + theories_table, + ml_statistics_table) + val meta_info_table = make_table(Column.log_name :: Prop.all_props ::: Settings.all_settings, name = "meta_info") val sessions_table = make_table( List(Column.log_name, Column.session_name, Column.chapter, Column.groups, Column.hostname, Column.threads, Column.timing_elapsed, Column.timing_cpu, Column.timing_gc, Column.timing_factor, Column.ml_timing_elapsed, Column.ml_timing_cpu, Column.ml_timing_gc, Column.ml_timing_factor, Column.heap_size, Column.status, Column.errors, Column.sources, Column.session_start), name = "sessions") val theories_table = make_table( List(Column.log_name, Column.session_name, Column.theory_name, Column.theory_timing_elapsed, Column.theory_timing_cpu, Column.theory_timing_gc), name = "theories") val ml_statistics_table = make_table(List(Column.log_name, Column.session_name, 
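/* compressed ML statistics blob, keyed by log and session name */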
Column.ml_statistics), name = "ml_statistics") - override val tables: SQL.Tables = - SQL.Tables( - meta_info_table, - sessions_table, - theories_table, - ml_statistics_table) - /* earliest pull date for repository version (PostgreSQL queries) */ def pull_date_table(afp: Boolean = false): SQL.Table = { val (name, versions) = if (afp) ("afp_pull_date", List(Prop.isabelle_version, Prop.afp_version)) else ("pull_date", List(Prop.isabelle_version)) make_table(versions.map(_.make_primary_key) ::: List(Column.pull_date(afp)), body = "SELECT " + versions.mkString(", ") + ", min(" + Prop.build_start + ") AS " + Column.pull_date(afp) + " FROM " + meta_info_table + " WHERE " + SQL.AND((versions ::: List(Prop.build_start)).map(_.defined)) + " GROUP BY " + versions.mkString(", "), name = name) } /* recent entries */ def recent(c: SQL.Column, days: Int, default: PostgreSQL.Source = ""): PostgreSQL.Source = if (days <= 0) default else c.ident + " > now() - INTERVAL '" + days + " days'" def recent_pull_date_table( days: Int = 0, rev: String = "", afp_rev: Option[String] = None ): SQL.Table = { val afp = afp_rev.isDefined val rev2 = afp_rev.getOrElse("") val table = pull_date_table(afp) val eq_rev = if_proper(rev, Prop.isabelle_version(table).equal(rev)) val eq_rev2 = if_proper(rev2, Prop.afp_version(table).equal(rev2)) SQL.Table("recent_pull_date", table.columns, table.select(table.columns, sql = SQL.where_or( recent(Column.pull_date(afp)(table), days, default = SQL.TRUE), SQL.and(eq_rev, eq_rev2)))) } def select_recent_log_names(days: Int = 0): PostgreSQL.Source = { val table1 = meta_info_table val table2 = recent_pull_date_table(days = days) table1.select(List(Column.log_name), distinct = true, sql = SQL.join_inner + table2.query_named + " ON " + Prop.isabelle_version(table1) + " = " + Prop.isabelle_version(table2)) } /* universal view on main data */ val universal_table: SQL.Table = { val afp_pull_date = Column.pull_date(afp = true) val version1 = Prop.isabelle_version val version2 = Prop.afp_version val table1 = meta_info_table val table2 = pull_date_table(afp = true) val table3 = pull_date_table() val a_columns = Column.log_name :: afp_pull_date :: table1.columns.tail val a_table = SQL.Table("a", a_columns, SQL.select(List(Column.log_name, afp_pull_date) ::: table1.columns.tail.map(_.apply(table1))) + table1 + SQL.join_outer + table2 + " ON " + SQL.and( version1(table1).ident + " = " + version1(table2).ident, version2(table1).ident + " = " + version2(table2).ident)) val b_columns = Column.log_name :: Column.pull_date() :: a_columns.tail val b_table = SQL.Table("b", b_columns, SQL.select( List(Column.log_name(a_table), Column.pull_date()(table3)) ::: a_columns.tail.map(_.apply(a_table))) + a_table.query_named + SQL.join_outer + table3 + " ON " + version1(a_table) + " = " + version1(table3)) val c_columns = b_columns ::: sessions_table.columns.tail val c_table = SQL.Table("c", c_columns, SQL.select(Column.log_name(b_table) :: c_columns.tail) + b_table.query_named + SQL.join_inner + sessions_table + " ON " + Column.log_name(b_table) + " = " + Column.log_name(sessions_table)) make_table(c_columns ::: List(Column.ml_statistics), body = SQL.select(c_columns.map(_.apply(c_table)) ::: List(Column.ml_statistics)) + c_table.query_named + SQL.join_outer + ml_statistics_table + " ON " + SQL.and( Column.log_name(c_table).ident + " = " + Column.log_name(ml_statistics_table).ident, Column.session_name(c_table).ident + " = " + Column.session_name(ml_statistics_table).ident)) } /* access data */ def read_domain( db: 
SQL.Database, table: SQL.Table, restriction: Option[Iterable[String]] = None, cache: XML.Cache = XML.Cache.make() ): Set[String] = { val column = Column.log_name db.execute_query_statement( table.select(List(column), sql = restriction match { case None => "" case Some(names) => column.where_member(names) }, distinct = true), Set.from[String], res => cache.string(res.string(column))) } def read_meta_info(db: SQL.Database, log_name: String): Option[Meta_Info] = { val table = meta_info_table val columns = table.columns.tail db.execute_query_statementO[Meta_Info]( table.select(columns, sql = Column.log_name.where_equal(log_name)), { res => val results = columns.map(c => c.name -> (if (c.T == SQL.Type.Date) res.get_date(c).map(Log_File.Date_Format(_)) else res.get_string(c))) val n = Prop.all_props.length val props = for (case (x, Some(y)) <- results.take(n)) yield (x, y) val settings = for (case (x, Some(y)) <- results.drop(n)) yield (x, y) Meta_Info(props, settings) } ) } def read_build_info( db: SQL.Database, log_name: String, session_names: List[String] = Nil, ml_statistics: Boolean = false, cache: XML.Cache = XML.Cache.make() ): Build_Info = { val table1 = sessions_table val table2 = ml_statistics_table val columns1 = table1.columns.tail.map(_.apply(table1)) val (columns, from) = if (ml_statistics) { val columns = columns1 ::: List(Column.ml_statistics(table2)) val join = table1.ident + SQL.join_outer + table2.ident + " ON " + SQL.and( Column.log_name(table1).ident + " = " + Column.log_name(table2).ident, Column.session_name(table1).ident + " = " + Column.session_name(table2).ident) (columns, SQL.enclose(join)) } else (columns1, table1.ident) val where = SQL.where_and( Column.log_name(table1).equal(log_name), Column.session_name(table1).ident + " <> ''", if_proper(session_names, Column.session_name(table1).member(session_names))) val sessions = db.execute_query_statement( SQL.select(columns, sql = from + where), Map.from[String, Session_Entry], { res => val session_name = res.string(Column.session_name) val session_entry = Session_Entry( chapter = res.string(Column.chapter), groups = split_lines(res.string(Column.groups)), hostname = res.get_string(Column.hostname), threads = res.get_int(Column.threads), start = res.get_long(Column.session_start).map(Time.ms), timing = res.timing( Column.timing_elapsed, Column.timing_cpu, Column.timing_gc), ml_timing = res.timing( Column.ml_timing_elapsed, Column.ml_timing_cpu, Column.ml_timing_gc), sources = res.get_string(Column.sources), heap_size = res.get_long(Column.heap_size).map(Space.bytes), status = res.get_string(Column.status).map(Session_Status.valueOf), errors = uncompress_errors(res.bytes(Column.errors), cache = cache), ml_statistics = if (ml_statistics) { Properties.uncompress(res.bytes(Column.ml_statistics), cache = cache) } else Nil) session_name -> session_entry } ) Build_Info(sessions) } def update_meta_info(db: SQL.Database, log_name: String, meta_info: Meta_Info): Unit = db.execute_statement(db.insert_permissive(meta_info_table), { stmt => stmt.string(1) = log_name for ((c, i) <- meta_info_table.columns.tail.zipWithIndex) { if (c.T == SQL.Type.Date) stmt.date(i + 2) = meta_info.get_date(c) else stmt.string(i + 2) = meta_info.get(c) } } ) def update_sessions( db: SQL.Database, cache: Compress.Cache, log_name: String, build_info: Build_Info, ): Unit = { val sessions = if (build_info.sessions.isEmpty) Build_Info.sessions_dummy else build_info.sessions db.execute_batch_statement(db.insert_permissive(sessions_table), for ((session_name, 
session) <- sessions) yield { (stmt: SQL.Statement) => stmt.string(1) = log_name stmt.string(2) = session_name stmt.string(3) = proper_string(session.chapter) stmt.string(4) = session.proper_groups stmt.string(5) = session.hostname stmt.int(6) = session.threads stmt.long(7) = session.timing.elapsed.proper_ms stmt.long(8) = session.timing.cpu.proper_ms stmt.long(9) = session.timing.gc.proper_ms stmt.double(10) = session.timing.factor stmt.long(11) = session.ml_timing.elapsed.proper_ms stmt.long(12) = session.ml_timing.cpu.proper_ms stmt.long(13) = session.ml_timing.gc.proper_ms stmt.double(14) = session.ml_timing.factor stmt.long(15) = session.heap_size.map(_.bytes) stmt.string(16) = session.status.map(_.toString) stmt.bytes(17) = compress_errors(session.errors, cache = cache) stmt.string(18) = session.sources stmt.long(19) = session.start.map(_.ms) } ) } def update_theories(db: SQL.Database, log_name: String, build_info: Build_Info): Unit = { val sessions = if (build_info.sessions.forall({ case (_, session) => session.theory_timings.isEmpty })) Build_Info.sessions_dummy else build_info.sessions db.execute_batch_statement(db.insert_permissive(theories_table), for { (session_name, session) <- sessions (theory_name, timing) <- session.theory_timings } yield { (stmt: SQL.Statement) => stmt.string(1) = log_name stmt.string(2) = session_name stmt.string(3) = theory_name stmt.long(4) = timing.elapsed.ms stmt.long(5) = timing.cpu.ms stmt.long(6) = timing.gc.ms } ) } def update_ml_statistics( db: SQL.Database, cache: Compress.Cache, log_name: String, build_info: Build_Info ): Unit = { val ml_stats: List[(String, Option[Bytes])] = Par_List.map[(String, Session_Entry), (String, Option[Bytes])]( { case (a, b) => (a, Properties.compress(b.ml_statistics, cache = cache).proper) }, build_info.sessions.iterator.filter(p => p._2.ml_statistics.nonEmpty).toList) val entries = if (ml_stats.nonEmpty) ml_stats else List("" -> None) db.execute_batch_statement(db.insert_permissive(ml_statistics_table), for ((session_name, ml_statistics) <- entries) yield { (stmt: SQL.Statement) => stmt.string(1) = log_name stmt.string(2) = session_name stmt.bytes(3) = ml_statistics } ) } } /* database access */ def store(options: Options, cache: XML.Cache = XML.Cache.make()): Store = new Store(options, cache) class Store private[Build_Log](val options: Options, val cache: XML.Cache) { override def toString: String = { val s = Exn.result { open_database() } match { case Exn.Res(db) => val db_name = db.toString db.close() "database = " + db_name case Exn.Exn(_) => "no database" } "Store(" + s + ")" } def open_database(server: SSH.Server = SSH.no_server): PostgreSQL.Database = PostgreSQL.open_database_server(options, server = server, user = options.string("build_log_database_user"), password = options.string("build_log_database_password"), database = options.string("build_log_database_name"), host = options.string("build_log_database_host"), port = options.int("build_log_database_port"), ssh_host = options.string("build_log_ssh_host"), ssh_port = options.int("build_log_ssh_port"), ssh_user = options.string("build_log_ssh_user")) def init_database(db: SQL.Database): Unit = db.transaction { val upgrade_table = private_data.sessions_table val upgrade_column = Column.session_start val upgrade = { db.exists_table(upgrade_table) && !db.execute_query_statementB( "SELECT NULL as result FROM information_schema.columns " + " WHERE table_name = " + SQL.string(upgrade_table.name) + " AND column_name = " + SQL.string(upgrade_column.name)) } 
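/* schema upgrade: old databases lack the "sessions.session_start" column;
   the information_schema check above detects this, the column is added in
   place, and the dependent view is dropped and re-created below */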
private_data.tables.lock(db, create = true) if (upgrade) { db.execute_statement( "ALTER TABLE " + upgrade_table + " ADD COLUMN " + upgrade_column.decl(db.sql_type)) db.execute_statement("DROP VIEW IF EXISTS " + private_data.universal_table) } db.create_view(private_data.pull_date_table()) db.create_view(private_data.pull_date_table(afp = true)) db.create_view(private_data.universal_table) } def snapshot_database( db: PostgreSQL.Database, sqlite_database: Path, days: Int = 100, ml_statistics: Boolean = false ): Unit = { Isabelle_System.make_directory(sqlite_database.dir) sqlite_database.file.delete using(SQLite.open_database(sqlite_database)) { db2 => private_data.transaction_lock(db, label = "Build_Log.snapshot_database") { db2.transaction { // main content db2.create_table(private_data.meta_info_table) db2.create_table(private_data.sessions_table) db2.create_table(private_data.theories_table) db2.create_table(private_data.ml_statistics_table) val recent_log_names = db.execute_query_statement( private_data.select_recent_log_names(days = days), List.from[String], res => res.string(Column.log_name)) for (log_name <- recent_log_names) { private_data.read_meta_info(db, log_name).foreach(meta_info => private_data.update_meta_info(db2, log_name, meta_info)) private_data.update_sessions(db2, cache.compress, log_name, private_data.read_build_info(db, log_name, cache = cache)) if (ml_statistics) { private_data.update_ml_statistics(db2, cache.compress, log_name, private_data.read_build_info(db, log_name, ml_statistics = true, cache = cache)) } } // pull_date for (afp <- List(false, true)) { val afp_rev = if (afp) Some("") else None val table = private_data.pull_date_table(afp) db2.create_table(table) db2.using_statement(table.insert()) { stmt2 => db.using_statement( private_data.recent_pull_date_table(days = days, afp_rev = afp_rev).query) { stmt => using(stmt.execute_query()) { res => while (res.next()) { for ((c, i) <- table.columns.zipWithIndex) { stmt2.string(i + 1) = res.get_string(c) } stmt2.execute() } } } } } // full view db2.create_view(private_data.universal_table) } } db2.vacuum() } } def write_info(db: SQL.Database, files: List[JFile], ml_statistics: Boolean = false, progress: Progress = new Progress, errors: Multi_Map[String, String] = Multi_Map.empty ): Multi_Map[String, String] = { init_database(db) val errors_result = Synchronized(errors) def add_error(name: String, exn: Throwable): Unit = errors_result.change(_.insert(name, Exn.print(exn))) val files_domain = { val names = files.map(Log_File.plain_name).toSet if (names.size > 100) None else Some(names) } abstract class Table_Status(table: SQL.Table) { private val known = Synchronized(private_data.read_domain(db, table, restriction = files_domain, cache = cache)) def required(file: JFile): Boolean = !(known.value)(Log_File.plain_name(file)) def required(log_file: Log_File): Boolean = !(known.value)(log_file.name) def update_db(db: SQL.Database, log_file: Log_File): Unit def update(log_file: Log_File): Unit = { if (required(log_file)) { update_db(db, log_file) known.change(_ + log_file.name) } } } val status = private_data.transaction_lock(db, label = "build_log_database.status") { val status1 = if (ml_statistics) { List( new Table_Status(private_data.ml_statistics_table) { override def update_db(db: SQL.Database, log_file: Log_File): Unit = private_data.update_ml_statistics(db, cache.compress, log_file.name, log_file.parse_build_info(ml_statistics = true)) }) } else Nil val status2 = List( new 
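/* ML statistics are only ingested when explicitly enabled */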
Table_Status(private_data.meta_info_table) { override def update_db(db: SQL.Database, log_file: Log_File): Unit = private_data.update_meta_info(db, log_file.name, log_file.parse_meta_info()) }, new Table_Status(private_data.sessions_table) { override def update_db(db: SQL.Database, log_file: Log_File): Unit = private_data.update_sessions(db, cache.compress, log_file.name, log_file.parse_build_info()) }, new Table_Status(private_data.theories_table) { override def update_db(db: SQL.Database, log_file: Log_File): Unit = private_data.update_theories(db, log_file.name, log_file.parse_build_info()) }) status1 ::: status2 } val consumer = Consumer_Thread.fork[Log_File]("build_log_database")( limit = 1, consume = { log_file => val t0 = progress.start.time val t1 = progress.now().time private_data.transaction_lock(db, label = "build_log_database.consumer") { try { status.foreach(_.update(log_file)) } catch { case exn: Throwable => add_error(log_file.name, exn) } } val t2 = progress.now().time progress.echo(verbose = true, msg = "Log " + quote(log_file.name) + " (" + (t1 - t0).message_hms + " start time, " + (t2 - t1).message + " elapsed time)") true }) try { for (file <- files.iterator if status.exists(_.required(file))) { Exn.result { Log_File.read(file, cache = cache) } match { case Exn.Res(log_file) => consumer.send(log_file) case Exn.Exn(exn) => add_error(Log_File.plain_name(file), exn) } } } finally { consumer.shutdown() } errors_result.value } } /** build history **/ object History { sealed case class Entry( known: Boolean, isabelle_version: String, afp_version: Option[String], pull_date: Date ) { def unknown: Boolean = !known def versions: (String, Option[String]) = (isabelle_version, afp_version) } object Run { val empty: Run = Run() def longest(runs: List[Run]): Run = runs.foldLeft(empty)(_ max _) } sealed case class Run(entries: List[Entry] = Nil) { def is_empty: Boolean = entries.isEmpty val length: Int = entries.length def max(other: Run): Run = if (length >= other.length) this else other def median: Option[Entry] = if (is_empty) None else Some(entries((length - 1) / 2)) override def toString: String = { val s = if (is_empty) "" else "length = " + length + ", median = " + median.get.pull_date "Build_Log.History.Run(" + s + ")" } } def retrieve( db: SQL.Database, days: Int, rev: String, afp_rev: Option[String], sql: PostgreSQL.Source, filter: Entry => Boolean = _ => true ): History = { val entries = private_data.transaction_lock(db, label = "Build_Log.History.retrieve") { val afp = afp_rev.isDefined val select_recent_versions = { val table1 = private_data.recent_pull_date_table(days = days, rev = rev, afp_rev = afp_rev) val table2 = private_data.meta_info_table val aux_table = SQL.Table("aux", table2.columns, table2.select(sql = SQL.where(sql))) val columns = table1.columns.map(c => c(table1)) ::: List(Column.known.copy(expr = Column.log_name(aux_table).defined)) SQL.select(columns, distinct = true) + table1.query_named + SQL.join_outer + aux_table.query_named + " ON " + Prop.isabelle_version(table1) + " = " + Prop.isabelle_version(aux_table) + SQL.order_by(List(Column.pull_date(afp)(table1)), descending = true) } db.execute_query_statement(select_recent_versions, List.from[Entry], { res => val known = res.bool(Column.known) val isabelle_version = res.string(Prop.isabelle_version) val afp_version = if (afp) proper_string(res.string(Prop.afp_version)) else None val pull_date = res.date(Column.pull_date(afp)) Entry(known, isabelle_version, afp_version, pull_date) }) } new 
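/* entries arrive newest-first (pull_date descending); unknown_runs below
   relies on this ordering */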
History(entries.filter(filter)) } } final class History private(val entries: List[History.Entry]) { override def toString: String = "Build_Log.History(" + entries.length + ")" def unknown_runs(filter: History.Run => Boolean = _ => true): List[History.Run] = { var rest = entries val result = new mutable.ListBuffer[History.Run] while (rest.nonEmpty) { val (a, b) = Library.take_prefix[History.Entry](_.unknown, rest.dropWhile(_.known)) val run = History.Run(a) if (!run.is_empty && filter(run)) result += run rest = b } result.toList } } /** maintain build_log database **/ def build_log_database(options: Options, logs: List[Path], progress: Progress = new Progress, vacuum: Boolean = false, ml_statistics: Boolean = false, snapshot: Option[Path] = None ): Unit = { val store = Build_Log.store(options) val log_files = Log_File.find_files(logs.map(_.file)) using(store.open_database()) { db => if (vacuum) db.vacuum() progress.echo("Updating database " + db + " ...") val errors = store.write_info(db, log_files, ml_statistics = ml_statistics, progress = progress) if (errors.isEmpty) { for (path <- snapshot) { progress.echo("Writing database snapshot " + path.expand) store.snapshot_database(db, path) } } else { error(cat_lines(List.from( for ((name, rev_errs) <- errors.iterator_list) yield { val err = "The error(s) above occurred in " + quote(name) cat_lines((err :: rev_errs).reverse) } ))) } } } /* Isabelle tool wrapper */ val isabelle_tool = Isabelle_Tool("build_log_database", "update build_log database from log files", Scala_Project.here, { args => var ml_statistics: Boolean = false var snapshot: Option[Path] = None var vacuum = false var logs: List[Path] = Nil var options = Options.init() var verbose = false val getopts = Getopts(""" Usage: isabelle build_log_database [OPTIONS] Options are: -M include ML statistics -S FILE snapshot to SQLite db file -V vacuum cleaning of database -d LOG include log file start location -o OPTION override Isabelle system OPTION (via NAME=VAL or NAME) -v verbose Update the build_log database server from log files, which are recursively collected from given start locations (files or directories). """, "M" -> (_ => ml_statistics = true), "S:" -> (arg => snapshot = Some(Path.explode(arg))), "V" -> (_ => vacuum = true), "d:" -> (arg => logs = logs ::: List(Path.explode(arg))), "o:" -> (arg => options = options + arg), "v" -> (_ => verbose = true)) val more_args = getopts(args) if (more_args.nonEmpty) getopts.usage() val progress = new Console_Progress(verbose = verbose) build_log_database(options, logs, progress = progress, vacuum = vacuum, ml_statistics = ml_statistics, snapshot = snapshot) }) } diff --git a/src/Pure/Build/build.scala b/src/Pure/Build/build.scala --- a/src/Pure/Build/build.scala +++ b/src/Pure/Build/build.scala @@ -1,926 +1,935 @@ /* Title: Pure/Build/build.scala Author: Makarius Options: :folding=explicit: Command-line tools to build and manage Isabelle sessions. 
*/ package isabelle import scala.collection.mutable import scala.util.matching.Regex object Build { /** "isabelle build" **/ /* options */ def hostname(options: Options): String = Isabelle_System.hostname(options.string("build_hostname")) def engine_name(options: Options): String = options.string("build_engine") /* context */ sealed case class Context( store: Store, deps: isabelle.Sessions.Deps, engine: Engine = Engine.Default, afp_root: Option[Path] = None, build_hosts: List[Build_Cluster.Host] = Nil, ml_platform: String = Isabelle_System.getenv("ML_PLATFORM"), hostname: String = Isabelle_System.hostname(), numa_shuffling: Boolean = false, clean_sessions: List[String] = Nil, build_heap: Boolean = false, fresh_build: Boolean = false, no_build: Boolean = false, session_setup: (String, Session) => Unit = (_, _) => (), build_uuid: String = UUID.random_string(), build_start: Option[Date] = None, jobs: Int = 0, master: Boolean = false ) { def build_options: Options = store.options def sessions_structure: isabelle.Sessions.Structure = deps.sessions_structure def worker: Boolean = jobs > 0 override def toString: String = "Build.Context(build_uuid = " + quote(build_uuid) + if_proper(worker, ", worker = true") + if_proper(master, ", master = true") + ")" } /* results */ object Results { def apply( context: Context, results: Map[String, Process_Result] = Map.empty, other_rc: Int = Process_Result.RC.ok ): Results = { new Results(context.store, context.deps, results, other_rc) } } class Results private( val store: Store, val deps: Sessions.Deps, results: Map[String, Process_Result], other_rc: Int ) { def cache: Term.Cache = store.cache def sessions_ok: List[String] = (for { name <- deps.sessions_structure.build_topological_order.iterator result <- results.get(name) if result.ok } yield name).toList def info(name: String): Sessions.Info = deps.sessions_structure(name) def sessions: Set[String] = results.keySet def cancelled(name: String): Boolean = !results(name).defined def apply(name: String): Process_Result = results(name).strict val rc: Int = Process_Result.RC.merge(other_rc, Process_Result.RC.merge(results.valuesIterator.map(_.strict.rc))) def ok: Boolean = rc == Process_Result.RC.ok lazy val unfinished: List[String] = sessions.iterator.filterNot(apply(_).ok).toList.sorted override def toString: String = rc.toString } /* engine */ object Engine { lazy val services: List[Engine] = Isabelle_System.make_services(classOf[Engine]) def apply(name: String): Engine = services.find(_.name == name).getOrElse(error("Bad build engine " + quote(name))) class Default extends Engine("") { override def toString: String = "" } object Default extends Default } class Engine(val name: String) extends Isabelle_System.Service { engine => override def toString: String = name def build_options(options: Options, build_cluster: Boolean = false): Options = { val options1 = options + "completion_limit=0" + "editor_tracing_messages=0" if (build_cluster) options1 + "build_database" + "build_log_verbose" else options1 } final def build_store(options: Options, build_cluster: Boolean = false, cache: Term.Cache = Term.Cache.make() ): Store = { val build_options = engine.build_options(options, build_cluster = build_cluster) val store = Store(build_options, build_cluster = build_cluster, cache = cache) Isabelle_System.make_directory(store.output_dir + Path.basic("log")) Isabelle_Fonts.init() store } def open_build_process( build_context: Context, build_progress: Progress, server: SSH.Server ): Build_Process = new 
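/* default implementation; an Engine service may override this with a
   custom build process */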
Build_Process(build_context, build_progress, server) final def run_build_process( context: Context, progress: Progress, server: SSH.Server ): Results = { Isabelle_Thread.uninterruptible { using(open_build_process(context, progress, server)) { build_process => build_process.prepare() build_process.run() } } } } /* build */ def build( options: Options, build_hosts: List[Build_Cluster.Host] = Nil, selection: Sessions.Selection = Sessions.Selection.empty, browser_info: Browser_Info.Config = Browser_Info.Config.none, progress: Progress = new Progress, check_unknown_files: Boolean = false, build_heap: Boolean = false, clean_build: Boolean = false, afp_root: Option[Path] = None, dirs: List[Path] = Nil, select_dirs: List[Path] = Nil, infos: List[Sessions.Info] = Nil, numa_shuffling: Boolean = false, max_jobs: Option[Int] = None, list_files: Boolean = false, check_keywords: Set[String] = Set.empty, fresh_build: Boolean = false, no_build: Boolean = false, soft_build: Boolean = false, export_files: Boolean = false, augment_options: String => List[Options.Spec] = _ => Nil, session_setup: (String, Session) => Unit = (_, _) => (), cache: Term.Cache = Term.Cache.make() ): Results = { val engine = Engine(engine_name(options)) val store = engine.build_store(options, build_cluster = build_hosts.nonEmpty, cache = cache) val build_options = store.options using(store.open_server()) { server => /* session selection and dependencies */ val full_sessions = Sessions.load_structure(build_options, dirs = AFP.make_dirs(afp_root) ::: dirs, select_dirs = select_dirs, infos = infos, augment_options = augment_options) val full_sessions_selection = full_sessions.imports_selection(selection) val build_deps = { val deps0 = Sessions.deps(full_sessions.selection(selection), progress = progress, inlined_files = true, list_files = list_files, check_keywords = check_keywords ).check_errors if (soft_build && !fresh_build) { val outdated = deps0.sessions_structure.build_topological_order.flatMap(name => store.try_open_database(name, server = server) match { case Some(db) => using(db)(store.read_build(_, name)) match { case Some(build) if build.ok => val session_options = deps0.sessions_structure(name).options val session_sources = deps0.sources_shasum(name) if (Sessions.eq_sources(session_options, build.sources, session_sources)) { None } else Some(name) case _ => Some(name) } case None => Some(name) }) Sessions.deps(full_sessions.selection(Sessions.Selection(sessions = outdated)), progress = progress, inlined_files = true).check_errors } else deps0 } /* check unknown files */ if (check_unknown_files) { val source_files = (for { (_, base) <- build_deps.session_bases.iterator (path, _) <- base.session_sources.iterator } yield path).toList Mercurial.check_files(source_files)._2 match { case Nil => case unknown_files => progress.echo_warning( "Unknown files (not part of the underlying Mercurial repository):" + unknown_files.map(File.standard_path).sorted.mkString("\n ", "\n ", "")) } } /* build process and results */ val clean_sessions = if (clean_build) full_sessions.imports_descendants(full_sessions_selection) else Nil val build_context = Context(store, build_deps, engine = engine, afp_root = afp_root, build_hosts = build_hosts, hostname = hostname(build_options), clean_sessions = clean_sessions, build_heap = build_heap, numa_shuffling = numa_shuffling, fresh_build = fresh_build, no_build = no_build, session_setup = session_setup, jobs = max_jobs.getOrElse(if (build_hosts.nonEmpty) 0 else 1), master = true) val results = 
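/* Delegate to the engine selected via system option "build_engine".
   A sketch of typical programmatic use, with an illustrative session name:

     val results = Build.build(Options.init(),
       selection = Sessions.Selection.session("HOL"))
     if (!results.ok) error("build returned rc = " + results.rc)
*/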
engine.run_build_process(build_context, progress, server) if (export_files) { for (name <- full_sessions_selection.iterator if results(name).ok) { val info = results.info(name) if (info.export_files.nonEmpty) { progress.echo("Exporting " + info.name + " ...") for ((dir, prune, pats) <- info.export_files) { Export.export_files(store, name, info.dir + dir, progress = if (progress.verbose) progress else new Progress, export_prune = prune, export_patterns = pats) } } } } val presentation_sessions = results.sessions_ok.filter(name => browser_info.enabled(results.info(name))) if (presentation_sessions.nonEmpty && !progress.stopped) { Browser_Info.build(browser_info, results.store, results.deps, presentation_sessions, progress = progress, server = server) } if (results.unfinished.nonEmpty && (progress.verbose || !no_build)) { progress.echo("Unfinished session(s): " + commas(results.unfinished)) } results } } /* build logic image */ def build_logic(options: Options, logic: String, progress: Progress = new Progress, build_heap: Boolean = false, dirs: List[Path] = Nil, fresh: Boolean = false, strict: Boolean = false ): Int = { val selection = Sessions.Selection.session(logic) val rc = if (!fresh && build(options, selection = selection, build_heap = build_heap, no_build = true, dirs = dirs).ok) Process_Result.RC.ok else { progress.echo("Build started for Isabelle/" + logic + " ...") build(options, selection = selection, progress = progress, build_heap = build_heap, fresh_build = fresh, dirs = dirs).rc } if (strict && rc != Process_Result.RC.ok) error("Failed to build Isabelle/" + logic) else rc } /* Isabelle tool wrappers */ val isabelle_tool1 = Isabelle_Tool("build", "build and manage Isabelle sessions", Scala_Project.here, { args => var afp_root: Option[Path] = None val base_sessions = new mutable.ListBuffer[String] val select_dirs = new mutable.ListBuffer[Path] val build_hosts = new mutable.ListBuffer[Build_Cluster.Host] var numa_shuffling = false var browser_info = Browser_Info.Config.none var requirements = false var soft_build = false val exclude_session_groups = new mutable.ListBuffer[String] var all_sessions = false var build_heap = false var clean_build = false val dirs = new mutable.ListBuffer[Path] var export_files = false var fresh_build = false val session_groups = new mutable.ListBuffer[String] var max_jobs: Option[Int] = None var check_keywords: Set[String] = Set.empty var list_files = false var no_build = false var options = Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS) var verbose = false val exclude_sessions = new mutable.ListBuffer[String] val getopts = Getopts(""" Usage: isabelle build [OPTIONS] [SESSIONS ...] 
Options are: -A ROOT include AFP with given root directory (":" for """ + AFP.BASE.implode + """) -B NAME include session NAME and all descendants -D DIR include session directory and select its sessions -H HOSTS additional cluster host specifications of the form NAMES:PARAMETERS (separated by commas) -N cyclic shuffling of NUMA CPU nodes (performance tuning) -P DIR enable HTML/PDF presentation in directory (":" for default) -R refer to requirements of selected sessions -S soft build: only observe changes of sources, not heap images -X NAME exclude sessions from group NAME and all descendants -a select all sessions -b build heap images -c clean build -d DIR include session directory -e export files from session specification into file-system -f fresh build -g NAME select session group NAME -j INT maximum number of parallel jobs (default: 1 for local build, 0 for build cluster) -k KEYWORD check theory sources for conflicts with proposed keywords -l list session source files -n no build -- take existing session build databases -o OPTION override Isabelle system OPTION (via NAME=VAL or NAME) -v verbose -x NAME exclude session NAME and all descendants Build and manage Isabelle sessions: ML heaps, session databases, documents. Parameters for cluster host specifications (-H), apart from system options: """ + Library.indent_lines(4, Build_Cluster.Host.parameters.print()) + """ Notable system options: see "isabelle options -l -t build" Notable system settings: """ + Library.indent_lines(4, Build_Log.Settings.show()) + "\n", "A:" -> (arg => afp_root = Some(if (arg == ":") AFP.BASE else Path.explode(arg))), "B:" -> (arg => base_sessions += arg), "D:" -> (arg => select_dirs += Path.explode(arg)), "H:" -> (arg => build_hosts ++= Build_Cluster.Host.parse(Registry.global, arg)), "N" -> (_ => numa_shuffling = true), "P:" -> (arg => browser_info = Browser_Info.Config.make(arg)), "R" -> (_ => requirements = true), "S" -> (_ => soft_build = true), "X:" -> (arg => exclude_session_groups += arg), "a" -> (_ => all_sessions = true), "b" -> (_ => build_heap = true), "c" -> (_ => clean_build = true), "d:" -> (arg => dirs += Path.explode(arg)), "e" -> (_ => export_files = true), "f" -> (_ => fresh_build = true), "g:" -> (arg => session_groups += arg), "j:" -> (arg => max_jobs = Some(Value.Nat.parse(arg))), "k:" -> (arg => check_keywords = check_keywords + arg), "l" -> (_ => list_files = true), "n" -> (_ => no_build = true), "o:" -> (arg => options = options + arg), "v" -> (_ => verbose = true), "x:" -> (arg => exclude_sessions += arg)) val sessions = getopts(args) val progress = new Console_Progress(verbose = verbose) progress.echo( "Started at " + Build_Log.print_date(progress.start) + " (" + Isabelle_System.ml_identifier() + " on " + hostname(options) +")", verbose = true) progress.echo(Build_Log.Settings.show() + "\n", verbose = true) val results = progress.interrupt_handler { build(options, selection = Sessions.Selection( requirements = requirements, all_sessions = all_sessions, base_sessions = base_sessions.toList, exclude_session_groups = exclude_session_groups.toList, exclude_sessions = exclude_sessions.toList, session_groups = session_groups.toList, sessions = sessions), browser_info = browser_info, progress = progress, check_unknown_files = Mercurial.is_repository(Path.ISABELLE_HOME), build_heap = build_heap, clean_build = clean_build, afp_root = afp_root, dirs = dirs.toList, select_dirs = select_dirs.toList, numa_shuffling = Host.numa_check(progress, numa_shuffling), max_jobs = max_jobs, list_files = 
list_files, check_keywords = check_keywords, fresh_build = fresh_build, no_build = no_build, soft_build = soft_build, export_files = export_files, build_hosts = build_hosts.toList) } val stop_date = progress.now() val elapsed_time = stop_date - progress.start progress.echo("\nFinished at " + Build_Log.print_date(stop_date), verbose = true) val total_timing = results.sessions.iterator.map(a => results(a).timing).foldLeft(Timing.zero)(_ + _). copy(elapsed = elapsed_time) progress.echo(total_timing.message_resources) sys.exit(results.rc) }) /** build cluster management **/ /* identified builds */ def read_builds(build_database: Option[SQL.Database]): List[Build_Process.Build] = build_database match { case None => Nil case Some(db) => Build_Process.read_builds(db) } def print_builds(build_database: Option[SQL.Database], builds: List[Build_Process.Build]): String = { val print_database = build_database match { case None => "" case Some(db) => " (database " + db + ")" } if (builds.isEmpty) "No build processes" + print_database else "Build processes" + print_database + builds.map(build => "\n " + build.print).mkString } def find_builds( build_database: Option[SQL.Database], build_id: String, builds: List[Build_Process.Build] ): Build_Process.Build = { (build_id, builds.length) match { case (UUID(_), _) if builds.exists(_.build_uuid == build_id) => builds.find(_.build_uuid == build_id).get case ("", 1) => builds.head case ("", 0) => error(print_builds(build_database, builds)) case _ => cat_error("Cannot identify build process " + quote(build_id), print_builds(build_database, builds)) } } /* "isabelle build_process" */ def build_process( options: Options, build_cluster: Boolean = false, list_builds: Boolean = false, remove_builds: Boolean = false, force: Boolean = false, progress: Progress = new Progress ): Unit = { val engine = Engine(engine_name(options)) val store = engine.build_store(options, build_cluster = build_cluster) using(store.open_server()) { server => using_optional(store.maybe_open_build_database(server = server)) { build_database => def print(builds: List[Build_Process.Build]): Unit = if (list_builds) progress.echo(print_builds(build_database, builds)) build_database match { case None => print(Nil) + case Some(db) if remove_builds && force => + db.transaction { + val tables0 = + ML_Heap.private_data.tables.list ::: + Store.private_data.tables.list ::: + Progress.private_data.tables.list ::: + Build_Process.private_data.tables.list + val tables = tables0.filter(t => db.exists_table(t.name)).sortBy(_.name) + if (tables.nonEmpty) { + progress.echo("Removing tables " + commas_quote(tables.map(_.name)) + " ...") + db.execute_statement(SQL.MULTI(tables.map(db.destroy))) + } + } case Some(db) => Build_Process.private_data.transaction_lock(db, create = true, label = "build_process") { val builds = Build_Process.private_data.read_builds(db) - val remove = - if (!remove_builds) Nil - else if (force) builds.map(_.build_uuid) - else builds.flatMap(build => if (build.active) None else Some(build.build_uuid)) - print(builds) - if (remove.nonEmpty) { - if (remove_builds) { + if (remove_builds) { + val remove = builds.flatMap(_.active_build_uuid) + if (remove.nonEmpty) { progress.echo("Removing " + commas(remove) + " ...") Build_Process.private_data.remove_builds(db, remove) print(Build_Process.private_data.read_builds(db)) } } } } } } } val isabelle_tool2 = Isabelle_Tool("build_process", "manage session build process", Scala_Project.here, { args => var build_cluster = false var force = false var 
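/* A standalone sketch of the build-identification rule in find_builds above:
   an explicit id must match a known build_uuid, while an empty id is accepted
   only when exactly one build exists. Simplified Build record and plain
   exceptions; the real version additionally requires the explicit id to parse
   as a UUID and prints the available builds on failure. */
object FindBuildSketch {
  final case class Build(build_uuid: String, active: Boolean)

  def find(build_id: String, builds: List[Build]): Build =
    (build_id, builds.length) match {
      case (id, _) if id.nonEmpty && builds.exists(_.build_uuid == id) =>
        builds.find(_.build_uuid == id).get
      case ("", 1) => builds.head  // unambiguous default
      case ("", 0) => sys.error("No build processes")
      case _ => sys.error("Cannot identify build process " + build_id)
    }

  def main(args: Array[String]): Unit = {
    val builds = List(Build("3a7c0d2e", active = true))  // hypothetical uuid
    println(find("", builds))
    println(find("3a7c0d2e", builds))
  }
}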
list_builds = false var options = Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS ::: List(Options.Spec.make("build_database"))) var remove_builds = false val getopts = Getopts(""" Usage: isabelle build_process [OPTIONS] Options are: -C build cluster mode (database server) -f extra force for option -r -l list build processes -o OPTION override Isabelle system OPTION (via NAME=VAL or NAME) -r remove data from build processes: inactive processes (default) or all processes (option -f) Manage Isabelle build process, notably distributed build cluster (option -C). """, "C" -> (_ => build_cluster = true), "f" -> (_ => force = true), "l" -> (_ => list_builds = true), "o:" -> (arg => options = options + arg), "r" -> (_ => remove_builds = true)) val more_args = getopts(args) if (more_args.nonEmpty) getopts.usage() val progress = new Console_Progress() build_process(options, build_cluster = build_cluster, list_builds = list_builds, remove_builds = remove_builds, force = force, progress = progress) }) /* "isabelle build_worker" */ def build_worker_command( host: Build_Cluster.Host, ssh: SSH.System = SSH.Local, build_options: List[Options.Spec] = Nil, build_id: String = "", isabelle_home: Path = Path.current, afp_root: Option[Path] = None, dirs: List[Path] = Nil, quiet: Boolean = false, verbose: Boolean = false ): String = { val options = build_options ::: Options.Spec.eq("build_hostname", host.name) :: host.options ssh.bash_path(Isabelle_Tool.exe(isabelle_home)) + " build_worker" + if_proper(build_id, " -B " + Bash.string(build_id)) + if_proper(afp_root, " -A " + ssh.bash_path(afp_root.get)) + dirs.map(dir => " -d " + ssh.bash_path(dir)).mkString + if_proper(host.numa, " -N") + " -j" + host.jobs + Options.Spec.bash_strings(options, bg = true) + if_proper(quiet, " -q") + if_proper(verbose, " -v") } def build_worker( options: Options, build_id: String = "", progress: Progress = new Progress, afp_root: Option[Path] = None, dirs: List[Path] = Nil, numa_shuffling: Boolean = false, max_jobs: Option[Int] = None ): Results = { val engine = Engine(engine_name(options)) val store = engine.build_store(options, build_cluster = true) val build_options = store.options using(store.open_server()) { server => using_optional(store.maybe_open_build_database(server = server)) { build_database => val builds = read_builds(build_database) val build_master = find_builds(build_database, build_id, builds.filter(_.active)) val sessions_structure = Sessions.load_structure(build_options, dirs = AFP.make_dirs(afp_root) ::: dirs). 
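/* build_worker_command above assembles a remote shell command from
   conditionally-present fragments. A minimal sketch of the if_proper idiom
   with a hypothetical ifProper helper (Isabelle's if_proper treats "", None
   and Nil as "improper", i.e. contributing nothing). */
object CommandLineSketch {
  def ifProper(guard: String, s: => String): String = if (guard.nonEmpty) s else ""
  def ifProper(guard: Boolean, s: => String): String = if (guard) s else ""

  def buildWorkerCommand(buildId: String, jobs: Int, numa: Boolean, quiet: Boolean): String =
    "isabelle build_worker" +
      ifProper(buildId, " -B " + buildId) +
      ifProper(numa, " -N") +
      " -j" + jobs +
      ifProper(quiet, " -q")

  def main(args: Array[String]): Unit = {
    println(buildWorkerCommand("", jobs = 4, numa = true, quiet = false))
    // prints: isabelle build_worker -N -j4
  }
}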
selection(Sessions.Selection(sessions = build_master.sessions)) val build_deps = Sessions.deps(sessions_structure, progress = progress, inlined_files = true).check_errors val build_context = Context(store, build_deps, engine = engine, afp_root = afp_root, hostname = hostname(build_options), numa_shuffling = numa_shuffling, build_uuid = build_master.build_uuid, build_start = Some(build_master.start), jobs = max_jobs.getOrElse(1)) engine.run_build_process(build_context, progress, server) } } } val isabelle_tool3 = Isabelle_Tool("build_worker", "start worker for session build process", Scala_Project.here, { args => var afp_root: Option[Path] = None var build_id = "" var numa_shuffling = false val dirs = new mutable.ListBuffer[Path] var max_jobs: Option[Int] = None var options = Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS ::: List(Options.Spec.make("build_database"))) var quiet = false var verbose = false val getopts = Getopts(""" Usage: isabelle build_worker [OPTIONS] Options are: -A ROOT include AFP with given root directory (":" for """ + AFP.BASE.implode + """) -B UUID existing UUID for build process (default: from database) -N cyclic shuffling of NUMA CPU nodes (performance tuning) -d DIR include session directory -j INT maximum number of parallel jobs (default 1) -o OPTION override Isabelle system OPTION (via NAME=VAL or NAME) -q quiet mode: no progress -v verbose """, "A:" -> (arg => afp_root = Some(if (arg == ":") AFP.BASE else Path.explode(arg))), "B:" -> (arg => build_id = arg), "N" -> (_ => numa_shuffling = true), "d:" -> (arg => dirs += Path.explode(arg)), "j:" -> (arg => max_jobs = Some(Value.Nat.parse(arg))), "o:" -> (arg => options = options + arg), "q" -> (_ => quiet = true), "v" -> (_ => verbose = true)) val more_args = getopts(args) if (more_args.nonEmpty) getopts.usage() val progress = if (quiet && verbose) new Progress { override def verbose: Boolean = true } else if (quiet) new Progress else new Console_Progress(verbose = verbose) val results = progress.interrupt_handler { build_worker(options, build_id = build_id, progress = progress, afp_root = afp_root, dirs = dirs.toList, numa_shuffling = Host.numa_check(progress, numa_shuffling), max_jobs = max_jobs) } if (!results.ok) sys.exit(results.rc) }) /** "isabelle build_log" **/ /* theory markup/messages from session database */ def read_theory( theory_context: Export.Theory_Context, unicode_symbols: Boolean = false ): Option[Document.Snapshot] = { def decode_bytes(bytes: Bytes): String = Symbol.output(unicode_symbols, UTF8.decode_permissive(bytes)) def read(name: String): Export.Entry = theory_context(name, permissive = true) def read_xml(name: String): XML.Body = YXML.parse_body(decode_bytes(read(name).bytes), cache = theory_context.cache) def read_source_file(name: String): Store.Source_File = theory_context.session_context.source_file(name) for { id <- theory_context.document_id() (thy_file, blobs_files) <- theory_context.files(permissive = true) } yield { val master_dir = Path.explode(Url.strip_base_name(thy_file).getOrElse( error("Cannot determine theory master directory: " + quote(thy_file)))) val blobs = blobs_files.map { name => val path = Path.explode(name) val src_path = File.relative_path(master_dir, path).getOrElse(path) val file = read_source_file(name) val bytes = file.bytes val text = decode_bytes(bytes) val chunk = Symbol.Text_Chunk(text) Command.Blob(Document.Node.Name(name), src_path, Some((file.digest, chunk))) -> Document.Blobs.Item(bytes, text, chunk, changed = false) } val thy_source = 
decode_bytes(read_source_file(thy_file).bytes) val thy_xml = read_xml(Export.MARKUP) val blobs_xml = for (i <- (1 to blobs.length).toList) yield read_xml(Export.MARKUP + i) val markups_index = Command.Markup_Index.make(blobs.map(_._1)) val markups = Command.Markups.make( for ((index, xml) <- markups_index.zip(thy_xml :: blobs_xml)) yield index -> Markup_Tree.from_XML(xml)) val results = Command.Results.make( for (case elem@XML.Elem(Markup(_, Markup.Serial(i)), _) <- read_xml(Export.MESSAGES)) yield i -> elem) val command = Command.unparsed(thy_source, theory = true, id = id, node_name = Document.Node.Name(thy_file, theory = theory_context.theory), blobs_info = Command.Blobs_Info.make(blobs), markups = markups, results = results) val doc_blobs = Document.Blobs.make(blobs) Document.State.init.snippet(command, doc_blobs) } } /* print messages */ def print_log( options: Options, sessions: List[String], theories: List[String] = Nil, message_head: List[Regex] = Nil, message_body: List[Regex] = Nil, progress: Progress = new Progress, margin: Double = Pretty.default_margin, breakgain: Double = Pretty.default_breakgain, metric: Pretty.Metric = Symbol.Metric, unicode_symbols: Boolean = false ): Unit = { val store = Store(options) val session = new Session(options, Resources.bootstrap) def check(filter: List[Regex], make_string: => String): Boolean = filter.isEmpty || { val s = Output.clean_yxml(make_string) filter.forall(r => r.findFirstIn(Output.clean_yxml(s)).nonEmpty) } def print(session_name: String): Unit = { using(Export.open_session_context0(store, session_name)) { session_context => val result = for { db <- session_context.session_db() theories = store.read_theories(db, session_name) errors = store.read_errors(db, session_name) info <- store.read_build(db, session_name) } yield (theories, errors, info.return_code) result match { case None => store.error_database(session_name) case Some((used_theories, errors, rc)) => theories.filterNot(used_theories.toSet) match { case Nil => case bad => error("Unknown theories " + commas_quote(bad)) } val print_theories = if (theories.isEmpty) used_theories else used_theories.filter(theories.toSet) for (thy <- print_theories) { val thy_heading = "\nTheory " + quote(thy) + " (in " + session_name + ")" + ":" Build.read_theory(session_context.theory(thy), unicode_symbols = unicode_symbols) match { case None => progress.echo(thy_heading + " MISSING") case Some(snapshot) => val rendering = new Rendering(snapshot, options, session) val messages = rendering.text_messages(Text.Range.full) .filter(message => progress.verbose || Protocol.is_exported(message.info)) if (messages.nonEmpty) { val line_document = Line.Document(snapshot.node.source) val buffer = new mutable.ListBuffer[String] for (Text.Info(range, elem) <- messages) { val line = line_document.position(range.start).line1 val pos = Position.Line_File(line, snapshot.node_name.node) def message_text: String = Protocol.message_text(elem, heading = true, pos = pos, margin = margin, breakgain = breakgain, metric = metric) val ok = check(message_head, Protocol.message_heading(elem, pos)) && check(message_body, XML.content(Pretty.unformatted(List(elem)))) if (ok) buffer += message_text } if (buffer.nonEmpty) { progress.echo(thy_heading) buffer.foreach(progress.echo(_)) } } } } if (errors.nonEmpty) { val msg = Symbol.output(unicode_symbols, cat_lines(errors)) progress.echo("\nBuild errors:\n" + Output.error_message_text(msg)) } if (rc != Process_Result.RC.ok) { progress.echo("\n" + Process_Result.RC.print_long(rc)) 
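/* The check function above implements the conjunctive -H/-M filters: every
   given pattern must match somewhere in the (cleaned) message text, and ^/$
   anchor explicitly because Regex.findFirstIn accepts any matching substring.
   A standalone sketch without Isabelle's YXML cleaning: */
object MessageFilterSketch {
  import scala.util.matching.Regex

  def check(filters: List[Regex], message: String): Boolean =
    filters.isEmpty || filters.forall(r => r.findFirstIn(message).nonEmpty)

  def main(args: Array[String]): Unit = {
    val msg = "Theory HOL.List: warning: unused variable"
    println(check(List("warning".r, "unused".r), msg))  // true: both patterns match
    println(check(List("^warning".r), msg))             // false: ^ anchors the start
  }
}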
} } } } val errors = new mutable.ListBuffer[String] for (session_name <- sessions) { Exn.result(print(session_name)) match { case Exn.Res(_) => case Exn.Exn(exn) => errors += Exn.message(exn) } } if (errors.nonEmpty) error(cat_lines(errors.toList)) } /* Isabelle tool wrapper */ val isabelle_tool4 = Isabelle_Tool("build_log", "print messages from session build database", Scala_Project.here, { args => /* arguments */ val message_head = new mutable.ListBuffer[Regex] val message_body = new mutable.ListBuffer[Regex] var unicode_symbols = false val theories = new mutable.ListBuffer[String] var margin = Pretty.default_margin var options = Options.init() var verbose = false val getopts = Getopts(""" Usage: isabelle build_log [OPTIONS] [SESSIONS ...] Options are: -H REGEX filter messages by matching against head -M REGEX filter messages by matching against body -T NAME restrict to given theories (multiple options possible) -U output Unicode symbols -m MARGIN margin for pretty printing (default: """ + margin + """) -o OPTION override Isabelle system OPTION (via NAME=VAL or NAME) -v print all messages, including information etc. Print messages from the session build database of the given sessions, without any checks against current sources or session structure: results from old sessions or failed builds can be printed as well. Multiple options -H and -M are conjunctive: all given patterns need to match. Patterns match any substring, but ^ or $ may be used to match the start or end explicitly. """, "H:" -> (arg => message_head += arg.r), "M:" -> (arg => message_body += arg.r), "T:" -> (arg => theories += arg), "U" -> (_ => unicode_symbols = true), "m:" -> (arg => margin = Value.Double.parse(arg)), "o:" -> (arg => options = options + arg), "v" -> (_ => verbose = true)) val sessions = getopts(args) val progress = new Console_Progress(verbose = verbose) if (sessions.isEmpty) progress.echo_warning("No sessions to print") else { print_log(options, sessions, theories = theories.toList, message_head = message_head.toList, message_body = message_body.toList, margin = margin, progress = progress, unicode_symbols = unicode_symbols) } }) } diff --git a/src/Pure/Build/build_process.scala b/src/Pure/Build/build_process.scala --- a/src/Pure/Build/build_process.scala +++ b/src/Pure/Build/build_process.scala @@ -1,1232 +1,1373 @@ /* Title: Pure/Build/build_process.scala Author: Makarius Build process for sessions, with build database, optional heap, and optional presentation. */ package isabelle import scala.collection.immutable.SortedMap import scala.math.Ordering import scala.annotation.tailrec object Build_Process { /** state vs.
database **/ sealed case class Build( build_uuid: String, // Database_Progress.context_uuid + build_id: Long, ml_platform: String, options: String, start: Date, stop: Option[Date], sessions: List[String] ) { def active: Boolean = stop.isEmpty + def active_build_uuid: Option[String] = if (active) Some(build_uuid) else None def print: String = build_uuid + " (platform: " + ml_platform + ", start: " + Build_Log.print_date(start) + if_proper(stop, ", stop: " + Build_Log.print_date(stop.get)) + ")" } sealed case class Worker( worker_uuid: String, // Database_Progress.agent_uuid build_uuid: String, start: Date, stamp: Date, stop: Option[Date], serial: Long ) + object Task { + type Entry = (String, Task) + def entry(name: String, deps: List[String], build_uuid: String): Entry = + name -> Task(name, deps, build_uuid) + def entry(session: Build_Job.Session_Context, build_context: isabelle.Build.Context): Entry = + entry(session.name, session.deps, build_context.build_uuid) + } + sealed case class Task( name: String, deps: List[String], build_uuid: String - ) { + ) extends Library.Named { def is_ready: Boolean = deps.isEmpty - def resolve(dep: String): Task = - if (deps.contains(dep)) copy(deps = deps.filterNot(_ == dep)) else this + def resolve(dep: String): Option[Task] = + if (deps.contains(dep)) Some(copy(deps = deps.filterNot(_ == dep))) else None } sealed case class Job( name: String, worker_uuid: String, build_uuid: String, node_info: Host.Node_Info, start_date: Date, build: Option[Build_Job] ) extends Library.Named { def cancel(): Unit = build.foreach(_.cancel()) def is_finished: Boolean = build.isDefined && build.get.is_finished def join_build: Option[Build_Job.Result] = build.flatMap(_.join) } sealed case class Result( name: String, worker_uuid: String, build_uuid: String, node_info: Host.Node_Info, process_result: Process_Result, output_shasum: SHA1.Shasum, current: Boolean ) extends Library.Named { def ok: Boolean = process_result.ok } object Sessions { type Graph = isabelle.Graph[String, Build_Job.Session_Context] val empty: Sessions = new Sessions(Graph.string) } final class Sessions private(val graph: Sessions.Graph) { override def toString: String = graph.toString def defined(name: String): Boolean = graph.defined(name) def apply(name: String): Build_Job.Session_Context = graph.get_node(name) def iterator: Iterator[Build_Job.Session_Context] = for (name <- graph.topological_order.iterator) yield apply(name) + def data: Library.Update.Data[Build_Job.Session_Context] = + Map.from(for ((_, (session, _)) <- graph.iterator) yield session.name -> session) + def make(new_graph: Sessions.Graph): Sessions = if (graph == new_graph) this else { new Sessions( new_graph.iterator.foldLeft(new_graph) { case (g, (name, (session, _))) => g.add_deps_acyclic(name, session.deps) }) } def pull( data_domain: Set[String], data: Set[String] => List[Build_Job.Session_Context] ): Sessions = { val dom = data_domain -- iterator.map(_.name) make(data(dom).foldLeft(graph.restrict(dom)) { case (g, e) => g.new_node(e.name, e) }) } def init( build_context: isabelle.Build.Context, database_server: Option[SQL.Database], progress: Progress = new Progress ): Sessions = { val sessions_structure = build_context.sessions_structure make( sessions_structure.build_graph.iterator.foldLeft(graph) { case (graph0, (name, (info, _))) => val deps = info.parent.toList val prefs = info.session_prefs val ancestors = sessions_structure.build_requirements(deps) val sources_shasum = build_context.deps.sources_shasum(name) if 
(graph0.defined(name)) { val session0 = graph0.get_node(name) val prefs0 = session0.session_prefs val ancestors0 = session0.ancestors val sources_shasum0 = session0.sources_shasum def err(msg: String, a: String, b: String): Nothing = error("Conflicting dependencies for session " + quote(name) + ": " + msg + "\n" + Library.indent_lines(2, a) + "\nvs.\n" + Library.indent_lines(2, b)) if (prefs0 != prefs) { err("preferences disagree", Symbol.cartouche_decoded(prefs0), Symbol.cartouche_decoded(prefs)) } if (ancestors0 != ancestors) { err("ancestors disagree", commas_quote(ancestors0), commas_quote(ancestors)) } if (sources_shasum0 != sources_shasum) { val a = sources_shasum0 - sources_shasum val b = sources_shasum - sources_shasum0 err("sources disagree", Library.trim_line(a.toString), Library.trim_line(b.toString)) } graph0 } else { val session = Build_Job.Session_Context.load(database_server, build_context.build_uuid, name, deps, ancestors, prefs, sources_shasum, info.timeout, build_context.store, progress = progress) graph0.new_node(name, session) } } ) } lazy val max_time: Map[String, Double] = { val maximals = graph.maximals.toSet def descendants_time(name: String): Double = { if (maximals.contains(name)) apply(name).old_time.seconds else { val descendants = graph.all_succs(List(name)).toSet val g = graph.restrict(descendants) (0.0 :: g.maximals.flatMap { desc => val ps = g.all_preds(List(desc)) if (ps.exists(p => !graph.defined(p))) None else Some(ps.map(p => apply(p).old_time.seconds).sum) }).max } } Map.from( for (name <- graph.keys_iterator) yield name -> descendants_time(name)).withDefaultValue(0.0) } lazy val ordering: Ordering[String] = (a: String, b: String) => max_time(b) compare max_time(a) match { case 0 => apply(b).timeout compare apply(a).timeout match { case 0 => a compare b case ord => ord } case ord => ord } } sealed case class Snapshot( builds: List[Build], // available build configurations workers: List[Worker], // available worker processes sessions: Sessions, // static build targets pending: State.Pending, // dynamic build "queue" running: State.Running, // presently running jobs results: State.Results) // finished results object State { - type Pending = List[Task] - type Running = Map[String, Job] - type Results = Map[String, Result] + def inc_serial(serial: Long) = { + require(serial < Long.MaxValue, "serial overflow") + serial + 1 + } + + type Pending = Library.Update.Data[Task] + type Running = Library.Update.Data[Job] + type Results = Library.Update.Data[Result] } sealed case class State( serial: Long = 0, numa_nodes: List[Int] = Nil, sessions: Sessions = Sessions.empty, - pending: State.Pending = Nil, + pending: State.Pending = Map.empty, running: State.Running = Map.empty, results: State.Results = Map.empty ) { require(serial >= 0, "serial underflow") - def inc_serial: State = { - require(serial < Long.MaxValue, "serial overflow") - copy(serial = serial + 1) - } - def ready: State.Pending = pending.filter(_.is_ready) - def next_ready: State.Pending = ready.filter(entry => !is_running(entry.name)) + def next_serial: Long = State.inc_serial(serial) + def inc_serial: State = copy(serial = next_serial) - def remove_pending(name: String): State = - copy(pending = pending.flatMap( - entry => if (entry.name == name) None else Some(entry.resolve(name)))) + def ready: List[Task] = pending.valuesIterator.filter(_.is_ready).toList.sortBy(_.name) + def next_ready: List[Task] = ready.filter(entry => !is_running(entry.name)) + + def remove_pending(a: String): State = + 
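/* A runnable sketch of the remove_pending rewrite below, assuming the
   simplified Task/Pending shapes introduced in this patch: finishing task `a`
   deletes its own entry and resolves `a` out of every other task's dependency
   list; Task.resolve returns None when nothing changes, so unaffected entries
   are kept untouched. */
object RemovePendingSketch {
  final case class Task(name: String, deps: List[String]) {
    def resolve(dep: String): Option[Task] =
      if (deps.contains(dep)) Some(copy(deps = deps.filterNot(_ == dep))) else None
  }
  type Pending = Map[String, Task]

  def removePending(pending: Pending, a: String): Pending =
    pending.foldLeft(pending) {
      case (map, (b, task)) =>
        if (a == b) map - a
        else task.resolve(a) match {
          case None => map  // task does not depend on a
          case Some(task1) => map + (b -> task1)
        }
    }

  def main(args: Array[String]): Unit = {
    val pending: Pending = Map(
      "Pure" -> Task("Pure", Nil),
      "HOL" -> Task("HOL", List("Pure")),
      "HOL-Library" -> Task("HOL-Library", List("HOL", "Pure")))
    println(removePending(pending, "Pure"))
    // Pure is gone; HOL and HOL-Library no longer list it as a dependency
  }
}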
copy(pending = + pending.foldLeft(pending) { + case (map, (b, task)) => + if (a == b) map - a + else { + task.resolve(a) match { + case None => map + case Some(task1) => map + (b -> task1) + } + } + }) def is_running(name: String): Boolean = running.isDefinedAt(name) def build_running: List[Job] = List.from(for (job <- running.valuesIterator if job.build.isDefined) yield job) def add_running(job: Job): State = copy(running = running + (job.name -> job)) def remove_running(name: String): State = copy(running = running - name) def make_result( result_name: (String, String, String), process_result: Process_Result, output_shasum: SHA1.Shasum, node_info: Host.Node_Info = Host.Node_Info.none, current: Boolean = false ): State = { val (name, worker_uuid, build_uuid) = result_name val result = Result(name, worker_uuid, build_uuid, node_info, process_result, output_shasum, current) copy(results = results + (name -> result)) } def ancestor_results(name: String): Option[List[Result]] = { val defined = sessions.defined(name) && sessions(name).ancestors.forall(a => sessions.defined(a) && results.isDefinedAt(a)) if (defined) Some(sessions(name).ancestors.map(results)) else None } } /** SQL data model **/ object private_data extends SQL.Data("isabelle_build") { val database: Path = Path.explode("$ISABELLE_HOME_USER/build.db") + override lazy val tables: SQL.Tables = + SQL.Tables( + Updates.table, + Sessions.table, + Pending.table, + Running.table, + Results.table, + Base.table, + Workers.table) + + private lazy val build_uuid_tables = tables.filter(Generic.build_uuid_table) + private lazy val build_id_tables = + tables.filter(t => Generic.build_id_table(t) && !Generic.build_uuid_table(t)) + def pull[A <: Library.Named]( data_domain: Set[String], data_iterator: Set[String] => Iterator[A], old_data: Map[String, A] ): Map[String, A] = { val dom = data_domain -- old_data.keysIterator val data = old_data -- old_data.keysIterator.filterNot(data_domain) if (dom.isEmpty) data else data_iterator(dom).foldLeft(data) { case (map, a) => map + (a.name -> a) } } def pull0[A <: Library.Named]( new_data: Map[String, A], old_data: Map[String, A] ): Map[String, A] = { pull(new_data.keySet, dom => new_data.valuesIterator.filter(a => dom(a.name)), old_data) } def pull1[A <: Library.Named]( data_domain: Set[String], data_base: Set[String] => Map[String, A], old_data: Map[String, A] ): Map[String, A] = { pull(data_domain, dom => data_base(dom).valuesIterator, old_data) } object Generic { + val build_id = SQL.Column.long("build_id") val build_uuid = SQL.Column.string("build_uuid") val worker_uuid = SQL.Column.string("worker_uuid") val name = SQL.Column.string("name") + def build_id_table(table: SQL.Table): Boolean = + table.columns.exists(column => column.name == build_id.name) + + def build_uuid_table(table: SQL.Table): Boolean = + table.columns.exists(column => column.name == build_uuid.name) + def sql( + build_id: Long = 0, build_uuid: String = "", worker_uuid: String = "", names: Iterable[String] = Nil ): SQL.Source = SQL.and( + if_proper(build_id > 0, Generic.build_id.equal(build_id)), if_proper(build_uuid, Generic.build_uuid.equal(build_uuid)), if_proper(worker_uuid, Generic.worker_uuid.equal(worker_uuid)), if_proper(names, Generic.name.member(names))) def sql_where( + build_id: Long = 0, build_uuid: String = "", worker_uuid: String = "", names: Iterable[String] = Nil ): SQL.Source = { - SQL.where(sql(build_uuid = build_uuid, worker_uuid = worker_uuid, names = names)) + SQL.where(sql( + build_id = build_id, + build_uuid 
= build_uuid, + worker_uuid = worker_uuid, + names = names)) } } + /* recorded updates */ + + object Updates { + val build_id = Generic.build_id.make_primary_key + val serial = SQL.Column.long("serial").make_primary_key + val kind = SQL.Column.int("kind").make_primary_key + val name = Generic.name.make_primary_key + + val table = make_table(List(build_id, serial, kind, name), name = "updates") + } + + def write_updates( + db: SQL.Database, + build_id: Long, + serial: Long, + updates: List[Library.Update] + ): Unit = + db.execute_batch_statement(db.insert_permissive(Updates.table), batch = + for (update <- updates.iterator; kind = update.kind; name <- update.domain.iterator) + yield { (stmt: SQL.Statement) => + require(build_id > 0 && serial > 0 && kind > 0 && name.nonEmpty, + "Bad database update: build_id = " + build_id + ", serial = " + serial + + ", kind = " + kind + ", name = " + quote(name)) + stmt.long(1) = build_id + stmt.long(2) = serial + stmt.int(3) = kind + stmt.string(4) = name + }) + + /* base table */ object Base { val build_uuid = Generic.build_uuid.make_primary_key + val build_id = Generic.build_id.make_primary_key val ml_platform = SQL.Column.string("ml_platform") val options = SQL.Column.string("options") val start = SQL.Column.date("start") val stop = SQL.Column.date("stop") - val table = make_table(List(build_uuid, ml_platform, options, start, stop)) + val table = make_table(List(build_uuid, build_id, ml_platform, options, start, stop)) + } + + def read_build_ids(db: SQL.Database, build_uuids: List[String]): List[Long] = + db.execute_query_statement( + Base.table.select(List(Base.build_id), + sql = if_proper(build_uuids, Base.build_uuid.where_member(build_uuids))), + List.from[Long], res => res.long(Base.build_id)) + + def get_build_id(db: SQL.Database, build_uuid: String): Long = { + read_build_ids(db, build_uuids = List(build_uuid)) match { + case build_id :: _ => build_id + case _ => + db.execute_query_statementO( + Base.table.select(List(Base.build_id.max)), _.long(Base.build_id)).getOrElse(0L) + 1L + } } def read_builds(db: SQL.Database): List[Build] = { val builds = db.execute_query_statement(Base.table.select(), List.from[Build], { res => val build_uuid = res.string(Base.build_uuid) + val build_id = res.long(Base.build_id) val ml_platform = res.string(Base.ml_platform) val options = res.string(Base.options) val start = res.date(Base.start) val stop = res.get_date(Base.stop) - Build(build_uuid, ml_platform, options, start, stop, Nil) + Build(build_uuid, build_id, ml_platform, options, start, stop, Nil) }) for (build <- builds.sortBy(_.start)(Date.Ordering)) yield { val sessions = private_data.read_sessions_domain(db, build_uuid = build.build_uuid) build.copy(sessions = sessions.toList.sorted) } } - def remove_builds(db: SQL.Database, remove: List[String]): Unit = - if (remove.nonEmpty) { - val sql = Generic.build_uuid.where_member(remove) - db.execute_statement(SQL.MULTI(build_uuid_tables.map(_.delete(sql = sql)))) + def remove_builds(db: SQL.Database, build_uuids: List[String]): Unit = + if (build_uuids.nonEmpty) { + val build_ids = read_build_ids(db, build_uuids = build_uuids) + + val sql1 = Generic.build_uuid.where_member(build_uuids) + val sql2 = Generic.build_id.where_member_long(build_ids) + db.execute_statement( + SQL.MULTI( + build_uuid_tables.map(_.delete(sql = sql1)) ++ + build_id_tables.map(_.delete(sql = sql2)))) } def start_build( db: SQL.Database, + build_id: Long, build_uuid: String, ml_platform: String, options: String, start: Date ): Unit = { 
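/* write_updates above flattens a list of updates into one
   (build_id, serial, kind, name) row per updated name and inserts them as a
   single batch, so one serial covers a whole transaction. A pure sketch of
   that flattening, assuming a simplified Update with kind = table index and
   domain = the union of deleted and inserted names: */
object UpdateRowsSketch {
  final case class Update(kind: Int, domain: Set[String])

  def rows(buildId: Long, serial: Long, updates: List[Update]): List[(Long, Long, Int, String)] =
    for (update <- updates; name <- update.domain.toList.sorted)
      yield (buildId, serial, update.kind, name)

  def main(args: Array[String]): Unit =
    rows(1L, 42L, List(Update(3, Set("HOL")), Update(4, Set("HOL", "Pure"))))
      .foreach(println)  // (1,42,3,HOL) (1,42,4,HOL) (1,42,4,Pure)
}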
db.execute_statement(Base.table.insert(), body = { stmt => stmt.string(1) = build_uuid - stmt.string(2) = ml_platform - stmt.string(3) = options - stmt.date(4) = start - stmt.date(5) = None + stmt.long(2) = build_id + stmt.string(3) = ml_platform + stmt.string(4) = options + stmt.date(5) = start + stmt.date(6) = None }) } def stop_build(db: SQL.Database, build_uuid: String): Unit = db.execute_statement( Base.table.update(List(Base.stop), sql = Base.build_uuid.where_equal(build_uuid)), body = { stmt => stmt.date(1) = db.now() }) def clean_build(db: SQL.Database): Unit = { val remove = db.execute_query_statement( Base.table.select(List(Base.build_uuid), sql = SQL.where(Base.stop.defined)), List.from[String], res => res.string(Base.build_uuid)) remove_builds(db, remove) } /* sessions */ object Sessions { val name = Generic.name.make_primary_key val deps = SQL.Column.string("deps") val ancestors = SQL.Column.string("ancestors") val options = SQL.Column.string("options") val sources = SQL.Column.string("sources") val timeout = SQL.Column.long("timeout") val old_time = SQL.Column.long("old_time") val old_command_timings = SQL.Column.bytes("old_command_timings") val build_uuid = Generic.build_uuid val table = make_table( List(name, deps, ancestors, options, sources, timeout, old_time, old_command_timings, build_uuid), name = "sessions") + + lazy val table_index: Int = tables.index(table) } def read_sessions_domain(db: SQL.Database, build_uuid: String = ""): Set[String] = db.execute_query_statement( Sessions.table.select(List(Sessions.name), sql = if_proper(build_uuid, Sessions.build_uuid.where_equal(build_uuid))), Set.from[String], res => res.string(Sessions.name)) def read_sessions(db: SQL.Database, names: Iterable[String] = Nil, build_uuid: String = "" ): List[Build_Job.Session_Context] = { db.execute_query_statement( Sessions.table.select( sql = SQL.where_and( if_proper(names, Sessions.name.member(names)), if_proper(build_uuid, Sessions.build_uuid.equal(build_uuid))) ), List.from[Build_Job.Session_Context], { res => val name = res.string(Sessions.name) val deps = split_lines(res.string(Sessions.deps)) val ancestors = split_lines(res.string(Sessions.ancestors)) val options = res.string(Sessions.options) val sources_shasum = SHA1.fake_shasum(res.string(Sessions.sources)) val timeout = Time.ms(res.long(Sessions.timeout)) val old_time = Time.ms(res.long(Sessions.old_time)) val old_command_timings_blob = res.bytes(Sessions.old_command_timings) val build_uuid = res.string(Sessions.build_uuid) Build_Job.Session_Context(name, deps, ancestors, options, sources_shasum, timeout, old_time, old_command_timings_blob, build_uuid) } ) } def update_sessions( db: SQL.Database, sessions: Build_Process.Sessions, old_sessions: Build_Process.Sessions - ): Boolean = { - val insert = sessions.iterator.filterNot(s => old_sessions.defined(s.name)).toList + ): Library.Update = { + val update = + if (old_sessions.eq(sessions)) Library.Update.empty + else Library.Update.make(old_sessions.data, sessions.data, kind = Sessions.table_index) - if (insert.nonEmpty) { + if (update.deletes) { + db.execute_statement( + Sessions.table.delete(sql = Generic.sql_where(names = update.delete))) + } + + if (update.inserts) { db.execute_batch_statement(Sessions.table.insert(), batch = - for (session <- insert) yield { (stmt: SQL.Statement) => + for (name <- update.insert) yield { (stmt: SQL.Statement) => + val session = sessions(name) stmt.string(1) = session.name stmt.string(2) = cat_lines(session.deps) stmt.string(3) = 
cat_lines(session.ancestors) stmt.string(4) = session.session_prefs stmt.string(5) = session.sources_shasum.toString stmt.long(6) = session.timeout.ms stmt.long(7) = session.old_time.ms stmt.bytes(8) = session.old_command_timings_blob stmt.string(9) = session.build_uuid }) } - insert.nonEmpty + update } /* workers */ object Workers { val worker_uuid = Generic.worker_uuid.make_primary_key val build_uuid = Generic.build_uuid val start = SQL.Column.date("start") val stamp = SQL.Column.date("stamp") val stop = SQL.Column.date("stop") val serial = SQL.Column.long("serial") val table = make_table(List(worker_uuid, build_uuid, start, stamp, stop, serial), name = "workers") + + lazy val table_index: Int = tables.index(table) } def read_serial(db: SQL.Database): Long = db.execute_query_statementO[Long]( Workers.table.select(List(Workers.serial.max)), _.long(Workers.serial)).getOrElse(0L) def read_workers( db: SQL.Database, build_uuid: String = "", worker_uuid: String = "" ): List[Worker] = { db.execute_query_statement( Workers.table.select( sql = Generic.sql_where(build_uuid = build_uuid, worker_uuid = worker_uuid)), List.from[Worker], { res => Worker( worker_uuid = res.string(Workers.worker_uuid), build_uuid = res.string(Workers.build_uuid), start = res.date(Workers.start), stamp = res.date(Workers.stamp), stop = res.get_date(Workers.stop), serial = res.long(Workers.serial)) }) } def start_worker( db: SQL.Database, worker_uuid: String, build_uuid: String, serial: Long ): Unit = { def err(msg: String): Nothing = error("Cannot start worker " + worker_uuid + if_proper(msg, "\n" + msg)) val build_stop = db.execute_query_statementO( Base.table.select(List(Base.stop), sql = Base.build_uuid.where_equal(build_uuid)), res => res.get_date(Base.stop)) build_stop match { case Some(None) => case Some(Some(_)) => err("for already stopped build process " + build_uuid) case None => err("for unknown build process " + build_uuid) } db.execute_statement(Workers.table.insert(), body = { stmt => val now = db.now() stmt.string(1) = worker_uuid stmt.string(2) = build_uuid stmt.date(3) = now stmt.date(4) = now stmt.date(5) = None stmt.long(6) = serial }) } def stamp_worker( db: SQL.Database, worker_uuid: String, serial: Long, stop_now: Boolean = false ): Unit = { val sql = Workers.worker_uuid.where_equal(worker_uuid) val stop = db.execute_query_statementO( Workers.table.select(List(Workers.stop), sql = sql), _.get_date(Workers.stop)).flatten db.execute_statement( Workers.table.update(List(Workers.stamp, Workers.stop, Workers.serial), sql = sql), body = { stmt => val now = db.now() stmt.date(1) = now stmt.date(2) = if (stop_now) Some(now) else stop stmt.long(3) = serial }) } /* pending jobs */ object Pending { val name = Generic.name.make_primary_key val deps = SQL.Column.string("deps") val build_uuid = Generic.build_uuid val table = make_table(List(name, deps, build_uuid), name = "pending") + + lazy val table_index: Int = tables.index(table) } - def read_pending(db: SQL.Database): List[Task] = + def read_pending(db: SQL.Database): State.Pending = db.execute_query_statement( - Pending.table.select(sql = SQL.order_by(List(Pending.name))), - List.from[Task], + Pending.table.select(), + Map.from[String, Task], { res => val name = res.string(Pending.name) val deps = res.string(Pending.deps) val build_uuid = res.string(Pending.build_uuid) - Task(name, split_lines(deps), build_uuid) + Task.entry(name, split_lines(deps), build_uuid) }) def update_pending( db: SQL.Database, pending: State.Pending, old_pending: State.Pending - ): 
Boolean = { - val (delete, insert) = Library.symmetric_difference(old_pending, pending) + ): Library.Update = { + val update = Library.Update.make(old_pending, pending, kind = Pending.table_index) - if (delete.nonEmpty) { + if (update.deletes) { db.execute_statement( - Pending.table.delete(sql = Generic.sql_where(names = delete.map(_.name)))) + Pending.table.delete(sql = Generic.sql_where(names = update.delete))) } - if (insert.nonEmpty) { + if (update.inserts) { db.execute_batch_statement(Pending.table.insert(), batch = - for (task <- insert) yield { (stmt: SQL.Statement) => + for (name <- update.insert) yield { (stmt: SQL.Statement) => + val task = pending(name) stmt.string(1) = task.name stmt.string(2) = cat_lines(task.deps) stmt.string(3) = task.build_uuid }) } - delete.nonEmpty || insert.nonEmpty + update } /* running jobs */ object Running { val name = Generic.name.make_primary_key val worker_uuid = Generic.worker_uuid val build_uuid = Generic.build_uuid val hostname = SQL.Column.string("hostname") val numa_node = SQL.Column.int("numa_node") val rel_cpus = SQL.Column.string("rel_cpus") val start_date = SQL.Column.date("start_date") val table = make_table( List(name, worker_uuid, build_uuid, hostname, numa_node, rel_cpus, start_date), name = "running") + + lazy val table_index: Int = tables.index(table) } def read_running(db: SQL.Database): State.Running = db.execute_query_statement( - Running.table.select(sql = SQL.order_by(List(Running.name))), + Running.table.select(), Map.from[String, Job], { res => val name = res.string(Running.name) val worker_uuid = res.string(Running.worker_uuid) val build_uuid = res.string(Running.build_uuid) val hostname = res.string(Running.hostname) val numa_node = res.get_int(Running.numa_node) val rel_cpus = res.string(Running.rel_cpus) val start_date = res.date(Running.start_date) val node_info = Host.Node_Info(hostname, numa_node, Host.Range.from(rel_cpus)) name -> Job(name, worker_uuid, build_uuid, node_info, start_date, None) } ) def update_running( db: SQL.Database, running: State.Running, old_running: State.Running - ): Boolean = { - val running0 = old_running.valuesIterator.toList - val running1 = running.valuesIterator.toList - val (delete, insert) = Library.symmetric_difference(running0, running1) + ): Library.Update = { + val update = Library.Update.make(old_running, running, kind = Running.table_index) - if (delete.nonEmpty) { + if (update.deletes) { db.execute_statement( - Running.table.delete(sql = Generic.sql_where(names = delete.map(_.name)))) + Running.table.delete(sql = Generic.sql_where(names = update.delete))) } - if (insert.nonEmpty) { + if (update.inserts) { db.execute_batch_statement(Running.table.insert(), batch = - for (job <- insert) yield { (stmt: SQL.Statement) => + for (name <- update.insert) yield { (stmt: SQL.Statement) => + val job = running(name) stmt.string(1) = job.name stmt.string(2) = job.worker_uuid stmt.string(3) = job.build_uuid stmt.string(4) = job.node_info.hostname stmt.int(5) = job.node_info.numa_node stmt.string(6) = Host.Range(job.node_info.rel_cpus) stmt.date(7) = job.start_date }) } - delete.nonEmpty || insert.nonEmpty + update } /* job results */ object Results { val name = Generic.name.make_primary_key val worker_uuid = Generic.worker_uuid val build_uuid = Generic.build_uuid val hostname = SQL.Column.string("hostname") val numa_node = SQL.Column.int("numa_node") val rel_cpus = SQL.Column.string("rel_cpus") val rc = SQL.Column.int("rc") val out = SQL.Column.string("out") val err = SQL.Column.string("err") 
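/* update_sessions, update_pending and update_running above, and
   update_results below, all reduce to the same pattern: a keyed symmetric
   difference between the old and new maps. A sketch of what an Update.make
   along these lines computes (the real Library.Update also records the table
   kind): delete = names whose old entry vanished or changed, insert = names
   whose new entry is fresh or changed; a changed name occurs in both sets,
   which is what makes the delete-then-insert SQL above replace the row. */
object UpdateMakeSketch {
  final case class Update(delete: Set[String], insert: Set[String]) {
    def deletes: Boolean = delete.nonEmpty
    def inserts: Boolean = insert.nonEmpty
    def defined: Boolean = deletes || inserts
  }

  def make[A](oldData: Map[String, A], newData: Map[String, A]): Update =
    Update(
      delete = Set.from(oldData.keysIterator.filter(n => !newData.get(n).contains(oldData(n)))),
      insert = Set.from(newData.keysIterator.filter(n => !oldData.get(n).contains(newData(n)))))

  def main(args: Array[String]): Unit = {
    val old = Map("A" -> 1, "B" -> 2)
    println(make(old, Map("B" -> 3, "C" -> 4)))
    // Update(Set(A, B),Set(B, C)): A deleted, B changed, C inserted
  }
}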
val timing_elapsed = SQL.Column.long("timing_elapsed") val timing_cpu = SQL.Column.long("timing_cpu") val timing_gc = SQL.Column.long("timing_gc") val output_shasum = SQL.Column.string("output_shasum") val current = SQL.Column.bool("current") val table = make_table( List(name, worker_uuid, build_uuid, hostname, numa_node, rel_cpus, rc, out, err, timing_elapsed, timing_cpu, timing_gc, output_shasum, current), name = "results") + + lazy val table_index: Int = tables.index(table) } def read_results_domain(db: SQL.Database): Set[String] = db.execute_query_statement( Results.table.select(List(Results.name)), Set.from[String], res => res.string(Results.name)) def read_results(db: SQL.Database, names: Iterable[String] = Nil): State.Results = db.execute_query_statement( Results.table.select(sql = if_proper(names, Results.name.where_member(names))), Map.from[String, Result], { res => val name = res.string(Results.name) val worker_uuid = res.string(Results.worker_uuid) val build_uuid = res.string(Results.build_uuid) val hostname = res.string(Results.hostname) val numa_node = res.get_int(Results.numa_node) val rel_cpus = res.string(Results.rel_cpus) val node_info = Host.Node_Info(hostname, numa_node, Host.Range.from(rel_cpus)) val rc = res.int(Results.rc) val out = res.string(Results.out) val err = res.string(Results.err) val timing = res.timing( Results.timing_elapsed, Results.timing_cpu, Results.timing_gc) val process_result = Process_Result(rc, out_lines = split_lines(out), err_lines = split_lines(err), timing = timing) val output_shasum = SHA1.fake_shasum(res.string(Results.output_shasum)) val current = res.bool(Results.current) name -> Result(name, worker_uuid, build_uuid, node_info, process_result, output_shasum, current) } ) def update_results( db: SQL.Database, results: State.Results, old_results: State.Results - ): Boolean = { - val insert = - results.valuesIterator.filterNot(res => old_results.isDefinedAt(res.name)).toList + ): Library.Update = { + val update = Library.Update.make(old_results, results, kind = Results.table_index) - if (insert.nonEmpty) { + if (update.deletes) { + db.execute_statement( + Results.table.delete(sql = Generic.sql_where(names = update.delete))) + } + + if (update.inserts) { db.execute_batch_statement(Results.table.insert(), batch = - for (result <- insert) yield { (stmt: SQL.Statement) => + for (name <- update.insert) yield { (stmt: SQL.Statement) => + val result = results(name) val process_result = result.process_result stmt.string(1) = result.name stmt.string(2) = result.worker_uuid stmt.string(3) = result.build_uuid stmt.string(4) = result.node_info.hostname stmt.int(5) = result.node_info.numa_node stmt.string(6) = Host.Range(result.node_info.rel_cpus) stmt.int(7) = process_result.rc stmt.string(8) = cat_lines(process_result.out_lines) stmt.string(9) = cat_lines(process_result.err_lines) stmt.long(10) = process_result.timing.elapsed.ms stmt.long(11) = process_result.timing.cpu.ms stmt.long(12) = process_result.timing.gc.ms stmt.string(13) = result.output_shasum.toString stmt.bool(14) = result.current }) } - insert.nonEmpty + update } /* collective operations */ - override val tables: SQL.Tables = - SQL.Tables( - Base.table, - Workers.table, - Sessions.table, - Pending.table, - Running.table, - Results.table) - - private val build_uuid_tables = - tables.filter(table => - table.columns.exists(column => column.name == Generic.build_uuid.name)) - def pull_database(db: SQL.Database, worker_uuid: String, state: State): State = { val serial_db = read_serial(db) if 
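/* pull_database (begun above) refreshes the in-memory state only when the
   database serial has moved on, and the pull/pull0/pull1 helpers fetch just
   the names that are missing locally instead of re-reading whole tables. A
   sketch of that incremental pull, assuming entries are immutable once
   written (which is why names already cached need not be re-fetched): */
object PullSketch {
  def pull[A](
    domain: Set[String],                          // names present in the database
    fetch: Set[String] => Iterator[(String, A)],  // read only the given names
    oldData: Map[String, A]
  ): Map[String, A] = {
    val kept = oldData.view.filterKeys(domain).toMap  // drop names deleted remotely
    val missing = domain -- oldData.keySet
    if (missing.isEmpty) kept
    else fetch(missing).foldLeft(kept)(_ + _)
  }

  def main(args: Array[String]): Unit = {
    val db = Map("A" -> 1, "B" -> 2, "C" -> 3)
    val cached = Map("A" -> 1, "Z" -> 9)  // Z was removed remotely
    println(pull(db.keySet, names => db.view.filterKeys(names).iterator, cached))
    // Map(A -> 1, B -> 2, C -> 3): Z dropped, B and C fetched, A reused
  }
}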
(serial_db == state.serial) state else { val serial = serial_db max state.serial stamp_worker(db, worker_uuid, serial) val sessions = state.sessions.pull(read_sessions_domain(db), read_sessions(db, _)) val pending = read_pending(db) val running = pull0(read_running(db), state.running) val results = pull1(read_results_domain(db), read_results(db, _), state.results) state.copy(serial = serial, sessions = sessions, pending = pending, running = running, results = results) } } def update_database( db: SQL.Database, + build_id: Long, worker_uuid: String, state: State, old_state: State ): State = { - val changed = + val updates = List( update_sessions(db, state.sessions, old_state.sessions), update_pending(db, state.pending, old_state.pending), update_running(db, state.running, old_state.running), - update_results(db, state.results, old_state.results)) + update_results(db, state.results, old_state.results) + ).filter(_.defined) - val state1 = if (changed.exists(identity)) state.inc_serial else state - if (state1.serial != state.serial) stamp_worker(db, worker_uuid, state1.serial) - - state1 + if (updates.nonEmpty) { + val serial = state.next_serial + write_updates(db, build_id, serial, updates) + stamp_worker(db, worker_uuid, serial) + state.copy(serial = serial) + } + else state } } def read_builds(db: SQL.Database): List[Build] = private_data.transaction_lock(db, create = true, label = "Build_Process.read_builds") { private_data.read_builds(db) } + + def init_build( + db: SQL.Database, + build_context: isabelle.Build.Context, + build_start: Date + ): Long = + private_data.transaction_lock(db, create = true, label = "Build_Process.init_build") { + val build_uuid = build_context.build_uuid + val build_id = private_data.get_build_id(db, build_uuid) + if (build_context.master) { + private_data.start_build(db, build_id, build_uuid, build_context.ml_platform, + build_context.sessions_structure.session_prefs, build_start) + } + build_id + } } /** main process **/ class Build_Process( protected final val build_context: Build.Context, protected final val build_progress: Progress, protected final val server: SSH.Server ) extends AutoCloseable { /* context */ protected final val store: Store = build_context.store protected final val build_options: Options = store.options protected final val build_deps: isabelle.Sessions.Deps = build_context.deps protected final val hostname: String = build_context.hostname protected final val build_uuid: String = build_context.build_uuid private var warning_seen = Set.empty[String] protected def warning(msg: String): Unit = synchronized { if (!warning_seen(msg)) { build_progress.echo_warning(msg) warning_seen += msg } } /* global resources with common close() operation */ protected val _database_server: Option[SQL.Database] = try { store.maybe_open_database_server(server = server) } catch { case exn: Throwable => close(); throw exn } protected val _heaps_database: Option[SQL.Database] = try { store.maybe_open_heaps_database(_database_server, server = server) } catch { case exn: Throwable => close(); throw exn } protected val _build_database: Option[SQL.Database] = try { for (db <- store.maybe_open_build_database(server = server)) yield { if (!db.is_postgresql) { error("Distributed build requires PostgreSQL (option build_database_server)") } val store_tables = db.is_postgresql Build_Process.private_data.transaction_lock(db, create = true, label = "Build_Process.build_database" ) { Build_Process.private_data.clean_build(db) if (store_tables) 
Store.private_data.tables.lock(db, create = true) } if (build_context.master) { db.vacuum(Build_Process.private_data.tables.list) if (store_tables) db.vacuum(Store.private_data.tables.list) } db } } catch { case exn: Throwable => close(); throw exn } protected val build_delay: Time = { val option = _build_database match { case Some(db) if db.is_postgresql => "build_cluster_delay" case _ => "build_delay" } build_options.seconds(option) } protected val _host_database: SQL.Database = try { store.open_build_database(path = Host.private_data.database, server = server) } catch { case exn: Throwable => close(); throw exn } protected val (progress, worker_uuid) = synchronized { if (_build_database.isEmpty) (build_progress, UUID.random_string()) else { try { val db = store.open_build_database(Progress.private_data.database, server = server) val progress = new Database_Progress(db, build_progress, input_messages = build_context.master, output_stopped = build_context.master, hostname = hostname, context_uuid = build_uuid, kind = "build_process", timeout = Some(build_delay)) (progress, progress.agent_uuid) } catch { case exn: Throwable => close(); throw exn } } } + protected val log: Logger = Logger.make_system_log(progress, build_options) + protected val build_start: Date = build_context.build_start getOrElse progress.now() - protected val log: Logger = Logger.make_system_log(progress, build_options) + protected val build_id: Long = + _build_database match { + case None => 0L + case Some(db) => Build_Process.init_build(db, build_context, build_start) + } protected def open_build_cluster(): Build_Cluster = Build_Cluster.make(build_context, progress = build_progress).open() protected val _build_cluster: Build_Cluster = try { if (build_context.master && _build_database.isDefined) open_build_cluster() else Build_Cluster.none } catch { case exn: Throwable => close(); throw exn } def close(): Unit = synchronized { Option(_database_server).flatten.foreach(_.close()) Option(_heaps_database).flatten.foreach(_.close()) Option(_build_database).flatten.foreach(_.close()) Option(_host_database).foreach(_.close()) Option(_build_cluster).foreach(_.close()) progress match { case db_progress: Database_Progress => db_progress.close() case _ => } } /* global state: internal var vs. 
external database */ protected var _state: Build_Process.State = Build_Process.State() protected def synchronized_database[A](label: String)(body: => A): A = synchronized { _build_database match { case None => body case Some(db) => Build_Process.private_data.transaction_lock(db, label = label) { val old_state = Build_Process.private_data.pull_database(db, worker_uuid, _state) _state = old_state val res = body - _state = Build_Process.private_data.update_database(db, worker_uuid, _state, old_state) + _state = + Build_Process.private_data.update_database( + db, build_id, worker_uuid, _state, old_state) res } } } /* policy operations */ - protected def init_state(state: Build_Process.State): Build_Process.State = { - val sessions1 = state.sessions.init(build_context, _database_server, progress = build_progress) - - val old_pending = state.pending.iterator.map(_.name).toSet - val new_pending = - List.from( - for (session <- sessions1.iterator if !old_pending(session.name)) - yield Build_Process.Task(session.name, session.deps, build_uuid)) - val pending1 = new_pending ::: state.pending - - state.copy(sessions = sessions1, pending = pending1) - } - protected def next_jobs(state: Build_Process.State): List[String] = { val limit = { if (progress.stopped) { if (build_context.master) Int.MaxValue else 0 } else build_context.jobs - state.build_running.length } if (limit > 0) state.next_ready.sortBy(_.name)(state.sessions.ordering).take(limit).map(_.name) else Nil } protected def next_node_info(state: Build_Process.State, session_name: String): Host.Node_Info = { def used_nodes: Set[Int] = Set.from(for (job <- state.running.valuesIterator; i <- job.node_info.numa_node) yield i) val numa_node = Host.next_numa_node(_host_database, hostname, state.numa_nodes, used_nodes) Host.Node_Info(hostname, numa_node, Nil) } protected def start_session( state: Build_Process.State, session_name: String, ancestor_results: List[Build_Process.Result] ): Build_Process.State = { val sources_shasum = state.sessions(session_name).sources_shasum val input_shasum = if (ancestor_results.isEmpty) ML_Process.bootstrap_shasum() else SHA1.flat_shasum(ancestor_results.map(_.output_shasum)) val store_heap = build_context.build_heap || Sessions.is_pure(session_name) || state.sessions.iterator.exists(_.ancestors.contains(session_name)) val (current, output_shasum) = store.check_output(_database_server, session_name, session_options = build_context.sessions_structure(session_name).options, sources_shasum = sources_shasum, input_shasum = input_shasum, fresh_build = build_context.fresh_build, store_heap = store_heap) val finished = current && ancestor_results.forall(_.current) val skipped = build_context.no_build val cancelled = progress.stopped || !ancestor_results.forall(_.ok) if (!skipped && !cancelled) { for (db <- _database_server orElse _heaps_database) { val hierarchy = (session_name :: ancestor_results.map(_.name)) .map(store.output_session(_, store_heap = true)) ML_Heap.restore(db, hierarchy, cache = store.cache.compress) } } val result_name = (session_name, worker_uuid, build_uuid) if (finished) { state .remove_pending(session_name) .make_result(result_name, Process_Result.ok, output_shasum, current = true) } else if (skipped) { progress.echo("Skipping " + session_name + " ...", verbose = true) state. remove_pending(session_name). 
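/* next_jobs above caps new work by the free local job slots; a stopped master
   deliberately keeps a huge limit so that remaining pending sessions drain
   quickly into cancelled results, while a stopped worker starts nothing. A
   standalone sketch of the limit computation (the real selection also sorts
   the ready sessions by the scheduling order): */
object NextJobsSketch {
  def nextJobs(
    ready: List[String],  // pending tasks whose dependencies are resolved
    running: Int,
    configured: Int,
    stopped: Boolean,
    master: Boolean
  ): List[String] = {
    val limit =
      if (stopped) { if (master) Int.MaxValue else 0 }
      else configured - running
    if (limit > 0) ready.take(limit) else Nil
  }

  def main(args: Array[String]): Unit = {
    println(nextJobs(List("HOL", "ZF"), running = 1, configured = 2,
      stopped = false, master = true))  // List(HOL): one free slot
    println(nextJobs(List("HOL", "ZF"), running = 0, configured = 2,
      stopped = true, master = false))  // List(): stopped worker starts nothing
  }
}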
make_result(result_name, Process_Result.error, output_shasum) } else if (cancelled) { if (build_context.master) { progress.echo(session_name + " CANCELLED") state .remove_pending(session_name) .make_result(result_name, Process_Result.undefined, output_shasum) } else state } else { val build_log_verbose = build_options.bool("build_log_verbose") val start = progress.now() val start_time = start - build_start val start_time_msg = build_log_verbose val node_info = next_node_info(state, session_name) val node_info_msg = node_info.numa || build_log_verbose progress.echo( (if (store_heap) "Building " else "Running ") + session_name + if_proper(start_time_msg || node_info_msg, " (" + if_proper(start_time_msg, "started " + start_time.message_hms) + if_proper(start_time_msg && node_info_msg, " ") + if_proper(node_info_msg, "on " + node_info.toString) + ")") + " ...") val session = state.sessions(session_name) val background = build_deps.background(session_name) val build = Build_Job.start_session(build_context, session, progress, log, server, background, sources_shasum, input_shasum, node_info, store_heap) state.add_running( Build_Process.Job(session_name, worker_uuid, build_uuid, node_info, start, Some(build))) } } /* build process roles */ final def is_session_name(job_name: String): Boolean = !Long_Name.is_qualified(job_name) - protected final def start_build(): Unit = synchronized_database("Build_Process.start_build") { - for (db <- _build_database) { - Build_Process.private_data.start_build(db, build_uuid, build_context.ml_platform, - build_context.sessions_structure.session_prefs, build_start) - } - } - protected final def stop_build(): Unit = synchronized_database("Build_Process.stop_build") { for (db <- _build_database) { Build_Process.private_data.stop_build(db, build_uuid) } } protected final def start_worker(): Unit = synchronized_database("Build_Process.start_worker") { for (db <- _build_database) { _state = _state.inc_serial Build_Process.private_data.start_worker(db, worker_uuid, build_uuid, _state.serial) } } protected final def stop_worker(): Unit = synchronized_database("Build_Process.stop_worker") { for (db <- _build_database) { Build_Process.private_data.stamp_worker(db, worker_uuid, _state.serial, stop_now = true) } } /* prepare */ def prepare(): Unit = { for (name <- build_context.clean_sessions) { store.clean_output(_database_server orElse _heaps_database, name, progress = progress) } } /* run */ protected def finished(): Boolean = synchronized { if (!build_context.master && progress.stopped) _state.build_running.isEmpty else _state.pending.isEmpty } protected def sleep(): Unit = Isabelle_Thread.interrupt_handler(_ => progress.stop()) { build_delay.sleep() } + protected def init_unsynchronized(): Unit = { + if (build_context.master) { + val sessions1 = + _state.sessions.init(build_context, _database_server, progress = build_progress) + val pending1 = + sessions1.iterator.foldLeft(_state.pending) { + case (map, session) => + if (map.isDefinedAt(session.name)) map + else map + Build_Process.Task.entry(session, build_context) + } + _state = _state.copy(sessions = sessions1, pending = pending1) + } + + val numa_nodes = Host.numa_nodes(enabled = build_context.numa_shuffling) + _state = _state.copy(numa_nodes = numa_nodes) + } + protected def main_unsynchronized(): Unit = { for (job <- _state.build_running.filter(_.is_finished)) { _state = _state.remove_running(job.name) for (result <- job.join_build) { val result_name = (job.name, worker_uuid, build_uuid) _state = _state. 
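/* The run() method below drives everything as a poll loop: each round takes
   the database transaction lock, harvests finished jobs, starts whatever
   fits, then sleeps for build_delay (interruptibly, so progress.stop() wakes
   it early). A minimal sketch of that loop shape with plain threads: */
object MainLoopSketch {
  @volatile private var pendingWork = 3  // stand-in for _state.pending

  private def finished: Boolean = pendingWork == 0

  private def mainRound(): Unit = synchronized {
    pendingWork -= 1  // harvesting finished jobs / starting new ones goes here
    println("round done, pending = " + pendingWork)
  }

  def main(args: Array[String]): Unit = {
    while (!finished) {
      mainRound()
      try Thread.sleep(100)  // cf. build_delay.sleep()
      catch { case _: InterruptedException => () }  // a stop request wakes the loop
    }
    println("all pending work drained")
  }
}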
remove_pending(job.name). make_result(result_name, result.process_result, result.output_shasum, node_info = job.node_info) } } for (name <- next_jobs(_state)) { if (is_session_name(name)) { if (build_context.sessions_structure.defined(name)) { _state.ancestor_results(name) match { case Some(results) => _state = start_session(_state, name, results) case None => warning("Bad build job " + quote(name) + ": no ancestor results") } } else warning("Bad build job " + quote(name) + ": no session info") } else warning("Bad build job " + quote(name)) } } def run(): Build.Results = { val vacuous = synchronized_database("Build_Process.init") { - if (build_context.master) { - _build_cluster.init() - _state = init_state(_state) - } - _state = _state.copy(numa_nodes = Host.numa_nodes(enabled = build_context.numa_shuffling)) + _build_cluster.init() + init_unsynchronized() build_context.master && _state.pending.isEmpty } if (vacuous) { progress.echo_warning("Nothing to build") + if (build_context.master) stop_build() Build.Results(build_context) } else { - if (build_context.master) start_build() start_worker() _build_cluster.start() try { while (!finished()) { synchronized_database("Build_Process.main") { if (progress.stopped) _state.build_running.foreach(_.cancel()) main_unsynchronized() } sleep() } } finally { _build_cluster.stop() stop_worker() if (build_context.master) stop_build() } synchronized_database("Build_Process.result") { val results = for ((name, result) <- _state.results) yield name -> result.process_result Build.Results(build_context, results = results, other_rc = _build_cluster.rc) } } } /* snapshot */ def snapshot(): Build_Process.Snapshot = synchronized_database("Build_Process.snapshot") { val (builds, workers) = _build_database match { case None => (Nil, Nil) case Some(db) => (Build_Process.private_data.read_builds(db), Build_Process.private_data.read_workers(db)) } Build_Process.Snapshot( builds = builds, workers = workers, sessions = _state.sessions, pending = _state.pending, running = _state.running, results = _state.results) } /* toString */ override def toString: String = "Build_Process(worker_uuid = " + quote(worker_uuid) + ", build_uuid = " + quote(build_uuid) + if_proper(build_context.master, ", master = true") + ")" } diff --git a/src/Pure/Build/build_schedule.scala b/src/Pure/Build/build_schedule.scala --- a/src/Pure/Build/build_schedule.scala +++ b/src/Pure/Build/build_schedule.scala @@ -1,1596 +1,1594 @@ /* Title: Pure/Build/build_schedule.scala Author: Fabian Huch, TU Muenchen Build schedule generated by Heuristic methods, allowing for more efficient builds. 
*/ package isabelle import Host.Node_Info import scala.annotation.tailrec import scala.collection.mutable object Build_Schedule { /* organized historic timing information (extracted from build logs) */ case class Result(job_name: String, hostname: String, threads: Int, timing: Timing) { def elapsed: Time = timing.elapsed def proper_cpu: Option[Time] = timing.cpu.proper_ms.map(Time.ms) } object Timing_Data { def median_timing(obs: List[Timing]): Timing = obs.sortBy(_.elapsed.ms).apply(obs.length / 2) def median_time(obs: List[Time]): Time = obs.sortBy(_.ms).apply(obs.length / 2) def mean_time(obs: Iterable[Time]): Time = Time.ms(obs.map(_.ms).sum / obs.size) private def dummy_entries(host: Host, host_factor: Double) = { val baseline = Time.minutes(5).scale(host_factor) val gc = Time.seconds(10).scale(host_factor) List( Result("dummy", host.name, 1, Timing(baseline, baseline, gc)), Result("dummy", host.name, 8, Timing(baseline.scale(0.2), baseline, gc))) } def make( host_infos: Host_Infos, build_history: List[(Build_Log.Meta_Info, Build_Log.Build_Info)], ): Timing_Data = { val hosts = host_infos.hosts val measurements = for { (meta_info, build_info) <- build_history build_host = meta_info.get_build_host (job_name, session_info) <- build_info.sessions.toList if build_info.finished_sessions.contains(job_name) hostname <- session_info.hostname.orElse(build_host).toList host <- hosts.find(_.info.hostname == hostname).toList threads = session_info.threads.getOrElse(host.info.num_cpus) } yield (job_name, hostname, threads) -> session_info.timing val entries = if (measurements.isEmpty) { val default_host = host_infos.hosts.sorted(host_infos.host_speeds).last host_infos.hosts.flatMap(host => dummy_entries(host, host_infos.host_factor(default_host, host))) } else measurements.groupMap(_._1)(_._2).toList.map { case ((job_name, hostname, threads), timings) => Result(job_name, hostname, threads, median_timing(timings)) } new Timing_Data(new Facet(entries), host_infos) } def load(host_infos: Host_Infos, log_database: SQL.Database): Timing_Data = { val build_history = for { log_name <- log_database.execute_query_statement( Build_Log.private_data.meta_info_table.select(List(Build_Log.Column.log_name)), List.from[String], res => res.string(Build_Log.Column.log_name)) meta_info <- Build_Log.private_data.read_meta_info(log_database, log_name) build_info = Build_Log.private_data.read_build_info(log_database, log_name) } yield (meta_info, build_info) make(host_infos, build_history) } /* data facets */ object Facet { def unapply(facet: Facet): Option[List[Result]] = Some(facet.results) } class Facet private[Timing_Data](val results: List[Result]) { require(results.nonEmpty) def is_empty: Boolean = results.isEmpty def size: Int = results.length lazy val by_job: Map[String, Facet] = results.groupBy(_.job_name).view.mapValues(new Facet(_)).toMap lazy val by_threads: Map[Int, Facet] = results.groupBy(_.threads).view.mapValues(new Facet(_)).toMap lazy val by_hostname: Map[String, Facet] = results.groupBy(_.hostname).view.mapValues(new Facet(_)).toMap def mean_time: Time = Timing_Data.mean_time(results.map(_.elapsed)) def best_result: Result = results.minBy(_.elapsed.ms) } } class Timing_Data private(facet: Timing_Data.Facet, val host_infos: Host_Infos) { private def inflection_point(last_mono: Int, next: Int): Int = last_mono + ((next - last_mono) / 2) def best_threads(job_name: String, max_threads: Int): Int = { val worse_threads = facet.by_job.get(job_name).toList.flatMap(_.by_hostname).flatMap { case (hostname, 
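/* median_timing/median_time above sort the observations and take the element at obs.length / 2, i.e. the upper median for even-sized samples. Self-contained example (Long milliseconds instead of Time):

   object Median {
     def medianMs(obs: List[Long]): Long = obs.sorted.apply(obs.length / 2)
   }

   // Median.medianMs(List(10L, 100L, 30L))      == 30  (middle element)
   // Median.medianMs(List(10L, 100L, 30L, 40L)) == 40  (upper median)
*/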
facet) => val best_threads = facet.best_result.threads facet.by_threads.keys.toList.sorted.find(_ > best_threads).map( inflection_point(best_threads, _)) } (max_threads :: worse_threads).min } private def hostname_factor(from: String, to: String): Double = host_infos.host_factor(host_infos.the_host(from), host_infos.the_host(to)) private def approximate_threads(entries_unsorted: List[(Int, Time)], threads: Int): Time = { val entries = entries_unsorted.sortBy(_._1) def sorted_prefix[A](xs: List[A], f: A => Long): List[A] = xs match { case x1 :: x2 :: xs => if (f(x1) <= f(x2)) x1 :: sorted_prefix(x2 :: xs, f) else x1 :: Nil case xs => xs } def linear(p0: (Int, Time), p1: (Int, Time)): Time = { val a = (p1._2 - p0._2).scale(1.0 / (p1._1 - p0._1)) val b = p0._2 - a.scale(p0._1) (a.scale(threads) + b) max Time.zero } val mono_prefix = sorted_prefix(entries, e => -e._2.ms) val is_mono = entries == mono_prefix val in_prefix = mono_prefix.length > 1 && threads <= mono_prefix.last._1 val in_inflection = !is_mono && mono_prefix.length > 1 && threads < entries.drop(mono_prefix.length).head._1 if (is_mono || in_prefix || in_inflection) { // Model with Amdahl's law val t_p = Timing_Data.median_time(for { (n, t0) <- mono_prefix (m, t1) <- mono_prefix if m != n } yield (t0 - t1).scale(n.toDouble * m / (m - n))) val t_c = Timing_Data.median_time(for ((n, t) <- mono_prefix) yield t - t_p.scale(1.0 / n)) def model(threads: Int): Time = (t_c + t_p.scale(1.0 / threads)) max Time.zero if (is_mono || in_prefix) model(threads) else { val post_inflection = entries.drop(mono_prefix.length).head val inflection_threads = inflection_point(mono_prefix.last._1, post_inflection._1) if (threads <= inflection_threads) model(threads) else linear((inflection_threads, model(inflection_threads)), post_inflection) } } else { // Piecewise linear val (p0, p1) = if (entries.head._1 < threads && threads < entries.last._1) { val split = entries.partition(_._1 < threads) (split._1.last, split._2.head) } else { val piece = if (threads < entries.head._1) entries.take(2) else entries.takeRight(2) (piece.head, piece.last) } linear(p0, p1) } } private def unify_hosts(job_name: String, on_host: String): List[(Int, Time)] = { def unify(hostname: String, facet: Timing_Data.Facet) = facet.mean_time.scale(hostname_factor(hostname, on_host)) for { facet <- facet.by_job.get(job_name).toList (threads, facet) <- facet.by_threads entries = facet.by_hostname.toList.map(unify) } yield threads -> Timing_Data.median_time(entries) } def estimate_threads(job_name: String, hostname: String, threads: Int): Option[Time] = { def try_approximate(facet: Timing_Data.Facet): Option[Time] = { val entries = facet.by_threads.toList match { case List((i, Timing_Data.Facet(List(result)))) if i != 1 => (i, facet.mean_time) :: result.proper_cpu.map(1 -> _).toList case entries => entries.map((threads, facet) => threads -> facet.mean_time) } if (entries.size < 2) None else Some(approximate_threads(entries, threads)) } for { facet <- facet.by_job.get(job_name) facet <- facet.by_hostname.get(hostname) time <- facet.by_threads.get(threads).map(_.mean_time).orElse(try_approximate(facet)) } yield time } def global_threads_factor(from: Int, to: Int): Double = { def median(xs: Iterable[Double]): Double = xs.toList.sorted.apply(xs.size / 2) val estimates = for { (hostname, facet) <- facet.by_hostname job_name <- facet.by_job.keys from_time <- estimate_threads(job_name, hostname, from) to_time <- estimate_threads(job_name, hostname, to) } yield from_time.ms.toDouble / to_time.ms 
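/* The "Model with Amdahl's law" above fits T(n) = t_c + t_p / n. From two measurements (n, t0) and (m, t1) with m != n it solves
     t_p = (t0 - t1) * n * m / (m - n)   and   t_c = t0 - t_p / n,
   taking medians over all pairs/points. Worked sketch in plain Doubles (names illustrative):

   object Amdahl {
     // returns (sequential part t_c, parallelizable part t_p)
     def fit(n: Int, t0: Double, m: Int, t1: Double): (Double, Double) = {
       val tp = (t0 - t1) * n * m / (m - n)
       (t0 - tp / n, tp)
     }
   }

   // Amdahl.fit(1, 100.0, 4, 40.0) == (20.0, 80.0), so the model predicts
   // 20 + 80/8 = 30 seconds at 8 threads.
*/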
if (estimates.nonEmpty) median(estimates) else { // unify hosts val estimates = for { (job_name, facet) <- facet.by_job hostname = facet.by_hostname.keys.head entries = unify_hosts(job_name, hostname) if entries.length > 1 } yield approximate_threads(entries, from).ms.toDouble / approximate_threads(entries, to).ms if (estimates.nonEmpty) median(estimates) else from.toDouble / to.toDouble } } private var cache: Map[(String, String, Int), Time] = Map.empty /* approximation factors -- penalize estimations with less information */ val FACTOR_NO_THREADS_GLOBAL_CURVE = 2.5 val FACTOR_NO_THREADS_UNIFY_MACHINES = 1.7 val FACTOR_NO_THREADS_OTHER_MACHINE = 1.5 val FACTOR_NO_THREADS_SAME_MACHINE = 1.4 val FACTOR_THREADS_OTHER_MACHINE = 1.2 def estimate(job_name: String, hostname: String, threads: Int): Time = { def estimate: Time = facet.by_job.get(job_name) match { case None => // no data for job, take average of other jobs for given threads val job_estimates = facet.by_job.keys.flatMap(estimate_threads(_, hostname, threads)) if (job_estimates.nonEmpty) Timing_Data.mean_time(job_estimates) else { // no other job to estimate from, use global curve to approximate any other job val (threads1, facet1) = facet.by_threads.head facet1.mean_time.scale(global_threads_factor(threads1, threads)) } case Some(facet) => facet.by_threads.get(threads) match { case None => // interpolate threads estimate_threads(job_name, hostname, threads).map(_.scale( FACTOR_NO_THREADS_SAME_MACHINE)).getOrElse { // per machine, try to approximate config for threads val approximated = for { hostname1 <- facet.by_hostname.keys estimate <- estimate_threads(job_name, hostname1, threads) factor = hostname_factor(hostname1, hostname) } yield estimate.scale(factor) if (approximated.nonEmpty) Timing_Data.mean_time(approximated).scale(FACTOR_NO_THREADS_OTHER_MACHINE) else { // no single machine where config can be approximated, unify data points val unified_entries = unify_hosts(job_name, hostname) if (unified_entries.length > 1) approximate_threads(unified_entries, threads).scale( FACTOR_NO_THREADS_UNIFY_MACHINES) else { // only single data point, use global curve to approximate val (job_threads, job_time) = unified_entries.head job_time.scale(global_threads_factor(job_threads, threads)).scale( FACTOR_NO_THREADS_GLOBAL_CURVE) } } } case Some(facet) => // time for job/thread exists, interpolate machine if necessary facet.by_hostname.get(hostname).map(_.mean_time).getOrElse { Timing_Data.median_time( facet.by_hostname.toList.map((hostname1, facet) => facet.mean_time.scale(hostname_factor(hostname1, hostname)))).scale( FACTOR_THREADS_OTHER_MACHINE) } } } cache.get(job_name, hostname, threads) match { case Some(time) => time case None => val time = estimate cache = cache + ((job_name, hostname, threads) -> time) time } } } /* host information */ case class Host(info: isabelle.Host.Info, build: Build_Cluster.Host) { def name: String = info.hostname def num_cpus: Int = info.num_cpus def max_threads(options: Options): Int = (options ++ build.options).threads(default = num_cpus) } object Host_Infos { def dummy: Host_Infos = new Host_Infos( List(Host(isabelle.Host.Info("dummy", Nil, 8, Some(1.0)), Build_Cluster.Host("dummy")))) def load(build_hosts: List[Build_Cluster.Host], db: SQL.Database): Host_Infos = { def get_host(build_host: Build_Cluster.Host): Host = { val info = isabelle.Host.read_info(db, build_host.name).getOrElse( error("No benchmark for " + quote(build_host.name))) Host(info, build_host) } new Host_Infos(build_hosts.map(get_host)) } 
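/* estimate above falls through ever-coarser approximations, multiplying by a penalty that grows as information shrinks (the FACTOR_* constants above): exact data -> threads interpolated on the same machine (x1.4) -> on another machine (x1.5) -> data unified across machines (x1.7) -> global curve (x2.5); a direct thread hit on another machine costs x1.2. Sketch of the bookkeeping only (names illustrative):

   object Penalty {
     def apply(baseSeconds: Double, factor: Double): Double = baseSeconds * factor
   }

   // e.g. a 60 s estimate obtained by unifying machines is reported as
   // Penalty(60.0, 1.7) == 102.0 seconds.
*/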
} class Host_Infos private(val hosts: List[Host]) { require(hosts.nonEmpty) private val by_hostname = hosts.map(host => host.name -> host).toMap def host_factor(from: Host, to: Host): Double = from.info.benchmark_score.get / to.info.benchmark_score.get val host_speeds: Ordering[Host] = Ordering.fromLessThan((host1, host2) => host_factor(host1, host2) < 1) def the_host(hostname: String): Host = by_hostname.getOrElse(hostname, error("Unknown host " + quote(hostname))) def the_host(node_info: Node_Info): Host = the_host(node_info.hostname) def num_threads(node_info: Node_Info): Int = if (node_info.rel_cpus.nonEmpty) node_info.rel_cpus.length else the_host(node_info).info.num_cpus def available(state: Build_Process.State): Resources = { val allocated = state.running.values.map(_.node_info).groupMapReduce(_.hostname)(List(_))(_ ::: _) new Resources(this, allocated) } } /* offline tracking of job configurations and resource allocations */ case class Config(job_name: String, node_info: Node_Info) { def job_of(start_time: Time): Build_Process.Job = Build_Process.Job(job_name, "", "", node_info, Date(start_time), None) } class Resources( val host_infos: Host_Infos, allocated_nodes: Map[String, List[Node_Info]] ) { def unused_nodes(host: Host, threads: Int): List[Node_Info] = if (!available(host, threads)) Nil else { val node = next_node(host, threads) node :: allocate(node).unused_nodes(host, threads) } def unused_nodes(threads: Int): List[Node_Info] = host_infos.hosts.flatMap(unused_nodes(_, threads)) def allocated(host: Host): List[Node_Info] = allocated_nodes.getOrElse(host.name, Nil) def allocate(node_info: Node_Info): Resources = { val host = host_infos.the_host(node_info) new Resources(host_infos, allocated_nodes + (host.name -> (node_info :: allocated(host)))) } def try_allocate_tasks( hosts: List[(Host, Int)], tasks: List[(Build_Process.Task, Int, Int)], ): (List[Config], Resources) = tasks match { case Nil => (Nil, this) case (task, min_threads, max_threads) :: tasks => val (config, resources) = hosts.find((host, _) => available(host, min_threads)) match { case Some((host, host_max_threads)) => val free_threads = host.info.num_cpus - ((host.build.jobs - 1) * host_max_threads) val node_info = next_node(host, (min_threads max free_threads) min max_threads) (Some(Config(task.name, node_info)), allocate(node_info)) case None => (None, this) } val (configs, resources1) = resources.try_allocate_tasks(hosts, tasks) (configs ++ config, resources1) } def next_node(host: Host, threads: Int): Node_Info = { val numa_node_num_cpus = host.info.num_cpus / (host.info.numa_nodes.length max 1) def explicit_cpus(node_info: Node_Info): List[Int] = if (node_info.rel_cpus.nonEmpty) node_info.rel_cpus else (0 until numa_node_num_cpus).toList val used_nodes = allocated(host).groupMapReduce(_.numa_node)(explicit_cpus)(_ ::: _) val available_nodes = host.info.numa_nodes val numa_node = if (!host.build.numa) None else available_nodes.sortBy(n => used_nodes.getOrElse(Some(n), Nil).length).headOption val used_cpus = used_nodes.getOrElse(numa_node, Nil).toSet val available_cpus = (0 until numa_node_num_cpus).filterNot(used_cpus.contains).toList val rel_cpus = if (available_cpus.length >= threads) available_cpus.take(threads) else Nil Node_Info(host.name, numa_node, rel_cpus) } def available(host: Host, threads: Int): Boolean = { val used = allocated(host) if (used.length >= host.build.jobs) false else { if (host.info.numa_nodes.length <= 1) used.map(host_infos.num_threads).sum + threads <= host.info.num_cpus else { def 
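/* host_factor above is a plain ratio of benchmark scores (read here as higher = faster, consistent with host_speeds and the Host_Speed criterion below), and timings transfer between hosts by multiplying with it. Illustrative sketch:

   object HostFactor {
     def factor(scoreFrom: Double, scoreTo: Double): Double = scoreFrom / scoreTo

     def transfer(seconds: Double, scoreFrom: Double, scoreTo: Double): Double =
       seconds * factor(scoreFrom, scoreTo)
   }

   // transfer(80.0, 50.0, 100.0) == 40.0: a job measured at 80 s on a
   // score-50 host is estimated at 40 s on a twice-as-fast score-100 host.
*/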
node_threads(n: Int): Int = used.filter(_.numa_node.contains(n)).map(host_infos.num_threads).sum host.info.numa_nodes.exists( node_threads(_) + threads <= host.info.num_cpus / host.info.numa_nodes.length) } } } } /* schedule generation */ object Schedule { case class Node(job_name: String, node_info: Node_Info, start: Date, duration: Time) { def end: Date = Date(start.time + duration) } type Graph = isabelle.Graph[String, Node] def init(build_uuid: String): Schedule = Schedule(build_uuid, "none", Date.now(), Graph.empty) } case class Schedule( build_uuid: String, generator: String, start: Date, graph: Schedule.Graph, serial: Long = 0, ) { require(serial >= 0, "serial underflow") - def inc_serial: Schedule = { - require(serial < Long.MaxValue, "serial overflow") - copy(serial = serial + 1) - } - + + def next_serial: Long = Build_Process.State.inc_serial(serial) + def end: Date = if (graph.is_empty) start else graph.maximals.map(graph.get_node).map(_.end).maxBy(_.unix_epoch) def duration: Time = end - start def message: String = "Estimated " + duration.message_hms + " build time with " + generator def deviation(other: Schedule): Time = Time.ms((end - other.end).ms.abs) def num_built(state: Build_Process.State): Int = graph.keys.count(state.results.contains) def elapsed(): Time = Time.now() - start.time def is_empty: Boolean = graph.is_empty def is_outdated(options: Options, state: Build_Process.State): Boolean = if (is_empty) true else { num_built(state) > options.int("build_schedule_outdated_limit") && elapsed() > options.seconds("build_schedule_outdated_delay") } def next(hostname: String, state: Build_Process.State): List[String] = for { task <- state.next_ready node = graph.get_node(task.name) if hostname == node.node_info.hostname if graph.imm_preds(node.job_name).subsetOf(state.results.keySet) } yield task.name def update(state: Build_Process.State): Schedule = { val start1 = Date.now() - val pending = state.pending.map(_.name).toSet def shift_elapsed(graph: Schedule.Graph, name: String): Schedule.Graph = graph.map_node(name, { node => val elapsed = start1 - state.running(name).start_date node.copy(duration = node.duration - elapsed) }) def shift_starts(graph: Schedule.Graph, name: String): Schedule.Graph = graph.map_node(name, { node => val starts = start1 :: graph.imm_preds(node.job_name).toList.map(graph.get_node(_).end) node.copy(start = starts.max(Date.Ordering)) }) - val graph0 = state.running.keys.foldLeft(graph.restrict(pending.contains))(shift_elapsed) + val graph0 = + state.running.keys.foldLeft(graph.restrict(state.pending.isDefinedAt))(shift_elapsed) val graph1 = graph0.topological_order.foldLeft(graph0)(shift_starts) copy(start = start1, graph = graph1) } } case class State(build_state: Build_Process.State, current_time: Time, finished: Schedule) { def start(config: Config): State = copy(build_state = build_state.copy(running = build_state.running + (config.job_name -> config.job_of(current_time)))) def step(timing_data: Timing_Data): State = { val remaining = build_state.running.values.toList.map { job => val elapsed = current_time - job.start_date.time val threads = timing_data.host_infos.num_threads(job.node_info) val predicted = timing_data.estimate(job.name, job.node_info.hostname, threads) val remaining = if (elapsed > predicted) Time.zero else predicted - elapsed job -> remaining } if (remaining.isEmpty) error("Schedule step without running sessions") else { val (job, elapsed) = remaining.minBy(_._2.ms) val now = current_time + elapsed val node = 
Schedule.Node(job.name, job.node_info, job.start_date, now - job.start_date.time) val host_preds = for { name <- finished.graph.keys pred_node = finished.graph.get_node(name) if pred_node.node_info.hostname == job.node_info.hostname if pred_node.end.time <= node.start.time } yield name val build_preds = build_state.sessions.graph.imm_preds(job.name).filter(finished.graph.defined) val preds = build_preds ++ host_preds val graph = preds.foldLeft(finished.graph.new_node(job.name, node))(_.add_edge(_, job.name)) val build_state1 = build_state.remove_running(job.name).remove_pending(job.name) State(build_state1, now, finished.copy(graph = graph)) } } def is_finished: Boolean = build_state.pending.isEmpty && build_state.running.isEmpty } trait Scheduler { def schedule(build_state: Build_Process.State): Schedule } trait Priority_Rule { def select_next(state: Build_Process.State): List[Config] } case class Generation_Scheme( priority_rule: Priority_Rule, timing_data: Timing_Data, build_uuid: String ) extends Scheduler { def schedule(build_state: Build_Process.State): Schedule = { @tailrec def simulate(state: State): State = if (state.is_finished) state else { val state1 = priority_rule .select_next(state.build_state) .foldLeft(state)(_.start(_)) .step(timing_data) simulate(state1) } val start = Date.now() val name = "generation scheme (" + priority_rule + ")" val end_state = simulate(State(build_state, start.time, Schedule(build_uuid, name, start, Graph.empty))) end_state.finished } } case class Optimizer(schedulers: List[Scheduler]) extends Scheduler { require(schedulers.nonEmpty) def schedule(state: Build_Process.State): Schedule = { def main(scheduler: Scheduler): Schedule = scheduler.schedule(state) Par_List.map(main, schedulers).minBy(_.duration.ms) } } /* priority rules */ class Default_Heuristic(host_infos: Host_Infos, options: Options) extends Priority_Rule { override def toString: String = "default heuristic" def next_jobs(resources: Resources, sorted_jobs: List[String], host: Host): List[Config] = sorted_jobs.zip(resources.unused_nodes(host, host.max_threads(options))).map(Config(_, _)) def select_next(state: Build_Process.State): List[Config] = { val sorted_jobs = state.next_ready.sortBy(_.name)(state.sessions.ordering).map(_.name) val resources = host_infos.available(state) host_infos.hosts.foldLeft((sorted_jobs, List.empty[Config])) { case ((jobs, res), host) => val configs = next_jobs(resources, jobs, host) val config_jobs = configs.map(_.job_name).toSet (jobs.filterNot(config_jobs.contains), configs ::: res) }._2 } } object Path_Time_Heuristic { sealed trait Critical_Criterion case class Absolute_Time(time: Time) extends Critical_Criterion { override def toString: String = "absolute time (" + time.message_hms + ")" } case class Relative_Time(factor: Double) extends Critical_Criterion { override def toString: String = "relative time (" + factor + ")" } sealed trait Parallel_Strategy case class Fixed_Thread(threads: Int) extends Parallel_Strategy { override def toString: String = "fixed threads (" + threads + ")" } case class Time_Based_Threads(f: Time => Int) extends Parallel_Strategy { override def toString: String = "time based threads" } sealed trait Host_Criterion case object Critical_Nodes extends Host_Criterion { override def toString: String = "per critical node" } case class Fixed_Fraction(fraction: Double) extends Host_Criterion { override def toString: String = "fixed fraction (" + fraction + ")" } case class Host_Speed(min_factor: Double) extends Host_Criterion { override 
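/* Generation_Scheme above drives a discrete-event simulation: start whatever the priority rule selects, then State.step jumps the clock to the earliest predicted completion. Stand-alone sketch of the event advance (absolute end times as plain seconds; rule and graph bookkeeping omitted):

   object Simulate {
     @annotation.tailrec
     def drain(now: Double, running: Map[String, Double],
         done: List[String]): (Double, List[String]) =
       if (running.isEmpty) (now, done.reverse)
       else {
         val (name, end) = running.minBy(_._2)   // earliest completion event
         drain(end, running - name, name :: done)
       }
   }

   // Simulate.drain(0.0, Map("a" -> 5.0, "b" -> 3.0), Nil)
   // == (5.0, List("b", "a")): the clock jumps 0 -> 3 -> 5.
*/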
def toString: String = "host speed (" + min_factor + ")" } } class Path_Time_Heuristic( is_critical: Path_Time_Heuristic.Critical_Criterion, parallel_threads: Path_Time_Heuristic.Parallel_Strategy, host_criterion: Path_Time_Heuristic.Host_Criterion, timing_data: Timing_Data, sessions_structure: Sessions.Structure, max_threads_limit: Int = 8 ) extends Priority_Rule { import Path_Time_Heuristic.* override def toString: String = { val params = List( "critical: " + is_critical, "parallel: " + parallel_threads, "fast hosts: " + host_criterion) "path time heuristic (" + params.mkString(", ") + ")" } /* pre-computed properties for efficient heuristic */ val host_infos: Host_Infos = timing_data.host_infos val ordered_hosts: List[Host] = host_infos.hosts.sorted(host_infos.host_speeds) val max_threads: Int = host_infos.hosts.map(_.info.num_cpus).max min max_threads_limit type Node = String val build_graph: Graph[Node, Sessions.Info] = sessions_structure.build_graph val minimals: List[Node] = build_graph.minimals val maximals: List[Node] = build_graph.maximals def all_preds(node: Node): Set[Node] = build_graph.all_preds(List(node)).toSet val maximals_all_preds: Map[Node, Set[Node]] = maximals.map(node => node -> all_preds(node)).toMap val best_threads: Map[Node, Int] = build_graph.keys.map(node => node -> timing_data.best_threads(node, max_threads)).toMap def best_time(node: Node): Time = { val host = ordered_hosts.last val threads = best_threads(node) min host.info.num_cpus timing_data.estimate(node, host.name, threads) } val best_times: Map[Node, Time] = build_graph.keys.map(node => node -> best_time(node)).toMap val succs_max_time_ms: Map[Node, Long] = build_graph.node_height(best_times(_).ms) def max_time(node: Node): Time = Time.ms(succs_max_time_ms(node)) + best_times(node) def max_time(task: Build_Process.Task): Time = max_time(task.name) def path_times(minimals: List[Node]): Map[Node, Time] = { def time_ms(node: Node): Long = best_times(node).ms val path_times_ms = build_graph.reachable_length(time_ms, build_graph.imm_succs, minimals) path_times_ms.view.mapValues(Time.ms).toMap } def path_max_times(minimals: List[Node]): Map[Node, Time] = path_times(minimals).toList.map((node, time) => node -> (time + max_time(node))).toMap val node_degrees: Map[Node, Int] = build_graph.keys.map(node => node -> build_graph.imm_succs(node).size).toMap def parallel_paths( running: List[(Node, Time)], nodes: Set[Node] = build_graph.keys.toSet, max: Int = Int.MaxValue ): Int = if (nodes.nonEmpty && nodes.map(node_degrees.apply).max > max) max else { def start(node: Node): (Node, Time) = node -> best_times(node) def pass_time(elapsed: Time)(node: Node, time: Time): (Node, Time) = node -> (time - elapsed) def parallel_paths(running: Map[Node, Time]): (Int, Map[Node, Time]) = if (running.size >= max) (max, running) else if (running.isEmpty) (0, running) else { def get_next(node: Node): List[Node] = build_graph.imm_succs(node).intersect(nodes).filter( build_graph.imm_preds(_).intersect(running.keySet) == Set(node)).toList val (next, elapsed) = running.minBy(_._2.ms) val (remaining, finished) = running.toList.map(pass_time(elapsed)).partition(_._2 > Time.zero) val running1 = remaining.map(pass_time(elapsed)).toMap ++ finished.map(_._1).flatMap(get_next).map(start) val (res, running2) = parallel_paths(running1) (res max running.size, running2) } parallel_paths(running.toMap)._1 } def select_next(state: Build_Process.State): List[Config] = { val resources = host_infos.available(state) def best_threads(task:
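/* max_time above is the critical-path bound: a node's own best time plus the longest chain of successor times (node_height). Tiny sketch on a hand-rolled DAG (Graph.node_height itself is not modelled):

   object CriticalPath {
     def height(succs: Map[String, List[String]], time: Map[String, Long])(n: String): Long =
       succs(n) match {
         case Nil => 0L
         case ss => ss.map(s => time(s) + height(succs, time)(s)).max
       }
   }

   // For succs = Map("A" -> List("B"), "B" -> List("C"), "C" -> Nil) and
   // time = Map("A" -> 10L, "B" -> 20L, "C" -> 5L):
   // height(succs, time)("A") == 25, so max_time("A") corresponds to 10 + 25 = 35.
*/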
Build_Process.Task): Int = this.best_threads(task.name) val rev_ordered_hosts = ordered_hosts.reverse.map(_ -> max_threads) val available_nodes = host_infos.available(state.copy(running = Map.empty)) .unused_nodes(max_threads) .sortBy(node => host_infos.the_host(node))(host_infos.host_speeds).reverse def remaining_time(node: Node): (Node, Time) = state.running.get(node) match { case None => node -> best_times(node) case Some(job) => val estimate = timing_data.estimate(job.name, job.node_info.hostname, host_infos.num_threads(job.node_info)) node -> ((Time.now() - job.start_date.time + estimate) max Time.zero) } val next_sorted = state.next_ready.sortBy(max_time(_).ms).reverse val is_parallelizable = available_nodes.length >= parallel_paths( state.ready.map(_.name).map(remaining_time), max = available_nodes.length + 1) if (is_parallelizable) { val all_tasks = next_sorted.map(task => (task, best_threads(task), best_threads(task))) resources.try_allocate_tasks(rev_ordered_hosts, all_tasks)._1 } else { def is_critical(time: Time): Boolean = this.is_critical match { case Absolute_Time(threshold) => time > threshold case Relative_Time(factor) => time > minimals.map(max_time).maxBy(_.ms).scale(factor) } val critical_minimals = state.ready.filter(task => is_critical(max_time(task))).map(_.name) val critical_nodes = path_max_times(critical_minimals).filter((_, time) => is_critical(time)).keySet val (critical, other) = next_sorted.partition(task => critical_nodes.contains(task.name)) val critical_tasks = critical.map(task => (task, best_threads(task), best_threads(task))) def parallel_threads(task: Build_Process.Task): Int = this.parallel_threads match { case Fixed_Thread(threads) => threads case Time_Based_Threads(f) => f(best_times(task.name)) } val other_tasks = other.map(task => (task, parallel_threads(task), best_threads(task))) val max_critical_parallel = parallel_paths(critical_minimals.map(remaining_time), critical_nodes) val max_critical_hosts = available_nodes.take(max_critical_parallel).map(_.hostname).distinct.length val split = this.host_criterion match { case Critical_Nodes => max_critical_hosts case Fixed_Fraction(fraction) => ((rev_ordered_hosts.length * fraction).ceil.toInt max 1) min max_critical_hosts case Host_Speed(min_factor) => val best = rev_ordered_hosts.head._1.info.benchmark_score.get val num_fast = rev_ordered_hosts.count(_._1.info.benchmark_score.exists(_ >= best * min_factor)) num_fast min max_critical_hosts } val (critical_hosts, other_hosts) = rev_ordered_hosts.splitAt(split) val (configs1, resources1) = resources.try_allocate_tasks(critical_hosts, critical_tasks) val (configs2, _) = resources1.try_allocate_tasks(other_hosts, other_tasks) configs1 ::: configs2 } } } /* master and slave processes for scheduled build */ class Scheduled_Build_Process( build_context: Build.Context, build_progress: Progress, server: SSH.Server, ) extends Build_Process(build_context, build_progress, server) { /* global state: internal var vs. 
external database */ protected var _schedule: Schedule = Schedule.init(build_uuid) override protected def synchronized_database[A](label: String)(body: => A): A = synchronized { _build_database match { case None => body case Some(db) => db.transaction_lock(Build_Schedule.private_data.all_tables, label = label) { val old_state = Build_Process.private_data.pull_database(db, worker_uuid, _state) val old_schedule = Build_Schedule.private_data.pull_schedule(db, _schedule) _state = old_state _schedule = old_schedule val res = body _state = - Build_Process.private_data.update_database(db, worker_uuid, _state, old_state) + Build_Process.private_data.update_database( + db, build_id, worker_uuid, _state, old_state) _schedule = Build_Schedule.private_data.update_schedule(db, _schedule, old_schedule) res } } } /* build process */ override def next_node_info(state: Build_Process.State, session_name: String): Node_Info = _schedule.graph.get_node(session_name).node_info override def next_jobs(state: Build_Process.State): List[String] = if (progress.stopped || _schedule.is_empty) Nil else _schedule.next(hostname, state) } abstract class Scheduler_Build_Process( build_context: Build.Context, build_progress: Progress, server: SSH.Server, ) extends Scheduled_Build_Process(build_context, build_progress, server) { require(build_context.master) protected val start_date: Date = Date.now() for (db <- _build_database) { Build_Schedule.private_data.transaction_lock( db, create = true, label = "Scheduler_Build_Process.create" ) { Build_Schedule.private_data.clean_build_schedules(db) } db.vacuum(Build_Schedule.private_data.tables.list) } def init_scheduler(timing_data: Timing_Data): Scheduler /* global resources with common close() operation */ private final lazy val _log_store: Build_Log.Store = Build_Log.store(build_options) private final lazy val _log_database: SQL.Database = try { val db = _log_store.open_database(server = this.server) _log_store.init_database(db) db } catch { case exn: Throwable => close(); throw exn } override def close(): Unit = { Option(_log_database).foreach(_.close()) super.close() } /* previous results via build log */ override def open_build_cluster(): Build_Cluster = { val build_cluster = super.open_build_cluster() build_cluster.init() Build_Benchmark.benchmark_requirements(build_options) if (build_context.worker) { val benchmark_options = build_options.string("build_hostname") = hostname Build_Benchmark.benchmark(benchmark_options, progress) } build_cluster.benchmark() } private val timing_data: Timing_Data = { val cluster_hosts: List[Build_Cluster.Host] = if (build_context.jobs == 0) build_context.build_hosts else { val local_build_host = Build_Cluster.Host( hostname, jobs = build_context.jobs, numa = build_context.numa_shuffling) local_build_host :: build_context.build_hosts } val host_infos = Host_Infos.load(cluster_hosts, _host_database) Timing_Data.load(host_infos, _log_database) } private val scheduler = init_scheduler(timing_data) def write_build_log(results: Build.Results, state: Build_Process.State.Results): Unit = { val sessions = for { (session_name, result) <- state.toList if !result.current } yield { val info = build_context.sessions_structure(session_name) val entry = if (!results.cancelled(session_name)) { val status = if (result.ok) Build_Log.Session_Status.finished else Build_Log.Session_Status.failed Build_Log.Session_Entry( chapter = info.chapter, groups = info.groups, hostname = Some(result.node_info.hostname), threads = 
Some(timing_data.host_infos.num_threads(result.node_info)), timing = result.process_result.timing, sources = Some(result.output_shasum.digest.toString), status = Some(status)) } else Build_Log.Session_Entry( chapter = info.chapter, groups = info.groups, status = Some(Build_Log.Session_Status.cancelled)) session_name -> entry } val settings = Build_Log.Settings.all_settings.map(_.name).map(name => name -> Isabelle_System.getenv(name)) val props = List( Build_Log.Prop.build_id.name -> build_context.build_uuid, Build_Log.Prop.build_engine.name -> build_context.engine.name, Build_Log.Prop.build_host.name -> hostname, Build_Log.Prop.build_start.name -> Build_Log.print_date(start_date)) val meta_info = Build_Log.Meta_Info(props, settings) val build_info = Build_Log.Build_Info(sessions.toMap) val log_name = Build_Log.log_filename(engine = build_context.engine.name, date = start_date) Build_Log.private_data.update_sessions( _log_database, _log_store.cache.compress, log_name.file_name, build_info) Build_Log.private_data.update_meta_info(_log_database, log_name.file_name, meta_info) } /* build process */ def is_current(state: Build_Process.State, session_name: String): Boolean = state.ancestor_results(session_name) match { case Some(ancestor_results) if ancestor_results.forall(_.current) => val sources_shasum = state.sessions(session_name).sources_shasum val input_shasum = if (ancestor_results.isEmpty) ML_Process.bootstrap_shasum() else SHA1.flat_shasum(ancestor_results.map(_.output_shasum)) val store_heap = build_context.build_heap || Sessions.is_pure(session_name) || state.sessions.iterator.exists(_.ancestors.contains(session_name)) store.check_output( _database_server, session_name, session_options = build_context.sessions_structure(session_name).options, sources_shasum = sources_shasum, input_shasum = input_shasum, fresh_build = build_context.fresh_build, store_heap = store_heap)._1 case _ => false } override def next_jobs(state: Build_Process.State): List[String] = if (progress.stopped) state.next_ready.map(_.name) else if (!_schedule.is_outdated(build_options, state)) _schedule.next(hostname, state) else { val current = state.next_ready.filter(task => is_current(state, task.name)) if (current.nonEmpty) current.map(_.name) else { val start = Time.now() val new_schedule = scheduler.schedule(state).update(state) val schedule = if (_schedule.is_empty) new_schedule else List(_schedule.update(state), new_schedule).minBy(_.end)(Date.Ordering) val elapsed = Time.now() - start val timing_msg = if (elapsed.is_relevant) " (took " + elapsed.message + ")" else "" progress.echo_if(_schedule.deviation(schedule).minutes > 1, schedule.message + timing_msg) _schedule = schedule _schedule.next(hostname, state) } } override def run(): Build.Results = { for (db <- _build_database) Build_Process.private_data.transaction_lock(db, label = "Scheduler_Build_Process.init") { Build_Process.private_data.clean_build(db) } val results = super.run() write_build_log(results, snapshot().results) results } } /** SQL data model of build schedule, extending isabelle_build database */ object private_data extends SQL.Data("isabelle_build") { import Build_Process.private_data.{Base, Generic} + override lazy val tables: SQL.Tables = + SQL.Tables(Schedules.table, Nodes.table) + + lazy val all_tables: SQL.Tables = + SQL.Tables.list(Build_Process.private_data.tables.list ::: tables.list) + /* schedule */ object Schedules { val build_uuid = Generic.build_uuid.make_primary_key val generator = SQL.Column.string("generator") val start = 
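/* is_outdated/next_jobs above re-plan only once enough sessions have finished AND enough wall-clock time has passed since the schedule was generated (an empty schedule is always outdated); the updated old schedule and the fresh one then compete on estimated end time. Sketch -- the threshold values are assumptions, the real ones come from the build_schedule_outdated_limit and build_schedule_outdated_delay options:

   object Outdated {
     def apply(isEmpty: Boolean, numBuilt: Int, elapsedSec: Double,
         limit: Int = 20, delaySec: Double = 60.0): Boolean =
       isEmpty || (numBuilt > limit && elapsedSec > delaySec)
   }

   // Outdated(false, numBuilt = 25, elapsedSec = 90.0) == true:
   // re-generate, then keep whichever schedule ends earlier.
*/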
SQL.Column.date("start") val serial = SQL.Column.long("serial") val table = make_table(List(build_uuid, generator, start, serial), name = "schedules") } def read_serial(db: SQL.Database, build_uuid: String = ""): Long = db.execute_query_statementO[Long]( Schedules.table.select(List(Schedules.serial.max), sql = SQL.where(if_proper(build_uuid, Schedules.build_uuid.equal(build_uuid)))), _.long(Schedules.serial)).getOrElse(0L) - def read_scheduled_builds_domain(db: SQL.Database): List[String] = + def read_scheduled_builds_domain(db: SQL.Database): Map[String, Unit] = db.execute_query_statement( Schedules.table.select(List(Schedules.build_uuid)), - List.from[String], res => res.string(Schedules.build_uuid)) + Map.from[String, Unit], res => res.string(Schedules.build_uuid) -> ()) def read_schedules(db: SQL.Database, build_uuid: String = ""): List[Schedule] = { val schedules = db.execute_query_statement(Schedules.table.select(sql = SQL.where(if_proper(build_uuid, Schedules.build_uuid.equal(build_uuid)))), List.from[Schedule], { res => val build_uuid = res.string(Schedules.build_uuid) val generator = res.string(Schedules.generator) val start = res.date(Schedules.start) val serial = res.long(Schedules.serial) Schedule(build_uuid, generator, start, Graph.empty, serial) }) for (schedule <- schedules.sortBy(_.start)(Date.Ordering)) yield { val nodes = private_data.read_nodes(db, build_uuid = schedule.build_uuid) schedule.copy(graph = Graph.make(nodes)) } } def write_schedule(db: SQL.Database, schedule: Schedule): Unit = { db.execute_statement( Schedules.table.delete(Schedules.build_uuid.where_equal(schedule.build_uuid))) db.execute_statement(Schedules.table.insert(), { stmt => stmt.string(1) = schedule.build_uuid stmt.string(2) = schedule.generator stmt.date(3) = schedule.start stmt.long(4) = schedule.serial }) update_nodes(db, schedule.build_uuid, schedule.graph.dest) } /* nodes */ object Nodes { val build_uuid = Generic.build_uuid.make_primary_key val name = Generic.name.make_primary_key val succs = SQL.Column.string("succs") val hostname = SQL.Column.string("hostname") val numa_node = SQL.Column.int("numa_node") val rel_cpus = SQL.Column.string("rel_cpus") val start = SQL.Column.date("start") val duration = SQL.Column.long("duration") val table = make_table( List(build_uuid, name, succs, hostname, numa_node, rel_cpus, start, duration), name = "schedule_nodes") } type Nodes = List[((String, Schedule.Node), List[String])] def read_nodes(db: SQL.Database, build_uuid: String = ""): Nodes = { db.execute_query_statement( Nodes.table.select(sql = SQL.where(if_proper(build_uuid, Nodes.build_uuid.equal(build_uuid)))), List.from[((String, Schedule.Node), List[String])], { res => val name = res.string(Nodes.name) val succs = split_lines(res.string(Nodes.succs)) val hostname = res.string(Nodes.hostname) val numa_node = res.get_int(Nodes.numa_node) val rel_cpus = res.string(Nodes.rel_cpus) val start = res.date(Nodes.start) val duration = Time.ms(res.long(Nodes.duration)) val node_info = Node_Info(hostname, numa_node, isabelle.Host.Range.from(rel_cpus)) ((name, Schedule.Node(name, node_info, start, duration)), succs) } ) } def update_nodes(db: SQL.Database, build_uuid: String, nodes: Nodes): Unit = { db.execute_statement(Nodes.table.delete(Nodes.build_uuid.where_equal(build_uuid))) db.execute_batch_statement(Nodes.table.insert(), batch = for (((name, node), succs) <- nodes) yield { (stmt: SQL.Statement) => stmt.string(1) = build_uuid stmt.string(2) = name stmt.string(3) = cat_lines(succs) stmt.string(4) = 
node.node_info.hostname stmt.int(5) = node.node_info.numa_node stmt.string(6) = isabelle.Host.Range(node.node_info.rel_cpus) stmt.date(7) = node.start stmt.long(8) = node.duration.ms }) } def pull_schedule(db: SQL.Database, old_schedule: Schedule): Build_Schedule.Schedule = { val serial_db = read_serial(db) if (serial_db == old_schedule.serial) old_schedule else { read_schedules(db, old_schedule.build_uuid) match { case Nil => old_schedule case schedules => Library.the_single(schedules) } } } def update_schedule(db: SQL.Database, schedule: Schedule, old_schedule: Schedule): Schedule = { val changed = schedule.generator != old_schedule.generator || schedule.start != old_schedule.start || schedule.graph != old_schedule.graph val schedule1 = - if (changed) schedule.copy(serial = old_schedule.serial).inc_serial else schedule + if (changed) schedule.copy(serial = old_schedule.next_serial) else schedule if (schedule1.serial != schedule.serial) write_schedule(db, schedule1) schedule1 } def remove_schedules(db: SQL.Database, remove: List[String]): Unit = if (remove.nonEmpty) { val sql = Generic.build_uuid.where_member(remove) db.execute_statement(SQL.MULTI(tables.map(_.delete(sql = sql)))) } def clean_build_schedules(db: SQL.Database): Unit = { val running_builds_domain = db.execute_query_statement( Base.table.select(List(Base.build_uuid), sql = SQL.where(Base.stop.undefined)), - List.from[String], res => res.string(Base.build_uuid)) - - val (remove, _) = - Library.symmetric_difference(read_scheduled_builds_domain(db), running_builds_domain) + Map.from[String, Unit], res => res.string(Base.build_uuid) -> ()) - remove_schedules(db, remove) - } + val update = Library.Update.make(read_scheduled_builds_domain(db), running_builds_domain) - override val tables: SQL.Tables = SQL.Tables(Schedules.table, Nodes.table) - - val all_tables: SQL.Tables = - SQL.Tables.list(Build_Process.private_data.tables.list ::: tables.list) + remove_schedules(db, update.delete) + } } class Build_Engine extends Build.Engine("build_schedule") { override def build_options(options: Options, build_cluster: Boolean = false): Options = { val options1 = super.build_options(options, build_cluster = build_cluster) if (build_cluster) options1 + "build_database_server" else options1 } def scheduler(timing_data: Timing_Data, context: Build.Context): Scheduler = { val sessions_structure = context.sessions_structure val is_criticals = List( Path_Time_Heuristic.Absolute_Time(Time.minutes(5)), Path_Time_Heuristic.Absolute_Time(Time.minutes(10)), Path_Time_Heuristic.Absolute_Time(Time.minutes(20)), Path_Time_Heuristic.Relative_Time(0.5)) val parallel_threads = List( Path_Time_Heuristic.Fixed_Thread(1), Path_Time_Heuristic.Time_Based_Threads({ case time if time < Time.minutes(1) => 1 case time if time < Time.minutes(5) => 4 case _ => 8 })) val machine_splits = List( Path_Time_Heuristic.Critical_Nodes, Path_Time_Heuristic.Fixed_Fraction(0.3), Path_Time_Heuristic.Host_Speed(0.9)) val path_time_heuristics = for { is_critical <- is_criticals parallel <- parallel_threads machine_split <- machine_splits } yield Path_Time_Heuristic(is_critical, parallel, machine_split, timing_data, sessions_structure) val default_heuristic = Default_Heuristic(timing_data.host_infos, context.build_options) val heuristics = default_heuristic :: path_time_heuristics Optimizer(heuristics.map(Generation_Scheme(_, timing_data, context.build_uuid))) } override def open_build_process( context: Build.Context, progress: Progress, server: SSH.Server ): Build_Process = if 
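/* Build_Engine.scheduler above enumerates a full parameter grid: 4 critical criteria x 2 thread strategies x 3 host criteria = 24 path-time heuristics, plus the default heuristic = 25 generation schemes, of which Optimizer keeps the schedule with minimal duration. Same shape in miniature:

   object Grid {
     def candidates[A, B, C](as: List[A], bs: List[B], cs: List[C]): Int =
       (for { a <- as; b <- bs; c <- cs } yield (a, b, c)).size + 1  // + default

     // candidates(List(1, 2, 3, 4), List(1, 2), List(1, 2, 3)) == 25
   }
*/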
(!context.master) new Scheduled_Build_Process(context, progress, server) else new Scheduler_Build_Process(context, progress, server) { def init_scheduler(timing_data: Timing_Data): Scheduler = scheduler(timing_data, context) } } object Build_Engine extends Build_Engine /* build schedule */ def build_schedule( options: Options, build_hosts: List[Build_Cluster.Host] = Nil, selection: Sessions.Selection = Sessions.Selection.empty, progress: Progress = new Progress, afp_root: Option[Path] = None, dirs: List[Path] = Nil, select_dirs: List[Path] = Nil, infos: List[Sessions.Info] = Nil, numa_shuffling: Boolean = false, augment_options: String => List[Options.Spec] = _ => Nil, session_setup: (String, Session) => Unit = (_, _) => (), cache: Term.Cache = Term.Cache.make() ): Schedule = { val store = Build_Engine.build_store(options, build_cluster = build_hosts.nonEmpty, cache = cache) val log_store = Build_Log.store(options, cache = cache) val build_options = store.options def main( server: SSH.Server, database_server: Option[SQL.Database], log_database: PostgreSQL.Database, host_database: SQL.Database ): Schedule = { val full_sessions = Sessions.load_structure(build_options, dirs = AFP.make_dirs(afp_root) ::: dirs, select_dirs = select_dirs, infos = infos, augment_options = augment_options) val full_sessions_selection = full_sessions.imports_selection(selection) val build_deps = Sessions.deps(full_sessions.selection(selection), progress = progress, inlined_files = true).check_errors val build_context = Build.Context(store, build_deps, engine = Build_Engine, afp_root = afp_root, build_hosts = build_hosts, hostname = Build.hostname(build_options), numa_shuffling = numa_shuffling, session_setup = session_setup, master = true) val cluster_hosts = build_context.build_hosts val hosts_current = cluster_hosts.forall(host => isabelle.Host.read_info(host_database, host.name).isDefined) if (!hosts_current) { using(Build_Cluster.make(build_context, progress = progress).open())(_.init().benchmark()) } val host_infos = Host_Infos.load(cluster_hosts, host_database) val timing_data = Timing_Data.load(host_infos, log_database) val sessions = Build_Process.Sessions.empty.init(build_context, database_server, progress) - def task(session: Build_Job.Session_Context): Build_Process.Task = - Build_Process.Task(session.name, session.deps, build_context.build_uuid) val build_state = - Build_Process.State(sessions = sessions, pending = sessions.iterator.map(task).toList) + Build_Process.State(sessions = sessions, + pending = Map.from(sessions.iterator.map(Build_Process.Task.entry(_, build_context)))) val scheduler = Build_Engine.scheduler(timing_data, build_context) def schedule_msg(res: Exn.Result[Schedule]): String = res match { case Exn.Res(schedule) => schedule.message case _ => "" } progress.echo("Building schedule...") Timing.timeit(scheduler.schedule(build_state), schedule_msg, output = progress.echo(_)) } using(store.open_server()) { server => using_optional(store.maybe_open_database_server(server = server)) { database_server => using(log_store.open_database(server = server)) { log_database => using(store.open_build_database( path = isabelle.Host.private_data.database, server = server)) { host_database => main(server, database_server, log_database, host_database) } } } } } def write_schedule_graphic(schedule: Schedule, output: Path): Unit = { import java.awt.geom.{GeneralPath, Rectangle2D} import java.awt.{BasicStroke, Color, Graphics2D} val line_height = isabelle.graphview.Metrics.default.height val char_width = 
isabelle.graphview.Metrics.default.char_width val padding = isabelle.graphview.Metrics.default.space_width val gap = isabelle.graphview.Metrics.default.gap val graph = schedule.graph def text_width(text: String): Double = text.length * char_width val generator_height = line_height + padding val hostname_height = generator_height + line_height + padding def time_height(time: Time): Double = time.seconds def date_height(date: Date): Double = time_height(date - schedule.start) val hosts = graph.iterator.map(_._2._1).toList.groupBy(_.node_info.hostname) def node_width(node: Schedule.Node): Double = 2 * padding + text_width(node.job_name) case class Range(start: Double, stop: Double) { def proper: List[Range] = if (start < stop) List(this) else Nil def width: Double = stop - start } val rel_node_ranges = hosts.toList.flatMap { (hostname, nodes) => val sorted = nodes.sortBy(node => (node.start.time.ms, node.end.time.ms, node.job_name)) sorted.foldLeft((List.empty[Schedule.Node], Map.empty[Schedule.Node, Range])) { case ((nodes, allocated), node) => val width = node_width(node) + padding val parallel = nodes.filter(_.end.time > node.start.time) val (last, slots) = parallel.sortBy(allocated(_).start).foldLeft((0D, List.empty[Range])) { case ((start, ranges), node1) => val node_range = allocated(node1) (node_range.stop, ranges ::: Range(start, node_range.start).proper) } val start = (Range(last, Double.MaxValue) :: slots.filter(_.width >= width)).minBy(_.width).start (node :: parallel, allocated + (node -> Range(start, start + width))) }._2 }.toMap def host_width(hostname: String) = 2 * padding + (hosts(hostname).map(rel_node_ranges(_).stop).max max text_width(hostname)) def graph_height(graph: Graph[String, Schedule.Node]): Double = date_height(graph.maximals.map(graph.get_node(_).end).maxBy(_.unix_epoch)) val height = (hostname_height + 2 * padding + graph_height(graph)).ceil.toInt val (last, host_starts) = hosts.keys.foldLeft((0D, Map.empty[String, Double])) { case ((previous, starts), hostname) => (previous + gap + host_width(hostname), starts + (hostname -> previous)) } val width = (last - gap).ceil.toInt def node_start(node: Schedule.Node): Double = host_starts(node.node_info.hostname) + padding + rel_node_ranges(node).start def paint(gfx: Graphics2D): Unit = { gfx.setColor(Color.LIGHT_GRAY) gfx.fillRect(0, 0, width, height) gfx.setRenderingHints(isabelle.graphview.Metrics.rendering_hints) gfx.setFont(isabelle.graphview.Metrics.default.font) gfx.setStroke(new BasicStroke(1, BasicStroke.CAP_BUTT, BasicStroke.JOIN_ROUND)) draw_string(schedule.generator + ", build time: " + schedule.duration.message_hms, padding, 0) def draw_host(x: Double, hostname: String): Double = { val nodes = hosts(hostname).map(_.job_name).toSet val width = host_width(hostname) val height = 2 * padding + graph_height(graph.restrict(nodes.contains)) val padding1 = ((width - text_width(hostname)) / 2) max 0 val rect = new Rectangle2D.Double(x, hostname_height, width, height) gfx.setColor(Color.BLACK) gfx.draw(rect) gfx.setColor(Color.GRAY) gfx.fill(rect) draw_string(hostname, x + padding1, generator_height) x + gap + width } def draw_string(str: String, x: Double, y: Double): Unit = { gfx.setColor(Color.BLACK) gfx.drawString(str, x.toInt, (y + line_height).toInt) } def node_rect(node: Schedule.Node): Rectangle2D.Double = { val x = node_start(node) val y = hostname_height + padding + date_height(node.start) val width = node_width(node) val height = time_height(node.duration) new Rectangle2D.Double(x, y, width, height) } def 
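/* rel_node_ranges above does first-fit packing: among nodes overlapping in time on the same host, each new box takes the leftmost horizontal gap wide enough for it, else goes after the rightmost occupied range. One reading of that code as a one-dimensional sketch (names illustrative):

   object FirstFit {
     def place(occupied: List[(Double, Double)], width: Double): Double = {
       def loop(cursor: Double, rest: List[(Double, Double)]): Double =
         rest match {
           case (start, stop) :: tail =>
             if (start - cursor >= width) cursor        // gap is wide enough
             else loop(math.max(cursor, stop), tail)
           case Nil => cursor                           // after the last range
         }
       loop(0.0, occupied.sortBy(_._1))
     }
   }

   // FirstFit.place(List((0.0, 3.0), (5.0, 8.0)), 2.0) == 3.0: the gap [3, 5) fits.
*/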
draw_node(node: Schedule.Node): Rectangle2D.Double = { val rect = node_rect(node) gfx.setColor(Color.BLACK) gfx.draw(rect) gfx.setColor(Color.WHITE) gfx.fill(rect) def add_text(y: Double, text: String): Double = if (line_height > rect.height - y || text_width(text) + 2 * padding > rect.width) y else { val padding1 = padding min ((rect.height - (y + line_height)) / 2) draw_string(text, rect.x + padding, rect.y + y + padding1) y + padding1 + line_height } val node_info = node.node_info val duration_str = "(" + node.duration.message_hms + ")" val node_str = "on " + proper_string(node_info.toString.stripPrefix(node_info.hostname)).getOrElse("all") val start_str = "Start: " + (node.start - schedule.start).message_hms List(node.job_name, duration_str, node_str, start_str).foldLeft(0D)(add_text) rect } def draw_arrow(from: Schedule.Node, to: Rectangle2D.Double, curve: Double = 10): Unit = { val from_rect = node_rect(from) val path = new GeneralPath() path.moveTo(from_rect.getCenterX, from_rect.getMaxY) path.lineTo(to.getCenterX, to.getMinY) gfx.setColor(Color.BLUE) gfx.draw(path) } hosts.keys.foldLeft(0D)(draw_host) graph.topological_order.foreach { job_name => val node = graph.get_node(job_name) val rect = draw_node(node) for { pred <- graph.imm_preds(job_name).iterator pred_node = graph.get_node(pred) if node.node_info.hostname != pred_node.node_info.hostname } draw_arrow(pred_node, rect) } } val name = output.file_name if (File.is_png(name)) Graphics_File.write_png(output.file, paint, width, height) else if (File.is_pdf(name)) Graphics_File.write_pdf(output.file, paint, width, height) else error("Bad type of file: " + quote(name) + " (.png or .pdf expected)") } /* Isabelle tool wrapper */ val isabelle_tool = Isabelle_Tool("build_schedule", "generate build schedule", Scala_Project.here, { args => var afp_root: Option[Path] = None val base_sessions = new mutable.ListBuffer[String] val select_dirs = new mutable.ListBuffer[Path] val build_hosts = new mutable.ListBuffer[Build_Cluster.Host] var numa_shuffling = false var output_file: Option[Path] = None var requirements = false val exclude_session_groups = new mutable.ListBuffer[String] var all_sessions = false val dirs = new mutable.ListBuffer[Path] val session_groups = new mutable.ListBuffer[String] var options = Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS) var verbose = false val exclude_sessions = new mutable.ListBuffer[String] val getopts = Getopts(""" Usage: isabelle build_schedule [OPTIONS] [SESSIONS ...] Options are: -A ROOT include AFP with given root directory (":" for """ + AFP.BASE.implode + """) -B NAME include session NAME and all descendants -D DIR include session directory and select its sessions -H HOSTS additional cluster host specifications of the form NAMES:PARAMETERS (separated by commas) -N cyclic shuffling of NUMA CPU nodes (performance tuning) -O FILE output file -R refer to requirements of selected sessions -X NAME exclude sessions from group NAME and all descendants -a select all sessions -d DIR include session directory -g NAME select session group NAME -o OPTION override Isabelle system OPTION (via NAME=VAL or NAME) -v verbose -x NAME exclude session NAME and all descendants Generate build graph for scheduling. 
""", "A:" -> (arg => afp_root = Some(if (arg == ":") AFP.BASE else Path.explode(arg))), "B:" -> (arg => base_sessions += arg), "D:" -> (arg => select_dirs += Path.explode(arg)), "H:" -> (arg => build_hosts ++= Build_Cluster.Host.parse(Registry.global, arg)), "N" -> (_ => numa_shuffling = true), "O:" -> (arg => output_file = Some(Path.explode(arg))), "R" -> (_ => requirements = true), "X:" -> (arg => exclude_session_groups += arg), "a" -> (_ => all_sessions = true), "d:" -> (arg => dirs += Path.explode(arg)), "g:" -> (arg => session_groups += arg), "o:" -> (arg => options = options + arg), "v" -> (_ => verbose = true), "x:" -> (arg => exclude_sessions += arg)) val sessions = getopts(args) val progress = new Console_Progress(verbose = verbose) val schedule = build_schedule(options, selection = Sessions.Selection( requirements = requirements, all_sessions = all_sessions, base_sessions = base_sessions.toList, exclude_session_groups = exclude_session_groups.toList, exclude_sessions = exclude_sessions.toList, session_groups = session_groups.toList, sessions = sessions), progress = progress, afp_root = afp_root, dirs = dirs.toList, select_dirs = select_dirs.toList, numa_shuffling = isabelle.Host.numa_check(progress, numa_shuffling), build_hosts = build_hosts.toList) if (!schedule.is_empty && output_file.nonEmpty) write_schedule_graphic(schedule, output_file.get) }) } diff --git a/src/Pure/Build/export.scala b/src/Pure/Build/export.scala --- a/src/Pure/Build/export.scala +++ b/src/Pure/Build/export.scala @@ -1,680 +1,680 @@ /* Title: Pure/Build/export.scala Author: Makarius Manage per-session theory exports: compressed blobs. */ package isabelle import scala.annotation.tailrec import scala.util.matching.Regex import scala.collection.mutable object Export { /* artefact names */ val DOCUMENT_ID: String = "PIDE/document_id" val FILES: String = "PIDE/files" val MARKUP: String = "PIDE/markup" val MESSAGES: String = "PIDE/messages" val DOCUMENT_PREFIX: String = "document/" val DOCUMENT_LATEX: String = DOCUMENT_PREFIX + "latex" val THEORY_PREFIX: String = "theory/" val PROOFS_PREFIX: String = "proofs/" def explode_name(s: String): List[String] = space_explode('/', s) def implode_name(elems: Iterable[String]): String = elems.mkString("/") /* SQL data model */ object private_data extends SQL.Data() { - override lazy val tables = SQL.Tables(Base.table) + override lazy val tables: SQL.Tables = SQL.Tables(Base.table) object Base { val session_name = SQL.Column.string("session_name").make_primary_key val theory_name = SQL.Column.string("theory_name").make_primary_key val name = SQL.Column.string("name").make_primary_key val executable = SQL.Column.bool("executable") val compressed = SQL.Column.bool("compressed") val body = SQL.Column.bytes("body") val table = SQL.Table("isabelle_exports", List(session_name, theory_name, name, executable, compressed, body)) } def where_equal(session_name: String, theory_name: String = "", name: String = ""): SQL.Source = SQL.where_and( Base.session_name.equal(session_name), if_proper(theory_name, Base.theory_name.equal(theory_name)), if_proper(name, Base.name.equal(name))) def clean_session(db: SQL.Database, session_name: String): Unit = db.execute_statement(Base.table.delete(sql = where_equal(session_name))) def known_entries(db: SQL.Database, entry_names: Iterable[Entry_Name]): Set[Entry_Name] = { val it = entry_names.iterator if (it.isEmpty) Set.empty[Entry_Name] else { val sql_preds = List.from( for (entry_name <- it) yield { SQL.and( 
Base.session_name.equal(entry_name.session), Base.theory_name.equal(entry_name.theory), Base.name.equal(entry_name.name) ) }) db.execute_query_statement( Base.table.select(List(Base.session_name, Base.theory_name, Base.name), sql = SQL.where(SQL.OR(sql_preds))), Set.from[Entry_Name], { res => val session_name = res.string(Base.session_name) val theory_name = res.string(Base.theory_name) val name = res.string(Base.name) Entry_Name(session_name, theory_name, name) }) } } def read_entry(db: SQL.Database, entry_name: Entry_Name, cache: XML.Cache): Option[Entry] = db.execute_query_statementO[Entry]( Base.table.select(List(Base.executable, Base.compressed, Base.body), sql = private_data.where_equal(entry_name.session, entry_name.theory, entry_name.name)), { res => val executable = res.bool(Base.executable) val compressed = res.bool(Base.compressed) val bytes = res.bytes(Base.body) val body = Future.value(compressed, bytes) Entry(entry_name, executable, body, cache) } ) def write_entries(db: SQL.Database, entries: List[Entry]): Unit = db.execute_batch_statement(Base.table.insert(), batch = for (entry <- entries) yield { (stmt: SQL.Statement) => val (compressed, bs) = entry.body.join stmt.string(1) = entry.session_name stmt.string(2) = entry.theory_name stmt.string(3) = entry.name stmt.bool(4) = entry.executable stmt.bool(5) = compressed stmt.bytes(6) = bs }) def read_theory_names(db: SQL.Database, session_name: String): List[String] = db.execute_query_statement( Base.table.select(List(Base.theory_name), distinct = true, sql = private_data.where_equal(session_name) + SQL.order_by(List(Base.theory_name))), List.from[String], res => res.string(Base.theory_name)) def read_entry_names(db: SQL.Database, session_name: String): List[Entry_Name] = db.execute_query_statement( Base.table.select(List(Base.theory_name, Base.name), sql = private_data.where_equal(session_name)) + SQL.order_by(List(Base.theory_name, Base.name)), List.from[Entry_Name], { res => Entry_Name( session = session_name, theory = res.string(Base.theory_name), name = res.string(Base.name)) }) } def compound_name(a: String, b: String): String = if (a.isEmpty) b else a + ":" + b sealed case class Entry_Name(session: String = "", theory: String = "", name: String = "") { val compound_name: String = Export.compound_name(theory, name) def make_path(prune: Int = 0): Path = { val elems = theory :: space_explode('/', name) if (elems.length < prune + 1) { error("Cannot prune path by " + prune + " element(s): " + Path.make(elems)) } else Path.make(elems.drop(prune)) } } def message(msg: String, theory_name: String, name: String): String = msg + " " + quote(name) + " for theory " + quote(theory_name) object Entry { def apply( entry_name: Entry_Name, executable: Boolean, body: Future[(Boolean, Bytes)], cache: XML.Cache ): Entry = new Entry(entry_name, executable, body, cache) def empty(theory_name: String, name: String): Entry = Entry(Entry_Name(theory = theory_name, name = name), false, Future.value(false, Bytes.empty), XML.Cache.none) def make( session_name: String, args: Protocol.Export.Args, bytes: Bytes, cache: XML.Cache ): Entry = { val body = if (args.compress) Future.fork(bytes.maybe_compress(cache = cache.compress)) else Future.value((false, bytes)) val entry_name = Entry_Name(session = session_name, theory = args.theory_name, name = args.name) Entry(entry_name, args.executable, body, cache) } } final class Entry private( val entry_name: Entry_Name, val executable: Boolean, val body: Future[(Boolean, Bytes)], val cache: XML.Cache ) { def 
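// Usage sketch (args/bytes hypothetical): Entry.make above defers compression
// into a Future, and Entry.bytes below joins it and re-inflates on demand:
//
//   val entry = Entry.make("Pure", args, bytes, XML.Cache.none)
//   val restored = entry.bytes   // joins body, uncompresses if compressed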
session_name: String = entry_name.session def theory_name: String = entry_name.theory def name: String = entry_name.name override def toString: String = name def is_finished: Boolean = body.is_finished def cancel(): Unit = body.cancel() def compound_name: String = entry_name.compound_name def name_has_prefix(s: String): Boolean = name.startsWith(s) val name_elems: List[String] = explode_name(name) def name_extends(elems: List[String]): Boolean = name_elems.startsWith(elems) && name_elems != elems def bytes: Bytes = { val (compressed, bs) = body.join if (compressed) bs.uncompress(cache = cache.compress) else bs } def text: String = bytes.text def yxml: XML.Body = YXML.parse_body(UTF8.decode_permissive(bytes), cache = cache) } def make_regex(pattern: String): Regex = { @tailrec def make(result: List[String], depth: Int, chs: List[Char]): Regex = chs match { case '*' :: '*' :: rest => make("[^:]*" :: result, depth, rest) case '*' :: rest => make("[^:/]*" :: result, depth, rest) case '?' :: rest => make("[^:/]" :: result, depth, rest) case '\\' :: c :: rest => make(("\\" + c) :: result, depth, rest) case '{' :: rest => make("(" :: result, depth + 1, rest) case ',' :: rest if depth > 0 => make("|" :: result, depth, rest) case '}' :: rest if depth > 0 => make(")" :: result, depth - 1, rest) case c :: rest if ".+()".contains(c) => make(("\\" + c) :: result, depth, rest) case c :: rest => make(c.toString :: result, depth, rest) case Nil => result.reverse.mkString.r } make(Nil, 0, pattern.toList) } def make_matcher(pats: List[String]): Entry_Name => Boolean = { val regs = pats.map(make_regex) (entry_name: Entry_Name) => regs.exists(_.matches(entry_name.compound_name)) } def clean_session(db: SQL.Database, session_name: String): Unit = private_data.transaction_lock(db, create = true, label = "Export.clean_session") { private_data.clean_session(db, session_name) } def read_theory_names(db: SQL.Database, session_name: String): List[String] = private_data.transaction_lock(db, label = "Export.read_theory_names") { private_data.read_theory_names(db, session_name) } def read_entry_names(db: SQL.Database, session_name: String): List[Entry_Name] = private_data.transaction_lock(db, label = "Export.read_entry_names") { private_data.read_entry_names(db, session_name) } def read_entry(db: SQL.Database, entry_name: Entry_Name, cache: XML.Cache): Option[Entry] = private_data.transaction_lock(db, label = "Export.read_entry") { private_data.read_entry(db, entry_name, cache) } /* database consumer thread */ def consumer(db: SQL.Database, cache: XML.Cache, progress: Progress = new Progress): Consumer = new Consumer(db, cache, progress) class Consumer private[Export](db: SQL.Database, cache: XML.Cache, progress: Progress) { private val errors = Synchronized[List[String]](Nil) private def consume(args: List[(Entry, Boolean)]): List[Exn.Result[Unit]] = { for ((entry, _) <- args) { if (progress.stopped) entry.cancel() else entry.body.join } private_data.transaction_lock(db, label = "Export.consumer(" + args.length + ")") { var known = private_data.known_entries(db, args.map(p => p._1.entry_name)) val buffer = new mutable.ListBuffer[Option[Entry]] for ((entry, strict) <- args) { if (progress.stopped || known(entry.entry_name)) { buffer += None if (strict && known(entry.entry_name)) { val msg = message("Duplicate export", entry.theory_name, entry.name) errors.change(msg :: _) } } else { buffer += Some(entry) known += entry.entry_name } } val entries = buffer.toList try { private_data.write_entries(db, entries.flatten) val 
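// Pattern examples for make_regex/make_matcher above (patterns hypothetical);
// compound names have the shape THEORY:NAME, hence the exclusions:
//   "*"          ~> [^:/]*      one name segment, stops at ":" and "/"
//   "**"         ~> [^:]*       crosses "/" but not the theory separator ":"
//   "{a,b}.txt"  ~> (a|b)\.txt  alternatives, with "." escaped
// so make_matcher(List("*:**")) accepts every entry of every theory.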
ok = Exn.Res[Unit](()) entries.map(_ => ok) } catch { case exn: Throwable => val err = Exn.Exn[Unit](exn) entries.map(_ => err) } } } private val consumer = Consumer_Thread.fork_bulk[(Entry, Boolean)](name = "export")( bulk = { case (entry, _) => entry.is_finished }, consume = args => (args.grouped(20).toList.flatMap(consume), true)) def make_entry(session_name: String, args: Protocol.Export.Args, body: Bytes): Unit = { if (!progress.stopped && !body.is_empty) { consumer.send(Entry.make(session_name, args, body, cache) -> args.strict) } } def shutdown(close: Boolean = false): List[String] = { consumer.shutdown() if (close) db.close() errors.value.reverse ::: (if (progress.stopped) List("Export stopped") else Nil) } } /* context for database access */ def open_database_context(store: Store, server: SSH.Server = SSH.no_server): Database_Context = new Database_Context(store, store.maybe_open_database_server(server = server)) def open_session_context0( store: Store, session: String, server: SSH.Server = SSH.no_server ): Session_Context = { open_database_context(store, server = server) .open_session0(session, close_database_context = true) } def open_session_context( store: Store, session_background: Sessions.Background, document_snapshot: Option[Document.Snapshot] = None, server: SSH.Server = SSH.no_server ): Session_Context = { open_database_context(store, server = server).open_session( session_background, document_snapshot = document_snapshot, close_database_context = true) } class Database_Context private[Export]( val store: Store, val database_server: Option[SQL.Database] ) extends AutoCloseable { database_context => override def toString: String = { val s = database_server match { case Some(db) => db.toString case None => "input_dirs = " + store.input_dirs.map(_.absolute).mkString(", ") } "Database_Context(" + s + ")" } def cache: Term.Cache = store.cache def close(): Unit = database_server.foreach(_.close()) def open_database(session: String, output: Boolean = false): Session_Database = database_server match { case Some(db) => new Session_Database(session, db) case None => new Session_Database(session, store.open_database(session, output = output)) { override def close(): Unit = db.close() } } def open_session0(session: String, close_database_context: Boolean = false): Session_Context = open_session(Sessions.background0(session), close_database_context = close_database_context) def open_session( session_background: Sessions.Background, document_snapshot: Option[Document.Snapshot] = None, close_database_context: Boolean = false ): Session_Context = { val session_name = session_background.check_errors.session_name val session_hierarchy = session_background.sessions_structure.build_hierarchy(session_name) val session_databases = database_server match { case Some(db) => session_hierarchy.map(name => new Session_Database(name, db)) case None => val attempts = for (name <- session_hierarchy) yield name -> store.try_open_database(name, server_mode = false) attempts.collectFirst({ case (name, None) => name }) match { case Some(bad) => for (case (_, Some(db)) <- attempts) db.close() store.error_database(bad) case None => for (case (name, Some(db)) <- attempts) yield { new Session_Database(name, db) { override def close(): Unit = this.db.close() } } } } new Session_Context( database_context, session_background, session_databases, document_snapshot) { override def close(): Unit = { session_databases.foreach(_.close()) if (close_database_context) database_context.close() } } } } class 
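// Usage sketch (session/theory names hypothetical): open a context and
// resolve an export through the session hierarchy, document snapshot first,
// then the database stack:
//
//   using(Export.open_session_context0(store, "HOL")) { session_context =>
//     session_context.apply("HOL.List", "theory/parents").text
//   }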
Session_Database private[Export](val session: String, val db: SQL.Database) extends AutoCloseable { def close(): Unit = () lazy private [Export] val theory_names: List[String] = read_theory_names(db, session) lazy private [Export] val entry_names: List[Entry_Name] = read_entry_names(db, session) } class Session_Context private[Export]( val database_context: Database_Context, session_background: Sessions.Background, db_hierarchy: List[Session_Database], val document_snapshot: Option[Document.Snapshot] ) extends AutoCloseable { session_context => def close(): Unit = () def cache: Term.Cache = database_context.cache def sessions_structure: Sessions.Structure = session_background.sessions_structure def session_base: Sessions.Base = session_background.base def session_name: String = if (document_snapshot.isDefined) Sessions.DRAFT else session_base.session_name def session_database(session: String = session_name): Option[Session_Database] = db_hierarchy.find(_.session == session) def session_db(session: String = session_name): Option[SQL.Database] = session_database(session = session).map(_.db) def session_stack: List[String] = ((if (document_snapshot.isDefined) List(session_name) else Nil) ::: db_hierarchy.map(_.session)).reverse private def select[A]( session: String, select: Session_Database => List[A], project: Entry_Name => A, sort_key: A => String ): List[A] = { def result(name: String): List[A] = if (name == Sessions.DRAFT) { (for { snapshot <- document_snapshot.iterator entry_name <- snapshot.all_exports.keysIterator } yield project(entry_name)).toSet.toList.sortBy(sort_key) } else session_database(name).map(select).getOrElse(Nil) if (session.nonEmpty) result(session) else session_stack.flatMap(result) } def entry_names(session: String = session_name): List[Entry_Name] = select(session, _.entry_names, identity, _.compound_name) def theory_names(session: String = session_name): List[String] = select(session, _.theory_names, _.theory, identity) def get(theory: String, name: String): Option[Entry] = { def snapshot_entry: Option[Entry] = for { snapshot <- document_snapshot entry_name = Entry_Name(session = Sessions.DRAFT, theory = theory, name = name) entry <- snapshot.all_exports.get(entry_name) } yield entry def db_entry: Option[Entry] = db_hierarchy.view.map { database => val entry_name = Export.Entry_Name(session = database.session, theory = theory, name = name) read_entry(database.db, entry_name, cache) }.collectFirst({ case Some(entry) => entry }) snapshot_entry orElse db_entry } def apply(theory: String, name: String, permissive: Boolean = false): Entry = get(theory, name) match { case None if permissive => Entry.empty(theory, name) case None => error("Missing export entry " + quote(compound_name(theory, name))) case Some(entry) => entry } def theory(theory: String, other_cache: Option[Term.Cache] = None): Theory_Context = new Theory_Context(session_context, theory, other_cache) def get_source_file(name: String): Option[Store.Source_File] = { val store = database_context.store (for { database <- db_hierarchy.iterator file <- store.read_sources(database.db, database.session, name = name).iterator } yield file).nextOption() } def source_file(name: String): Store.Source_File = get_source_file(name).getOrElse(error("Missing session source file " + quote(name))) def theory_source(theory: String, unicode_symbols: Boolean = false): String = { def snapshot_source: Option[String] = for { snapshot <- document_snapshot text <- snapshot.version.nodes.iterator.collectFirst( { case (name, node) if 
name.theory == theory => node.source }) if text.nonEmpty } yield Symbol.output(unicode_symbols, text) def db_source: Option[String] = { val theory_context = session_context.theory(theory) for { name <- theory_context.files0(permissive = true).headOption file <- get_source_file(name) } yield Symbol.output(unicode_symbols, file.bytes.text) } snapshot_source orElse db_source getOrElse "" } def classpath(): List[File.Content] = { (for { session <- session_stack.iterator info <- sessions_structure.get(session).iterator if info.export_classpath.nonEmpty matcher = make_matcher(info.export_classpath) entry_name <- entry_names(session = session).iterator if matcher(entry_name) entry <- get(entry_name.theory, entry_name.name).iterator } yield File.content(entry.entry_name.make_path(), entry.bytes)).toList } override def toString: String = "Export.Session_Context(" + commas_quote(session_stack) + ")" } class Theory_Context private[Export]( val session_context: Session_Context, val theory: String, other_cache: Option[Term.Cache] ) { def cache: Term.Cache = other_cache getOrElse session_context.cache def get(name: String): Option[Entry] = session_context.get(theory, name) def apply(name: String, permissive: Boolean = false): Entry = session_context.apply(theory, name, permissive = permissive) def yxml(name: String): XML.Body = get(name) match { case Some(entry) => entry.yxml case None => Nil } def document_id(): Option[Long] = apply(DOCUMENT_ID, permissive = true).text match { case Value.Long(id) => Some(id) case _ => None } def files0(permissive: Boolean = false): List[String] = split_lines(apply(FILES, permissive = permissive).text) def files(permissive: Boolean = false): Option[(String, List[String])] = files0(permissive = permissive) match { case Nil => None case a :: bs => Some((a, bs)) } override def toString: String = "Export.Theory_Context(" + quote(theory) + ")" } /* export to file-system */ def export_files( store: Store, session_name: String, export_dir: Path, progress: Progress = new Progress, export_prune: Int = 0, export_list: Boolean = false, export_patterns: List[String] = Nil ): Unit = { using(store.open_database(session_name)) { db => val entry_names = read_entry_names(db, session_name) // list if (export_list) { for (entry_name <- entry_names) progress.echo(entry_name.compound_name) } // export if (export_patterns.nonEmpty) { val matcher = make_matcher(export_patterns) for { entry_name <- entry_names if matcher(entry_name) entry <- read_entry(db, entry_name, store.cache) } { val path = export_dir + entry_name.make_path(prune = export_prune) progress.echo("export " + path + (if (entry.executable) " (executable)" else "")) Isabelle_System.make_directory(path.dir) val bytes = entry.bytes if (!path.is_file || Bytes.read(path) != bytes) Bytes.write(path, bytes) File.set_executable(path, reset = !entry.executable) } } } } /* Isabelle tool wrapper */ val default_export_dir: Path = Path.explode("export") val isabelle_tool = Isabelle_Tool("export", "retrieve theory exports", Scala_Project.here, { args => /* arguments */ var export_dir = default_export_dir var dirs: List[Path] = Nil var export_list = false var no_build = false var options = Options.init() var export_prune = 0 var export_patterns: List[String] = Nil val getopts = Getopts(""" Usage: isabelle export [OPTIONS] SESSION Options are: -O DIR output directory for exported files (default: """ + default_export_dir + """) -d DIR include session directory -l list exports -n no build of session -o OPTION override Isabelle system OPTION (via 
NAME=VAL or NAME) -p NUM prune path of exported files by NUM elements -x PATTERN extract files matching pattern (e.g. "*:**" for all) List or export theory exports for SESSION: named blobs produced by isabelle build. Option -l or -x is required; option -x may be repeated. The PATTERN language resembles glob patterns in the shell, with ? and * (both excluding ":" and "/"), ** (excluding ":"), and [abc] or [^abc], and variants {pattern1,pattern2,pattern3}. """, "O:" -> (arg => export_dir = Path.explode(arg)), "d:" -> (arg => dirs = dirs ::: List(Path.explode(arg))), "l" -> (_ => export_list = true), "n" -> (_ => no_build = true), "o:" -> (arg => options = options + arg), "p:" -> (arg => export_prune = Value.Int.parse(arg)), "x:" -> (arg => export_patterns ::= arg)) val more_args = getopts(args) val session_name = more_args match { case List(session_name) if export_list || export_patterns.nonEmpty => session_name case _ => getopts.usage() } val progress = new Console_Progress() /* build */ if (!no_build) { val rc = progress.interrupt_handler { Build.build_logic(options, session_name, progress = progress, dirs = dirs) } if (rc != Process_Result.RC.ok) sys.exit(rc) } /* export files */ export_files(Store(options), session_name, export_dir, progress = progress, export_prune = export_prune, export_list = export_list, export_patterns = export_patterns) }) } diff --git a/src/Pure/Build/store.scala b/src/Pure/Build/store.scala --- a/src/Pure/Build/store.scala +++ b/src/Pure/Build/store.scala @@ -1,580 +1,580 @@ /* Title: Pure/Build/store.scala Author: Makarius Persistent store for session content: within file-system and/or SQL database. */ package isabelle import java.sql.SQLException object Store { def apply( options: Options, build_cluster: Boolean = false, cache: Term.Cache = Term.Cache.make() ): Store = new Store(options, build_cluster, cache) /* file names */ def heap(name: String): Path = Path.basic(name) def log(name: String): Path = Path.basic("log") + Path.basic(name) def log_db(name: String): Path = log(name).db def log_gz(name: String): Path = log(name).gz /* session */ final class Session private[Store]( val name: String, val heap: Option[Path], val log_db: Option[Path], dirs: List[Path] ) { def log_db_name: String = Store.log_db(name).implode def defined: Boolean = heap.isDefined || log_db.isDefined def the_heap: Path = heap getOrElse error("Missing heap image for session " + quote(name) + " -- expected in:\n" + cat_lines(dirs.map(dir => " " + File.standard_path(dir)))) def heap_digest(): Option[SHA1.Digest] = heap.flatMap(ML_Heap.read_file_digest) override def toString: String = name } /* session build info */ sealed case class Build_Info( sources: SHA1.Shasum, input_heaps: SHA1.Shasum, output_heap: SHA1.Shasum, return_code: Int, uuid: String ) { def ok: Boolean = return_code == 0 } /* session sources */ sealed case class Source_File( name: String, digest: SHA1.Digest, compressed: Boolean, body: Bytes, cache: Compress.Cache ) { override def toString: String = name def bytes: Bytes = if (compressed) body.uncompress(cache = cache) else body } object Sources { def load(session_base: Sessions.Base, cache: Compress.Cache = Compress.Cache.none): Sources = new Sources( session_base.session_sources.foldLeft(Map.empty) { case (sources, (path, digest)) => def err(): Nothing = error("Incoherent digest for source file: " + path) val name = File.symbolic_path(path) sources.get(name) match { case Some(source_file) => if (source_file.digest == digest) sources else err() case None => val bytes = 
Bytes.read(path) if (bytes.sha1_digest == digest) { val (compressed, body) = bytes.maybe_compress(Compress.Options_Zstd(), cache = cache) val file = Source_File(name, digest, compressed, body, cache) sources + (name -> file) } else err() } }) } class Sources private(rep: Map[String, Source_File]) extends Iterable[Source_File] { override def toString: String = rep.values.toList.sortBy(_.name).mkString("Sources(", ", ", ")") override def iterator: Iterator[Source_File] = rep.valuesIterator def get(name: String): Option[Source_File] = rep.get(name) def apply(name: String): Source_File = get(name).getOrElse(error("Missing session sources entry " + quote(name))) } /* SQL data model */ object private_data extends SQL.Data() { - override lazy val tables = SQL.Tables(Session_Info.table, Sources.table) + override lazy val tables: SQL.Tables = SQL.Tables(Session_Info.table, Sources.table) object Session_Info { val session_name = SQL.Column.string("session_name").make_primary_key // Build_Log.Session_Info val session_timing = SQL.Column.bytes("session_timing") val command_timings = SQL.Column.bytes("command_timings") val theory_timings = SQL.Column.bytes("theory_timings") val ml_statistics = SQL.Column.bytes("ml_statistics") val task_statistics = SQL.Column.bytes("task_statistics") val errors = SQL.Column.bytes("errors") val build_log_columns = List(session_name, session_timing, command_timings, theory_timings, ml_statistics, task_statistics, errors) // Build_Info val sources = SQL.Column.string("sources") val input_heaps = SQL.Column.string("input_heaps") val output_heap = SQL.Column.string("output_heap") val return_code = SQL.Column.int("return_code") val uuid = SQL.Column.string("uuid") val build_columns = List(sources, input_heaps, output_heap, return_code, uuid) val table = SQL.Table("isabelle_session_info", build_log_columns ::: build_columns) } object Sources { val session_name = SQL.Column.string("session_name").make_primary_key val name = SQL.Column.string("name").make_primary_key val digest = SQL.Column.string("digest") val compressed = SQL.Column.bool("compressed") val body = SQL.Column.bytes("body") val table = SQL.Table("isabelle_sources", List(session_name, name, digest, compressed, body)) def where_equal(session_name: String, name: String = ""): SQL.Source = SQL.where_and( Sources.session_name.equal(session_name), if_proper(name, Sources.name.equal(name))) } def read_bytes(db: SQL.Database, name: String, column: SQL.Column): Bytes = db.execute_query_statementO[Bytes]( Session_Info.table.select(List(column), sql = Session_Info.session_name.where_equal(name)), res => res.bytes(column) ).getOrElse(Bytes.empty) def read_properties( db: SQL.Database, name: String, column: SQL.Column, cache: Term.Cache ): List[Properties.T] = Properties.uncompress(read_bytes(db, name, column), cache = cache) def read_session_timing(db: SQL.Database, name: String, cache: Term.Cache): Properties.T = Properties.decode(read_bytes(db, name, Session_Info.session_timing), cache = cache) def read_command_timings(db: SQL.Database, name: String): Bytes = read_bytes(db, name, Session_Info.command_timings) def read_theory_timings(db: SQL.Database, name: String, cache: Term.Cache): List[Properties.T] = read_properties(db, name, Session_Info.theory_timings, cache) def read_ml_statistics(db: SQL.Database, name: String, cache: Term.Cache): List[Properties.T] = read_properties(db, name, Session_Info.ml_statistics, cache) def read_task_statistics(db: SQL.Database, name: String, cache: Term.Cache): List[Properties.T] = 
read_properties(db, name, Session_Info.task_statistics, cache) def read_errors(db: SQL.Database, name: String, cache: Term.Cache): List[String] = Build_Log.uncompress_errors(read_bytes(db, name, Session_Info.errors), cache = cache) def read_build(db: SQL.Database, name: String): Option[Store.Build_Info] = db.execute_query_statementO[Store.Build_Info]( Session_Info.table.select(sql = Session_Info.session_name.where_equal(name)), { res => val uuid = try { Option(res.string(Session_Info.uuid)).getOrElse("") } catch { case _: SQLException => "" } Store.Build_Info( SHA1.fake_shasum(res.string(Session_Info.sources)), SHA1.fake_shasum(res.string(Session_Info.input_heaps)), SHA1.fake_shasum(res.string(Session_Info.output_heap)), res.int(Session_Info.return_code), uuid) }) def read_build_uuid(db: SQL.Database, name: String): String = db.execute_query_statementO[String]( Session_Info.table.select(List(Session_Info.uuid), sql = Session_Info.session_name.where_equal(name)), { res => try { Option(res.string(Session_Info.uuid)).getOrElse("") } catch { case _: SQLException => "" } }).getOrElse("") def write_session_info( db: SQL.Database, cache: Compress.Cache, session_name: String, build_log: Build_Log.Session_Info, build: Build_Info ): Unit = { db.execute_statement(Session_Info.table.insert(), body = { stmt => stmt.string(1) = session_name stmt.bytes(2) = Properties.encode(build_log.session_timing) stmt.bytes(3) = Properties.compress(build_log.command_timings, cache = cache) stmt.bytes(4) = Properties.compress(build_log.theory_timings, cache = cache) stmt.bytes(5) = Properties.compress(build_log.ml_statistics, cache = cache) stmt.bytes(6) = Properties.compress(build_log.task_statistics, cache = cache) stmt.bytes(7) = Build_Log.compress_errors(build_log.errors, cache = cache) stmt.string(8) = build.sources.toString stmt.string(9) = build.input_heaps.toString stmt.string(10) = build.output_heap.toString stmt.int(11) = build.return_code stmt.string(12) = build.uuid }) } def write_sources( db: SQL.Database, session_name: String, source_files: Iterable[Source_File] ): Unit = { db.execute_batch_statement(Sources.table.insert(), batch = for (source_file <- source_files) yield { (stmt: SQL.Statement) => stmt.string(1) = session_name stmt.string(2) = source_file.name stmt.string(3) = source_file.digest.toString stmt.bool(4) = source_file.compressed stmt.bytes(5) = source_file.body }) } def read_sources( db: SQL.Database, session_name: String, name: String, cache: Compress.Cache ): List[Source_File] = { db.execute_query_statement( Sources.table.select( sql = Sources.where_equal(session_name, name = name) + SQL.order_by(List(Sources.name))), List.from[Source_File], { res => val res_name = res.string(Sources.name) val digest = SHA1.fake_digest(res.string(Sources.digest)) val compressed = res.bool(Sources.compressed) val body = res.bytes(Sources.body) Source_File(res_name, digest, compressed, body, cache) } ) } } def read_build_uuid(path: Path, session: String): String = try { using(SQLite.open_database(path))(private_data.read_build_uuid(_, session)) } catch { case _: SQLException => "" } } class Store private( val options: Options, val build_cluster: Boolean, val cache: Term.Cache ) { store => override def toString: String = "Store(output_dir = " + output_dir.absolute + ")" /* directories */ val system_output_dir: Path = Path.explode("$ISABELLE_HEAPS_SYSTEM/$ML_IDENTIFIER") val user_output_dir: Path = Path.explode("$ISABELLE_HEAPS/$ML_IDENTIFIER") def system_heaps: Boolean = options.bool("system_heaps") val 
output_dir: Path = if (system_heaps) system_output_dir else user_output_dir val input_dirs: List[Path] = if (system_heaps) List(system_output_dir) else List(user_output_dir, system_output_dir) val clean_dirs: List[Path] = if (system_heaps) List(user_output_dir, system_output_dir) else List(user_output_dir) def presentation_dir: Path = if (system_heaps) Path.explode("$ISABELLE_BROWSER_INFO_SYSTEM") else Path.explode("$ISABELLE_BROWSER_INFO") /* file names */ def output_heap(name: String): Path = output_dir + Store.heap(name) def output_log(name: String): Path = output_dir + Store.log(name) def output_log_db(name: String): Path = output_dir + Store.log_db(name) def output_log_gz(name: String): Path = output_dir + Store.log_gz(name) /* session */ def get_session(name: String): Store.Session = { val heap = input_dirs.view.map(_ + Store.heap(name)).find(_.is_file) val log_db = input_dirs.view.map(_ + Store.log_db(name)).find(_.is_file) new Store.Session(name, heap, log_db, input_dirs) } def output_session(name: String, store_heap: Boolean = false): Store.Session = { val heap = if (store_heap) Some(output_heap(name)) else None val log_db = if (!build_database_server) Some(output_log_db(name)) else None new Store.Session(name, heap, log_db, List(output_dir)) } /* heap */ def heap_shasum(database_server: Option[SQL.Database], name: String): SHA1.Shasum = { def get_database: Option[SHA1.Digest] = { for { db <- database_server digest <- ML_Heap.read_digests(db, List(name)).valuesIterator.nextOption() } yield digest } get_database orElse get_session(name).heap_digest() match { case Some(digest) => SHA1.shasum(digest, name) case None => SHA1.no_shasum } } /* databases for build process and session content */ def build_database_server: Boolean = options.bool("build_database_server") def build_database: Boolean = options.bool("build_database") def open_server(): SSH.Server = PostgreSQL.open_server(options, host = options.string("build_database_host"), port = options.int("build_database_port"), ssh_host = options.string("build_database_ssh_host"), ssh_port = options.int("build_database_ssh_port"), ssh_user = options.string("build_database_ssh_user")) def open_database_server(server: SSH.Server = SSH.no_server): PostgreSQL.Database = PostgreSQL.open_database_server(options, server = server, user = options.string("build_database_user"), password = options.string("build_database_password"), database = options.string("build_database_name"), host = options.string("build_database_host"), port = options.int("build_database_port"), ssh_host = options.string("build_database_ssh_host"), ssh_port = options.int("build_database_ssh_port"), ssh_user = options.string("build_database_ssh_user")) def maybe_open_database_server( server: SSH.Server = SSH.no_server, guard: Boolean = build_database_server ): Option[SQL.Database] = { if (guard) Some(open_database_server(server = server)) else None } def maybe_open_heaps_database( database_server: Option[SQL.Database], server: SSH.Server = SSH.no_server ): Option[SQL.Database] = { if (database_server.isDefined) None else store.maybe_open_database_server(server = server, guard = build_cluster) } def open_build_database(path: Path, server: SSH.Server = SSH.no_server): SQL.Database = if (build_database_server || build_cluster) open_database_server(server = server) else SQLite.open_database(path, restrict = true) def maybe_open_build_database( path: Path = Path.explode("$ISABELLE_HOME_USER/build.db"), server: SSH.Server = SSH.no_server ): Option[SQL.Database] = { if 
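// Access sketch (session name hypothetical): with build_database_server=true
// all access goes through one PostgreSQL server; otherwise each session has
// its own SQLite log.db file within the heap directories:
//
//   val store = Store(Options.init())
//   using(store.open_database("HOL"))(db => store.read_build(db, "HOL"))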
(build_database) Some(open_build_database(path, server = server)) else None } def try_open_database( name: String, output: Boolean = false, server: SSH.Server = SSH.no_server, server_mode: Boolean = build_database_server ): Option[SQL.Database] = { def check(db: SQL.Database): Option[SQL.Database] = if (output || session_info_exists(db)) Some(db) else { db.close(); None } if (server_mode) check(open_database_server(server = server)) else if (output) Some(SQLite.open_database(output_log_db(name))) else { (for { dir <- input_dirs.view path = dir + Store.log_db(name) if path.is_file db <- check(SQLite.open_database(path)) } yield db).headOption } } def error_database(name: String): Nothing = error("Missing build database for session " + quote(name)) def open_database( name: String, output: Boolean = false, server: SSH.Server = SSH.no_server ): SQL.Database = { try_open_database(name, output = output, server = server) getOrElse error_database(name) } def clean_output( database_server: Option[SQL.Database], name: String, session_init: Boolean = false, progress: Progress = new Progress ): Unit = { val relevant_db = database_server match { case Some(db) => ML_Heap.clean_entry(db, name) clean_session_info(db, name) case None => false } val del = for { dir <- clean_dirs file <- List(Store.heap(name), Store.log_db(name), Store.log(name), Store.log_gz(name)) path = dir + file if path.is_file } yield path.file.delete if (database_server.isEmpty && session_init) { using(open_database(name, output = true))(clean_session_info(_, name)) } if (relevant_db || del.nonEmpty) { if (del.forall(identity)) progress.echo("Cleaned " + name) else progress.echo(name + " FAILED to clean") } } def check_output( database_server: Option[SQL.Database], name: String, session_options: Options, sources_shasum: SHA1.Shasum, input_shasum: SHA1.Shasum, fresh_build: Boolean, store_heap: Boolean ): (Boolean, SHA1.Shasum) = { def no_check: (Boolean, SHA1.Shasum) = (false, SHA1.no_shasum) def check(db: SQL.Database): (Boolean, SHA1.Shasum) = read_build(db, name) match { case Some(build) => val output_shasum = heap_shasum(if (db.is_postgresql) Some(db) else None, name) val current = !fresh_build && build.ok && Sessions.eq_sources(session_options, build.sources, sources_shasum) && build.input_heaps == input_shasum && build.output_heap == output_shasum && !(store_heap && output_shasum.is_empty) (current, output_shasum) case None => no_check } database_server match { case Some(db) => if (session_info_exists(db)) check(db) else no_check case None => using_option(try_open_database(name))(check) getOrElse no_check } } /* session info */ def session_info_exists(db: SQL.Database): Boolean = Store.private_data.tables.forall(db.exists_table) def session_info_defined(db: SQL.Database, name: String): Boolean = db.execute_query_statementB( Store.private_data.Session_Info.table.select(List(Store.private_data.Session_Info.session_name), sql = Store.private_data.Session_Info.session_name.where_equal(name))) def clean_session_info(db: SQL.Database, name: String): Boolean = { Export.clean_session(db, name) Document_Build.clean_session(db, name) Store.private_data.transaction_lock(db, create = true, label = "Store.clean_session_info") { val already_defined = session_info_defined(db, name) db.execute_statement( SQL.multi( Store.private_data.Session_Info.table.delete( sql = Store.private_data.Session_Info.session_name.where_equal(name)), Store.private_data.Sources.table.delete( sql = Store.private_data.Sources.where_equal(name)))) already_defined } } 
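// Currency sketch: check_output above declares a session up-to-date only if
// the recorded build succeeded and its sources, input heaps and output heap
// shasums all match (shasum values hypothetical):
//
//   val (current, output_shasum) =
//     store.check_output(None, "HOL", session_options,
//       sources_shasum, input_shasum, fresh_build = false, store_heap = true)
//   // current == true ~> the build can be skipped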
def write_session_info( db: SQL.Database, session_name: String, sources: Store.Sources, build_log: Build_Log.Session_Info, build: Store.Build_Info ): Unit = { Store.private_data.transaction_lock(db, label = "Store.write_session_info") { for (source_files <- sources.iterator.toList.grouped(200)) { Store.private_data.write_sources(db, session_name, source_files) } Store.private_data.write_session_info(db, cache.compress, session_name, build_log, build) } } def read_session_timing(db: SQL.Database, session: String): Properties.T = Store.private_data.transaction_lock(db, label = "Store.read_session_timing") { Store.private_data.read_session_timing(db, session, cache) } def read_command_timings(db: SQL.Database, session: String): Bytes = Store.private_data.transaction_lock(db, label = "Store.read_command_timings") { Store.private_data.read_command_timings(db, session) } def read_theory_timings(db: SQL.Database, session: String): List[Properties.T] = Store.private_data.transaction_lock(db, label = "Store.read_theory_timings") { Store.private_data.read_theory_timings(db, session, cache) } def read_ml_statistics(db: SQL.Database, session: String): List[Properties.T] = Store.private_data.transaction_lock(db, label = "Store.read_ml_statistics") { Store.private_data.read_ml_statistics(db, session, cache) } def read_task_statistics(db: SQL.Database, session: String): List[Properties.T] = Store.private_data.transaction_lock(db, label = "Store.read_task_statistics") { Store.private_data.read_task_statistics(db, session, cache) } def read_theories(db: SQL.Database, session: String): List[String] = read_theory_timings(db, session).flatMap(Markup.Name.unapply) def read_errors(db: SQL.Database, session: String): List[String] = Store.private_data.transaction_lock(db, label = "Store.read_errors") { Store.private_data.read_errors(db, session, cache) } def read_build(db: SQL.Database, session: String): Option[Store.Build_Info] = Store.private_data.transaction_lock(db, label = "Store.read_build") { if (session_info_exists(db)) Store.private_data.read_build(db, session) else None } def read_sources(db: SQL.Database, session: String, name: String = ""): List[Store.Source_File] = Store.private_data.transaction_lock(db, label = "Store.read_sources") { Store.private_data.read_sources(db, session, name, cache.compress) } } diff --git a/src/Pure/General/sql.scala b/src/Pure/General/sql.scala --- a/src/Pure/General/sql.scala +++ b/src/Pure/General/sql.scala @@ -1,877 +1,894 @@ /* Title: Pure/General/sql.scala Author: Makarius Support for SQL databases: SQLite and PostgreSQL. 
See https://docs.oracle.com/en/java/javase/21/docs/api/java.sql/java/sql/Connection.html */ package isabelle import java.time.OffsetDateTime import java.sql.{DriverManager, Connection, PreparedStatement, ResultSet, SQLException} import org.sqlite.SQLiteConfig import org.sqlite.jdbc4.JDBC4Connection import org.postgresql.PGConnection import scala.collection.mutable object SQL { lazy val time_start = Time.now() /** SQL language **/ type Source = String /* concrete syntax */ def string(s: String): Source = { val q = '\'' val result = new StringBuilder(s.length + 10) result += q for (c <- s.iterator) { if (c == '\u0000') error("Illegal NUL character in SQL string literal") if (c == q) result += q result += c } result += q result.toString } def ident(s: String): Source = Long_Name.implode(Long_Name.explode(s).map(a => quote(a.replace("\"", "\"\"")))) def enclose(s: Source): Source = "(" + s + ")" def enclosure(ss: Iterable[Source]): Source = ss.mkString("(", ", ", ")") def separate(sql: Source): Source = (if (sql.isEmpty || sql.startsWith(" ")) "" else " ") + sql def select(columns: List[Column] = Nil, distinct: Boolean = false, sql: Source = ""): Source = "SELECT " + (if (distinct) "DISTINCT " else "") + (if (columns.isEmpty) "*" else commas(columns.map(_.ident))) + " FROM " + sql val join_outer: Source = " LEFT OUTER JOIN " val join_inner: Source = " INNER JOIN " - def join(outer: Boolean): Source = if (outer) join_outer else join_inner def MULTI(args: Iterable[Source]): Source = args.iterator.filter(_.nonEmpty).mkString(";\n") def multi(args: Source*): Source = MULTI(args) def infix(op: Source, args: Iterable[Source]): Source = { val body = args.iterator.filter(_.nonEmpty).mkString(" " + op + " ") if_proper(body, enclose(body)) } def AND(args: Iterable[Source]): Source = infix("AND", args) def OR(args: Iterable[Source]): Source = infix("OR", args) def and(args: Source*): Source = AND(args) def or(args: Source*): Source = OR(args) val TRUE: Source = "TRUE" val FALSE: Source = "FALSE" def equal(sql: Source, x: Int): Source = sql + " = " + x def equal(sql: Source, x: Long): Source = sql + " = " + x def equal(sql: Source, x: String): Source = sql + " = " + string(x) + def member_int(sql: Source, set: Iterable[Int]): Source = + if (set.isEmpty) FALSE else OR(set.iterator.map(equal(sql, _)).toList) + def member_long(sql: Source, set: Iterable[Long]): Source = + if (set.isEmpty) FALSE else OR(set.iterator.map(equal(sql, _)).toList) def member(sql: Source, set: Iterable[String]): Source = - if (set.isEmpty) FALSE - else OR(set.iterator.map(equal(sql, _)).toList) + if (set.isEmpty) FALSE else OR(set.iterator.map(equal(sql, _)).toList) def where(sql: Source): Source = if_proper(sql, " WHERE " + sql) def where_and(args: Source*): Source = where(and(args:_*)) def where_or(args: Source*): Source = where(or(args:_*)) /* types */ enum Type { case Boolean, Int, Long, Double, String, Bytes, Date } val sql_type_postgresql: Type => Source = { case Type.Boolean => "BOOLEAN" case Type.Int => "INTEGER" case Type.Long => "BIGINT" case Type.Double => "DOUBLE PRECISION" case Type.String => "TEXT" case Type.Bytes => "BYTEA" case Type.Date => "TIMESTAMP WITH TIME ZONE" } val sql_type_sqlite: Type => Source = { case Type.Boolean => "INTEGER" case Type.Bytes => "BLOB" case Type.Date => "TEXT" case t => sql_type_postgresql(t) } /* columns */ object Column { def bool(name: String, strict: Boolean = false, primary_key: Boolean = false): Column = Column(name, Type.Boolean, strict, primary_key) def int(name: String, strict: 
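// Concrete syntax sketch for the combinators above (values hypothetical):
//   SQL.string("it's")                        ~> 'it''s'  (quote doubled, NUL rejected)
//   SQL.member(SQL.ident("name"), List("a"))  ~> ("name" = 'a')
//   SQL.member_int(SQL.ident("id"), Nil)      ~> FALSE    (empty set matches nothing)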
Boolean = false, primary_key: Boolean = false): Column = Column(name, Type.Int, strict, primary_key) def long(name: String, strict: Boolean = false, primary_key: Boolean = false): Column = Column(name, Type.Long, strict, primary_key) def double(name: String, strict: Boolean = false, primary_key: Boolean = false): Column = Column(name, Type.Double, strict, primary_key) def string(name: String, strict: Boolean = false, primary_key: Boolean = false): Column = Column(name, Type.String, strict, primary_key) def bytes(name: String, strict: Boolean = false, primary_key: Boolean = false): Column = Column(name, Type.Bytes, strict, primary_key) def date(name: String, strict: Boolean = false, primary_key: Boolean = false): Column = Column(name, Type.Date, strict, primary_key) } sealed case class Column( name: String, T: Type, strict: Boolean = false, primary_key: Boolean = false, expr: SQL.Source = "" ) { def make_primary_key: Column = copy(primary_key = true) def apply(table: Table): Column = Column(Long_Name.qualify(table.name, name), T, strict = strict, primary_key = primary_key) def ident: Source = if (expr == "") SQL.ident(name) else enclose(expr) + " AS " + SQL.ident(name) def decl(sql_type: Type => Source): Source = ident + " " + sql_type(T) + (if (strict || primary_key) " NOT NULL" else "") def defined: String = ident + " IS NOT NULL" def undefined: String = ident + " IS NULL" def equal(x: Int): Source = SQL.equal(ident, x) def equal(x: Long): Source = SQL.equal(ident, x) def equal(x: String): Source = SQL.equal(ident, x) def where_equal(x: Int): Source = SQL.where(equal(x)) def where_equal(x: Long): Source = SQL.where(equal(x)) def where_equal(x: String): Source = SQL.where(equal(x)) + def member_int(set: Iterable[Int]): Source = SQL.member_int(ident, set) + def member_long(set: Iterable[Long]): Source = SQL.member_long(ident, set) def member(set: Iterable[String]): Source = SQL.member(ident, set) + + def where_member_int(set: Iterable[Int]): Source = SQL.where(member_int(set)) + def where_member_long(set: Iterable[Long]): Source = SQL.where(member_long(set)) def where_member(set: Iterable[String]): Source = SQL.where(member(set)) def max: Column = copy(expr = "MAX(" + ident + ")") override def toString: Source = ident } def order_by(columns: List[Column], descending: Boolean = false): Source = " ORDER BY " + columns.mkString(", ") + (if (descending) " DESC" else "") /* tables */ sealed case class Table(name: String, columns: List[Column], body: Source = "") { Library.duplicates(columns.map(_.name)) match { case Nil => case bad => error("Duplicate column names " + commas_quote(bad) + " for table " + quote(name)) } def ident: Source = SQL.ident(name) def query: Source = if (body == "") error("Missing SQL body for table " + quote(name)) else SQL.enclose(body) def query_named: Source = query + " AS " + SQL.ident(name) def create(sql_type: Type => Source): Source = { val primary_key = columns.filter(_.primary_key).map(_.name) match { case Nil => Nil case keys => List("PRIMARY KEY " + enclosure(keys)) } "CREATE TABLE " + ident + " " + enclosure(columns.map(_.decl(sql_type)) ::: primary_key) } def insert_cmd(cmd: Source = "INSERT", sql: Source = ""): Source = cmd + " INTO " + ident + " VALUES " + enclosure(columns.map(_ => "?")) + SQL.separate(sql) def insert(sql: Source = ""): Source = insert_cmd(sql = sql) def delete(sql: Source = ""): Source = "DELETE FROM " + ident + SQL.separate(sql) def update(update_columns: List[Column] = Nil, sql: Source = ""): Source = "UPDATE " + ident + " SET " + 
commas(update_columns.map(c => c.ident + " = ?")) + SQL.separate(sql) def select( select_columns: List[Column] = Nil, distinct: Boolean = false, sql: Source = "" ): Source = SQL.select(select_columns, distinct = distinct, sql = ident + SQL.separate(sql)) override def toString: Source = ident } /* table groups */ object Tables { def list(list: List[Table]): Tables = new Tables(list) def apply(args: Table*): Tables = list(args.toList) } final class Tables private(val list: List[Table]) extends Iterable[Table] { override def toString: String = list.mkString("SQL.Tables(", ", ", ")") def iterator: Iterator[Table] = list.iterator + def index(table: Table): Int = + iterator.zipWithIndex + .collectFirst({ case (t, i) if t.name == table.name => i }) + .getOrElse(error("No table " + quote(table.name))) + // requires transaction def lock(db: Database, create: Boolean = false): Boolean = { if (create) foreach(db.create_table(_)) val sql = db.lock_tables(list) if (sql.nonEmpty) { db.execute_statement(sql); true } else false } } /* access data */ def transaction_logger(): Logger = new System_Logger(guard_time = Time.guard_property("isabelle.transaction_trace")) abstract class Data(table_prefix: String = "") { def tables: Tables def transaction_lock[A]( db: Database, create: Boolean = false, label: String = "", log: Logger = transaction_logger() )(body: => A): A = { db.transaction_lock(tables, create = create, label = label, log = log)(body) } def make_table(columns: List[Column], body: String = "", name: String = ""): Table = { val table_name = List(proper_string(table_prefix), proper_string(name)).flatten.mkString("_") require(table_name.nonEmpty, "Undefined database table name") Table(table_name, columns, body = body) } } /** SQL database operations **/ /* statements */ class Batch_Error(val results: List[Int]) extends SQLException class Statement private[SQL](val db: Database, val rep: PreparedStatement) extends AutoCloseable { stmt => object bool { def update(i: Int, x: Boolean): Unit = rep.setBoolean(i, x) def update(i: Int, x: Option[Boolean]): Unit = { if (x.isDefined) update(i, x.get) else rep.setNull(i, java.sql.Types.BOOLEAN) } } object int { def update(i: Int, x: Int): Unit = rep.setInt(i, x) def update(i: Int, x: Option[Int]): Unit = { if (x.isDefined) update(i, x.get) else rep.setNull(i, java.sql.Types.INTEGER) } } object long { def update(i: Int, x: Long): Unit = rep.setLong(i, x) def update(i: Int, x: Option[Long]): Unit = { if (x.isDefined) update(i, x.get) else rep.setNull(i, java.sql.Types.BIGINT) } } object double { def update(i: Int, x: Double): Unit = rep.setDouble(i, x) def update(i: Int, x: Option[Double]): Unit = { if (x.isDefined) update(i, x.get) else rep.setNull(i, java.sql.Types.DOUBLE) } } object string { def update(i: Int, x: String): Unit = rep.setString(i, x) def update(i: Int, x: Option[String]): Unit = update(i, x.orNull) } object bytes { def update(i: Int, bytes: Bytes): Unit = { if (bytes == null) rep.setBytes(i, null) else rep.setBinaryStream(i, bytes.stream(), bytes.length) } def update(i: Int, bytes: Option[Bytes]): Unit = update(i, bytes.orNull) } object date { def update(i: Int, date: Date): Unit = db.update_date(stmt, i, date) def update(i: Int, date: Option[Date]): Unit = update(i, date.orNull) } def execute(): Boolean = rep.execute() def execute_batch(batch: IterableOnce[Statement => Unit]): Unit = { val it = batch.iterator if (it.nonEmpty) { for (body <- it) { body(this); rep.addBatch() } val res = rep.executeBatch() if (!res.forall(i => i >= 0 || i == 
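// Pattern sketch: the private_data objects in this patch all follow the same
// scheme, a SQL.Data subclass whose operations run inside transaction_lock
// (table name and label hypothetical):
//
//   object demo_data extends SQL.Data("demo") {
//     val name = SQL.Column.string("name").make_primary_key
//     val table = make_table(List(name))
//     override lazy val tables: SQL.Tables = SQL.Tables(table)
//   }
//
//   demo_data.transaction_lock(db, create = true, label = "demo") { () }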
java.sql.Statement.SUCCESS_NO_INFO)) {
          throw new Batch_Error(res.toList)
        }
      }
    }

    def execute_query(): Result = new Result(this, rep.executeQuery())

    override def close(): Unit = rep.close()
  }


  /* results */

  class Result private[SQL](val stmt: Statement, val rep: ResultSet) extends AutoCloseable {
    res =>

    def next(): Boolean = rep.next()

    def iterator[A](get: Result => A): Iterator[A] =
      new Iterator[A] {
        private var _next: Boolean = res.next()
        def hasNext: Boolean = _next
        def next(): A = { val x = get(res); _next = res.next(); x }
      }

    def bool(column: Column): Boolean = rep.getBoolean(column.name)
    def int(column: Column): Int = rep.getInt(column.name)
    def long(column: Column): Long = rep.getLong(column.name)
    def double(column: Column): Double = rep.getDouble(column.name)
    def string(column: Column): String = {
      val s = rep.getString(column.name)
      if (s == null) "" else s
    }
    def bytes(column: Column): Bytes = {
      val bs = rep.getBytes(column.name)
      if (bs == null) Bytes.empty else Bytes(bs)
    }
    def date(column: Column): Date = stmt.db.date(res, column)

    def timing(c1: Column, c2: Column, c3: Column): Timing =
      Timing(Time.ms(long(c1)), Time.ms(long(c2)), Time.ms(long(c3)))

    def get[A](column: Column, f: Column => A): Option[A] = {
      val x = f(column)
      if (rep.wasNull || x == null) None else Some(x)
    }
    def get_bool(column: Column): Option[Boolean] = get(column, bool)
    def get_int(column: Column): Option[Int] = get(column, int)
    def get_long(column: Column): Option[Long] = get(column, long)
    def get_double(column: Column): Option[Double] = get(column, double)
    def get_string(column: Column): Option[String] = get(column, string)
    def get_bytes(column: Column): Option[Bytes] = get(column, bytes)
    def get_date(column: Column): Option[Date] = get(column, date)

    override def close(): Unit = rep.close()
  }


  /* notifications: IPC via database server */

  sealed case class Notification(channel: String, payload: String = "") {
    override def toString =
      "Notification(" + channel + if_proper(payload, "," + payload) + ")"
  }


  /* database */

  trait Database extends AutoCloseable {
    db =>

    def is_sqlite: Boolean = isInstanceOf[SQLite.Database]
    def is_postgresql: Boolean = isInstanceOf[PostgreSQL.Database]

    def vacuum(tables: List[SQL.Table] = Nil): Unit =
      if (is_sqlite) execute_statement("VACUUM")  // always FULL
      else if (tables.isEmpty) execute_statement("VACUUM FULL")
      else if (postgresql_major_version.get <= 10) {
        for (t <- tables) execute_statement("VACUUM " + t.ident)
      }
      else execute_statement("VACUUM " + commas(tables.map(_.ident)))

    def now(): Date


    /* types */

    def sql_type(T: Type): Source


    /* connection */

    def connection: Connection

    def sqlite_connection: Option[JDBC4Connection] =
      connection match { case conn: JDBC4Connection => Some(conn) case _ => None }

    def postgresql_connection: Option[PGConnection] =
      connection match { case conn: PGConnection => Some(conn) case _ => None }

    def the_sqlite_connection: JDBC4Connection =
      sqlite_connection getOrElse
        error("SQLite connection expected, but found " + connection.getClass.getName)

    def the_postgresql_connection: PGConnection =
      postgresql_connection getOrElse
        error("PostgreSQL connection expected, but found " + connection.getClass.getName)

    def postgresql_major_version: Option[Int] =
      if (is_postgresql) {
        def err(s: String): Nothing = error("Bad PostgreSQL version " + s)
        the_postgresql_connection.getParameterStatus("server_version") match {
          case null => err("null")
          case str =>
            str.iterator.takeWhile(Symbol.is_ascii_digit).mkString match {
              case Value.Int(m) => Some(m)
              case _ => err(quote(str))
            }
        }
      }
      else
None override def close(): Unit = connection.close() def transaction[A](body: => A): A = connection.synchronized { require(connection.getAutoCommit(), "transaction already active") try { connection.setAutoCommit(false) try { val result = body connection.commit() result } catch { case exn: Throwable => connection.rollback(); throw exn } } finally { connection.setAutoCommit(true) } } def transaction_lock[A]( tables: Tables, create: Boolean = false, label: String = "", log: Logger = transaction_logger() )(body: => A): A = { val trace_count = - SQL.transaction_count() val trace_start = Time.now() var trace_nl = false def trace(msg: String): Unit = { val trace_time = Time.now() - trace_start if (log.guard(trace_time)) { time_start val nl = if (trace_nl) "" else { trace_nl = true; "\nnow = " + (Time.now() - time_start).toString + "\n" } log(nl + trace_time + " transaction " + trace_count + if_proper(label, " " + label) + ": " + msg) } } try { val res = transaction { trace("begin") if (tables.lock(db, create = create)) { trace("locked " + commas_quote(tables.list.map(_.name))) } val res = Exn.capture { body } trace("end") res } trace("commit") Exn.release(res) } catch { case exn: Throwable => trace("crash"); throw exn } } def lock_tables(tables: List[Table]): Source = "" // PostgreSQL only /* statements and results */ def statement(sql: Source): Statement = new Statement(db, connection.prepareStatement(sql)) def using_statement[A](sql: Source)(f: Statement => A): A = using(statement(sql))(f) def execute_statement(sql: Source, body: Statement => Unit = _ => ()): Unit = using_statement(sql) { stmt => body(stmt); stmt.execute() } def execute_batch_statement( sql: Source, batch: IterableOnce[Statement => Unit] = Nil ): Unit = using_statement(sql) { stmt => stmt.execute_batch(batch) } def execute_query_statement[A, B]( sql: Source, make_result: Iterator[A] => B, get: Result => A ): B = { using_statement(sql) { stmt => using(stmt.execute_query()) { res => make_result(res.iterator(get)) } } } def execute_query_statementO[A](sql: Source, get: Result => A): Option[A] = execute_query_statement[A, Option[A]](sql, _.nextOption, get) def execute_query_statementB(sql: Source): Boolean = using_statement(sql)(stmt => using(stmt.execute_query())(_.next())) def update_date(stmt: Statement, i: Int, date: Date): Unit def date(res: Result, column: Column): Date def insert_permissive(table: Table, sql: Source = ""): Source + def destroy(table: Table): Source = "DROP TABLE IF EXISTS " + table + /* tables and views */ def get_tables(pattern: String = "%"): List[String] = { val result = new mutable.ListBuffer[String] val rs = connection.getMetaData.getTables(null, null, pattern, null) while (rs.next) { result += rs.getString(3) } result.toList } def exists_table(name: String): Boolean = { val escape = connection.getMetaData.getSearchStringEscape val pattern = name.iterator.map(c => (if (c == '_' || c == '%' || c == escape(0)) escape else "") + c).mkString get_tables(pattern = pattern).nonEmpty } def exists_table(table: Table): Boolean = exists_table(table.name) def create_table(table: Table, sql: Source = ""): Unit = { if (!exists_table(table)) { execute_statement(table.create(sql_type) + SQL.separate(sql)) if (is_postgresql) { for (column <- table.columns if column.T == SQL.Type.Bytes) { execute_statement( "ALTER TABLE " + table + " ALTER COLUMN " + column + " SET STORAGE EXTERNAL") } } } } def create_view(table: Table): Unit = { if (!exists_table(table)) { execute_statement("CREATE VIEW " + table + " AS " + { 
table.query; table.body }) } } /* notifications (PostgreSQL only) */ def listen(channel: String): Unit = () def unlisten(channel: String = "*"): Unit = () def send(channel: String, payload: String): Unit = () final def send(channel: String): Unit = send(channel, "") final def send(notification: Notification): Unit = send(notification.channel, notification.payload) def receive(filter: Notification => Boolean): Option[List[Notification]] = None } private val transaction_count = Counter.make() } /** SQLite **/ object SQLite { // see https://www.sqlite.org/lang_datefunc.html val date_format: Date.Format = Date.Format("uuuu-MM-dd HH:mm:ss.SSS x") lazy val init_jdbc: Unit = { val lib_path = Path.explode("$ISABELLE_SQLITE_HOME/" + Platform.jvm_platform) val lib_name = File.get_file(lib_path).file_name System.setProperty("org.sqlite.lib.path", File.platform_path(lib_path)) System.setProperty("org.sqlite.lib.name", lib_name) Class.forName("org.sqlite.JDBC") } def open_database(path: Path, restrict: Boolean = false): Database = { init_jdbc val path0 = path.expand val s0 = File.platform_path(path0) val s1 = if (Platform.is_windows) s0.replace('\\', '/') else s0 val config = new SQLiteConfig() config.setEncoding(SQLiteConfig.Encoding.UTF8) val connection = config.createConnection("jdbc:sqlite:" + s1) val db = new Database(path0.toString, connection) try { if (restrict) File.restrict(path0) } catch { case exn: Throwable => db.close(); throw exn } db } class Database private[SQLite](name: String, val connection: Connection) extends SQL.Database { override def toString: String = name override def now(): Date = Date.now() def sql_type(T: SQL.Type): SQL.Source = SQL.sql_type_sqlite(T) def update_date(stmt: SQL.Statement, i: Int, date: Date): Unit = if (date == null) stmt.string(i) = (null: String) else stmt.string(i) = date_format(date) def date(res: SQL.Result, column: SQL.Column): Date = proper_string(res.string(column)) match { case None => null case Some(s) => date_format.parse(s) } def insert_permissive(table: SQL.Table, sql: SQL.Source = ""): SQL.Source = table.insert_cmd(cmd = "INSERT OR IGNORE", sql = sql) } } /** PostgreSQL **/ // see https://www.postgresql.org/docs/14/index.html // see https://jdbc.postgresql.org/documentation object PostgreSQL { type Source = SQL.Source lazy val init_jdbc: Unit = Class.forName("org.postgresql.Driver") val default_server: SSH.Server = SSH.local_server(port = 5432) def open_database( user: String, password: String, database: String = "", server: SSH.Server = default_server, server_close: Boolean = false, receiver_delay: Time = Time.seconds(0.5) ): Database = { init_jdbc if (user.isEmpty) error("Undefined database user") if (server.host.isEmpty) error("Undefined database server host") if (server.port <= 0) error("Undefined database server port") val name = proper_string(database) getOrElse user val url = "jdbc:postgresql://" + server.host + ":" + server.port + "/" + name val ssh = server.ssh_system.ssh_session val print = "server " + quote(user + "@" + server + "/" + name) + if_proper(ssh, " via ssh " + quote(ssh.get.toString)) val connection = DriverManager.getConnection(url, user, password) val db = new Database(connection, print, server, server_close, receiver_delay) try { db.execute_statement("SET standard_conforming_strings = on") } catch { case exn: Throwable => db.close(); throw exn } db } def open_server( options: Options, host: String = "", port: Int = 0, ssh_host: String = "", ssh_port: Int = 0, ssh_user: String = "" ): SSH.Server = { val server_host = 
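// Usage sketch (file name hypothetical): an SQLite database is a plain file;
// restrict = true additionally restricts its file permissions:
//
//   using(SQLite.open_database(Path.explode("$ISABELLE_HOME_USER/demo.db"))) {
//     db => db.execute_statement("CREATE TABLE IF NOT EXISTS demo (x INTEGER)")
//   }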
proper_string(host).getOrElse(default_server.host) val server_port = if (port > 0) port else default_server.port if (ssh_host.isEmpty) SSH.local_server(host = server_host, port = server_port) else { SSH.open_server(options, host = ssh_host, port = ssh_port, user = ssh_user, remote_host = server_host, remote_port = server_port) } } def open_database_server( options: Options, user: String = "", password: String = "", database: String = "", server: SSH.Server = SSH.no_server, host: String = "", port: Int = 0, ssh_host: String = "", ssh_port: Int = 0, ssh_user: String = "" ): PostgreSQL.Database = { val db_server = if (server.defined) server else { open_server(options, host = host, port = port, ssh_host = ssh_host, ssh_port = ssh_port, ssh_user = ssh_user) } val server_close = !server.defined try { open_database(user = user, password = password, database = database, server = db_server, server_close = server_close) } catch { case exn: Throwable if server_close => db_server.close(); throw exn } } class Database private[PostgreSQL]( val connection: Connection, print: String, server: SSH.Server, server_close: Boolean, receiver_delay: Time ) extends SQL.Database { override def toString: String = print override def now(): Date = { val now = SQL.Column.date("now") execute_query_statementO[Date]("SELECT NOW() as " + now.ident, res => res.date(now)) .getOrElse(error("Failed to get current date/time from database server " + toString)) } def sql_type(T: SQL.Type): SQL.Source = SQL.sql_type_postgresql(T) // see https://jdbc.postgresql.org/documentation/head/8-date-time.html def update_date(stmt: SQL.Statement, i: Int, date: Date): Unit = if (date == null) stmt.rep.setObject(i, null) else stmt.rep.setObject(i, OffsetDateTime.from(date.to(Date.timezone_utc).rep)) def date(res: SQL.Result, column: SQL.Column): Date = { val obj = res.rep.getObject(column.name, classOf[OffsetDateTime]) if (obj == null) null else Date.instant(obj.toInstant) } def insert_permissive(table: SQL.Table, sql: SQL.Source = ""): SQL.Source = table.insert_cmd(sql = if_proper(sql, sql + " ") + "ON CONFLICT DO NOTHING") + override def destroy(table: SQL.Table): SQL.Source = + super.destroy(table) + " CASCADE" + /* explicit locking: only applicable to PostgreSQL within transaction context */ // see https://www.postgresql.org/docs/14/sql-lock.html // see https://www.postgresql.org/docs/14/explicit-locking.html override def lock_tables(tables: List[SQL.Table]): PostgreSQL.Source = if_proper(tables, "LOCK TABLE " + tables.mkString(", ") + " IN ACCESS EXCLUSIVE MODE") /* notifications: IPC via database server */ /* - see https://www.postgresql.org/docs/14/sql-notify.html - self-notifications and repeated notifications are suppressed - notifications are sorted by local system time (nano seconds) - receive() == None means that IPC is inactive or unavailable (SQLite) */ private var _receiver_buffer: Option[Map[SQL.Notification, Long]] = None private lazy val _receiver_thread = Isabelle_Thread.fork(name = "PostgreSQL.receiver", daemon = true, uninterruptible = true) { val conn = the_postgresql_connection val self_pid = conn.getBackendPID try { while (true) { Isabelle_Thread.interruptible { receiver_delay.sleep(); Option(conn.getNotifications())} match { case Some(array) if array.nonEmpty => synchronized { var received = _receiver_buffer.getOrElse(Map.empty) for (a <- array.iterator if a.getPID != self_pid) { val msg = SQL.Notification(a.getName, a.getParameter) if (!received.isDefinedAt(msg)) { val stamp = System.nanoTime() received = received + 
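/* dedup sketch: each distinct Notification is recorded once with its first-seen System.nanoTime stamp, which later orders receive(); notifications from this session's own backend PID are dropped. Illustrative round trip, assuming two open databases db1 and db2: db1.listen("events"); db2.send("events", "changed"); then db1.receive(_.channel == "events") eventually yields Some(List(SQL.Notification("events", "changed"))) */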
(msg -> stamp) } } _receiver_buffer = Some(received) } case _ => } } } catch { case Exn.Interrupt() => } } private def receiver_shutdown(): Unit = synchronized { if (_receiver_buffer.isDefined) { _receiver_thread.interrupt() Some(_receiver_thread) } else None }.foreach(_.join()) private def synchronized_receiver[A](body: => A): A = synchronized { if (_receiver_buffer.isEmpty) { _receiver_buffer = Some(Map.empty) _receiver_thread } body } override def listen(channel: String): Unit = synchronized_receiver { execute_statement("LISTEN " + SQL.ident(channel)) } override def unlisten(channel: String = "*"): Unit = synchronized_receiver { execute_statement("UNLISTEN " + (if (channel == "*") channel else SQL.ident(channel))) } override def send(channel: String, payload: String): Unit = synchronized_receiver { execute_statement( "NOTIFY " + SQL.ident(channel) + if_proper(payload, ", " + SQL.string(payload))) } override def receive( filter: SQL.Notification => Boolean = _ => true ): Option[List[SQL.Notification]] = synchronized { _receiver_buffer.map { received => val filtered = received.keysIterator.filter(filter).toList if (filtered.nonEmpty) { _receiver_buffer = Some(received -- filtered) filtered.map(msg => msg -> received(msg)).sortBy(_._2).map(_._1) } else Nil } } override def close(): Unit = { receiver_shutdown() super.close() if (server_close) server.close() } } } diff --git a/src/Pure/ML/ml_heap.scala b/src/Pure/ML/ml_heap.scala --- a/src/Pure/ML/ml_heap.scala +++ b/src/Pure/ML/ml_heap.scala @@ -1,319 +1,319 @@ /* Title: Pure/ML/ml_heap.scala Author: Makarius ML heap operations. */ package isabelle object ML_Heap { /** heap file with SHA1 digest **/ private val sha1_prefix = "SHA1:" def read_file_digest(heap: Path): Option[SHA1.Digest] = { if (heap.is_file) { val l = sha1_prefix.length val m = l + SHA1.digest_length val n = File.size(heap) val bs = Bytes.read_file(heap, offset = n - m) if (bs.length == m) { val s = bs.text if (s.startsWith(sha1_prefix)) Some(SHA1.fake_digest(s.substring(l))) else None } else None } else None } def write_file_digest(heap: Path): SHA1.Digest = read_file_digest(heap) getOrElse { val digest = SHA1.digest(heap) File.append(heap, sha1_prefix + digest.toString) digest } /* SQL data model */ sealed case class Log_DB(uuid: String, content: Bytes) object private_data extends SQL.Data("isabelle_heaps") { - override lazy val tables = SQL.Tables(Base.table, Slices.table) + override lazy val tables: SQL.Tables = SQL.Tables(Base.table, Slices.table) object Generic { val name = SQL.Column.string("name").make_primary_key } object Base { val name = Generic.name val heap_size = SQL.Column.long("heap_size") val heap_digest = SQL.Column.string("heap_digest") val uuid = SQL.Column.string("uuid") val log_db = SQL.Column.bytes("log_db") val table = make_table(List(name, heap_size, heap_digest, uuid, log_db)) } object Size { val name = Generic.name val heap = SQL.Column.string("heap") val log_db = SQL.Column.string("log_db") val table = make_table(List(name, heap, log_db), body = "SELECT name, pg_size_pretty(heap_size::bigint) as heap, " + " pg_size_pretty(length(log_db)::bigint) as log_db FROM " + Base.table.ident, name = "size") } object Slices { val name = Generic.name val slice = SQL.Column.int("slice").make_primary_key val content = SQL.Column.bytes("content") val table = make_table(List(name, slice, content), name = "slices") } object Slices_Size { val name = Generic.name val slice = Slices.slice val size = SQL.Column.string("size") val table = make_table(List(name, slice, 
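/* storage model: Base holds one row per session (heap_size, heap_digest, uuid, log_db), Slices holds the heap as numbered compressed chunks; Size and Slices_Size are views via pg_size_pretty for human-readable sizes (PostgreSQL) */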
size), body = "SELECT name, slice, pg_size_pretty(length(content)::bigint) as size FROM " + Slices.table.ident, name = "slices_size") } def read_digests(db: SQL.Database, names: Iterable[String]): Map[String, SHA1.Digest] = if (names.isEmpty) Map.empty else { db.execute_query_statement( Base.table.select(List(Base.name, Base.heap_digest), sql = Generic.name.where_member(names)), List.from[(String, String)], res => res.string(Base.name) -> res.string(Base.heap_digest) ).collect({ case (name, digest) if digest.nonEmpty => name -> SHA1.fake_digest(digest) }).toMap } def read_slices(db: SQL.Database, name: String): List[Bytes] = db.execute_query_statement( Slices.table.select(List(Slices.content), sql = Generic.name.where_equal(name) + SQL.order_by(List(Slices.slice))), List.from[Bytes], _.bytes(Slices.content)) def read_log_db(db: SQL.Database, name: String, old_uuid: String = ""): Option[Log_DB] = db.execute_query_statement( Base.table.select(List(Base.uuid, Base.log_db), sql = SQL.where_and( Generic.name.equal(name), if_proper(old_uuid, Base.uuid.ident + " <> " + SQL.string(old_uuid)))), List.from[(String, Bytes)], res => (res.string(Base.uuid), res.bytes(Base.log_db)) ).collectFirst( { case (uuid, content) if uuid.nonEmpty && !content.is_empty => Log_DB(uuid, content) }) def write_slice(db: SQL.Database, name: String, slice: Int, content: Bytes): Unit = db.execute_statement(Slices.table.insert(), body = { stmt => stmt.string(1) = name stmt.int(2) = slice stmt.bytes(3) = content }) def clean_entry(db: SQL.Database, name: String): Unit = { for (table <- List(Base.table, Slices.table)) { db.execute_statement(table.delete(sql = Base.name.where_equal(name))) } } def init_entry( db: SQL.Database, name: String, heap_size: Long, heap_digest: Option[SHA1.Digest], log_db: Option[Log_DB] ): Unit = { clean_entry(db, name) for (table <- List(Size.table, Slices_Size.table)) { db.create_view(table) } db.execute_statement(Base.table.insert(), body = { stmt => stmt.string(1) = name stmt.long(2) = heap_size stmt.string(3) = heap_digest.map(_.toString) stmt.string(4) = log_db.map(_.uuid) stmt.bytes(5) = log_db.map(_.content) }) } def update_entry( db: SQL.Database, name: String, heap_size: Long, heap_digest: Option[SHA1.Digest], log_db: Option[Log_DB] ): Unit = db.execute_statement( Base.table.update(List(Base.heap_size, Base.heap_digest, Base.uuid, Base.log_db), sql = Base.name.where_equal(name)), body = { stmt => stmt.long(1) = heap_size stmt.string(2) = heap_digest.map(_.toString) stmt.string(3) = log_db.map(_.uuid) stmt.bytes(4) = log_db.map(_.content) }) } def clean_entry(db: SQL.Database, session_name: String): Unit = private_data.transaction_lock(db, create = true, label = "ML_Heap.clean_entry") { private_data.clean_entry(db, session_name) } def read_digests(db: SQL.Database, names: Iterable[String]): Map[String, SHA1.Digest] = if (names.isEmpty) Map.empty else { private_data.transaction_lock(db, create = true, label = "ML_Heap.read_digests") { private_data.read_digests(db, names) } } def store( db: SQL.Database, session: Store.Session, slice: Space, cache: Compress.Cache = Compress.Cache.none, progress: Progress = new Progress ): Unit = { val log_db = for { path <- session.log_db uuid <- proper_string(Store.read_build_uuid(path, session.name)) } yield Log_DB(uuid, Bytes.read(path)) val heap_digest = session.heap.map(write_file_digest) val heap_size = session.heap match { case Some(heap) => File.size(heap) - sha1_prefix.length - SHA1.digest_length case None => 0L } val slice_size = slice.bytes max 
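/* slicing arithmetic, illustrative numbers: heap_size = 2560 MiB with slice = 1 GiB gives slice_size = 1 GiB, slices = ceil(2684354560 / 1073741824) = 3 and step = ceil(2684354560 / 3) = 894784854 bytes, so slices 0 and 1 hold 894784854 bytes each and slice 2 the remaining 894784852 bytes */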
Space.MiB(1).bytes val slices = (heap_size.toDouble / slice_size.toDouble).ceil.toInt val step = if (slices == 0) 0L else (heap_size.toDouble / slices.toDouble).ceil.toLong def slice_content(i: Int): Bytes = { val j = i + 1 val offset = step * i val limit = if (j < slices) step * j else heap_size Bytes.read_file(session.the_heap, offset = offset, limit = limit) .compress(cache = cache) } try { if (slices > 0) progress.echo("Storing " + session.name + " ...") // init entry: slice 0 + initial log_db { val (heap_size0, heap_digest0) = if (slices > 1) (0L, None) else (heap_size, heap_digest) val log_db0 = if (slices <= 1) log_db else None val content0 = if (slices > 0) Some(slice_content(0)) else None if (log_db0.isDefined) progress.echo("Storing " + session.log_db_name + " ...") private_data.transaction_lock(db, create = true, label = "ML_Heap.store") { private_data.init_entry(db, session.name, heap_size0, heap_digest0, log_db0) for (content <- content0) private_data.write_slice(db, session.name, 0, content) } } // update entry: slice 1 ... + final log_db if (slices > 1) { for (i <- 1 until slices) { val content = slice_content(i) private_data.transaction_lock(db, label = "ML_Heap.store" + i) { private_data.write_slice(db, session.name, i, content) } } if (log_db.isDefined) progress.echo("Storing " + session.log_db_name + " ...") private_data.transaction_lock(db, label = "ML_Heap.store_update") { private_data.update_entry(db, session.name, heap_size, heap_digest, log_db) } } } catch { case exn: Throwable => private_data.transaction_lock(db, create = true, label = "ML_Heap.store_clean") { private_data.clean_entry(db, session.name) } throw exn } } def restore( db: SQL.Database, sessions: List[Store.Session], cache: Compress.Cache = Compress.Cache.none, progress: Progress = new Progress ): Unit = { if (sessions.exists(_.defined)) { private_data.transaction_lock(db, create = true, label = "ML_Heap.restore") { /* heap */ val defined_heaps = for (session <- sessions; heap <- session.heap) yield session.name -> heap val db_digests = private_data.read_digests(db, defined_heaps.map(_._1)) for ((session_name, heap) <- defined_heaps) { val file_digest = read_file_digest(heap) val db_digest = db_digests.get(session_name) if (db_digest.isDefined && db_digest != file_digest) { progress.echo("Restoring " + session_name + " ...") val base_dir = Isabelle_System.make_directory(heap.expand.dir) Isabelle_System.with_tmp_file(session_name + "_", base_dir = base_dir.file) { tmp => Bytes.write(tmp, Bytes.empty) for (slice <- private_data.read_slices(db, session_name)) { Bytes.append(tmp, slice.uncompress(cache = cache)) } val digest = write_file_digest(tmp) if (db_digest.get == digest) { Isabelle_System.chmod("a+r", tmp) Isabelle_System.move_file(tmp, heap) } else error("Incoherent content for session heap " + heap) } } } /* log_db */ for (session <- sessions; path <- session.log_db) { val file_uuid = Store.read_build_uuid(path, session.name) private_data.read_log_db(db, session.name, old_uuid = file_uuid) match { case Some(log_db) if file_uuid.isEmpty => progress.echo("Restoring " + session.log_db_name + " ...") Isabelle_System.make_directory(path.expand.dir) Bytes.write(path, log_db.content) case Some(_) => error("Incoherent content for session database " + path) case None => } } } } } } diff --git a/src/Pure/System/host.scala b/src/Pure/System/host.scala --- a/src/Pure/System/host.scala +++ b/src/Pure/System/host.scala @@ -1,262 +1,262 @@ /* Title: Pure/System/host.scala Author: Makarius Information about 
compute hosts, including NUMA: Non-Uniform Memory Access of separate CPU nodes. See also https://www.open-mpi.org/projects/hwloc --- notably "lstopo" or "hwloc-ls" (e.g. via Ubuntu package "hwloc"). */ package isabelle object Host { object Range { val Single = """^(\d+)$""".r val Multiple = """^(\d+)-(\d+)$""".r def apply(range: List[Int]): String = range match { case Nil => "" case x :: xs => def elem(start: Int, stop: Int): String = if (start == stop) start.toString else start.toString + "-" + stop.toString val (elems, (r0, rn)) = xs.foldLeft((List.empty[String], (x, x))) { case ((rs, (r0, rn)), x) => if (rn + 1 == x) (rs, (r0, x)) else (rs :+ elem(r0, rn), (x, x)) } (elems :+ elem(r0, rn)).mkString(",") } def unapply(s: String): Option[List[Int]] = space_explode(',', s).foldRight(Option(List.empty[Int])) { case (Single(Value.Int(i)), Some(elems)) => Some(i :: elems) case (Multiple(Value.Int(i), Value.Int(j)), Some(elems)) => Some((i to j).toList ::: elems) case _ => None } def from(s: String): List[Int] = s match { case Range(r) => r case _ => Nil } } /* process policy via numactl and taskset tools */ def taskset(cpus: List[Int]): String = "taskset --cpu-list " + Range(cpus) def taskset_ok(cpus: List[Int]): Boolean = Isabelle_System.bash(taskset(cpus) + " true").ok def numactl(node: Int, rel_cpus: List[Int] = Nil): String = "numactl -m" + node + " -N" + node + if_proper(rel_cpus, " -C+" + Range(rel_cpus)) def numactl_ok(node: Int, rel_cpus: List[Int] = Nil): Boolean = Isabelle_System.bash(numactl(node, rel_cpus) + " true").ok def numa_options(options: Options, numa_node: Option[Int]): Options = numa_node match { case None => options case Some(node) => options.string.update("process_policy", if (numactl_ok(node)) numactl(node) else "") } def node_options(options: Options, node: Node_Info): Options = { val threads_options = if (node.rel_cpus.isEmpty) options else options.int.update("threads", node.rel_cpus.length) node.numa_node match { case None if node.rel_cpus.isEmpty => threads_options case Some(numa_node) => threads_options.string.update("process_policy", if (numactl_ok(numa_node, node.rel_cpus)) numactl(numa_node, node.rel_cpus) else "") case _ => threads_options.string.update("process_policy", if (taskset_ok(node.rel_cpus)) taskset(node.rel_cpus) else "") } } /* allocated resources */ object Node_Info { def none: Node_Info = Node_Info("", None, Nil) } sealed case class Node_Info(hostname: String, numa_node: Option[Int], rel_cpus: List[Int]) { def numa: Boolean = numa_node.isDefined || rel_cpus.nonEmpty override def toString: String = hostname + if_proper(numa_node, "/" + numa_node.get.toString) + if_proper(rel_cpus, "+" + Range(rel_cpus)) } /* statically available resources */ private val numa_info_linux: Path = Path.explode("/sys/devices/system/node/online") def parse_numa_info(numa_info: String): List[Int] = numa_info match { case Range(nodes) => nodes case s => error("Cannot parse CPU NUMA node specification: " + quote(s)) } def numa_nodes(enabled: Boolean = true, ssh: SSH.System = SSH.Local): List[Int] = { val numa_info = if (ssh.isabelle_platform.is_linux) Some(numa_info_linux) else None for { path <- numa_info.toList if enabled && ssh.is_file(path) n <- parse_numa_info(ssh.read(path).trim) } yield n } def numa_node0(): Option[Int] = try { numa_nodes() match { case ns if ns.length >= 2 && numactl_ok(ns.head) => Some(ns.head) case _ => None } } catch { case ERROR(_) => None } object Info { def init( hostname: String = SSH.LOCAL, ssh: SSH.System = SSH.Local, score: 
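/* Host.Range round trip, illustrative: Range(List(0, 1, 2, 5)) == "0-2,5" and Range.from("0-2,5") == List(0, 1, 2, 5); unparsable specifications yield Nil via Range.from */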
Option[Double] = None ): Info = Info(hostname, numa_nodes(ssh = ssh), Multithreading.num_processors(ssh = ssh), score) } sealed case class Info( hostname: String, numa_nodes: List[Int], num_cpus: Int, benchmark_score: Option[Double] ) { override def toString: String = hostname } /* shuffling of NUMA nodes */ def numa_check(progress: Progress, enabled: Boolean): Boolean = { def warning = numa_nodes() match { case ns if ns.length < 2 => Some("no NUMA nodes available") case ns if !numactl_ok(ns.head) => Some("bad numactl tool") case _ => None } enabled && (warning match { case Some(s) => progress.echo_warning("Shuffling of NUMA CPU nodes is disabled: " + s) false case _ => true }) } /* SQL data model */ object private_data extends SQL.Data("isabelle_host") { val database: Path = Path.explode("$ISABELLE_HOME_USER/host.db") - override lazy val tables = SQL.Tables(Node_Info.table, Info.table) + override lazy val tables: SQL.Tables = SQL.Tables(Node_Info.table, Info.table) object Node_Info { val hostname = SQL.Column.string("hostname").make_primary_key val numa_next = SQL.Column.int("numa_next") val table = make_table(List(hostname, numa_next), name = "node_info") } def read_numa_next(db: SQL.Database, hostname: String): Int = db.execute_query_statementO[Int]( Node_Info.table.select(List(Node_Info.numa_next), sql = Node_Info.hostname.where_equal(hostname)), res => res.int(Node_Info.numa_next) ).getOrElse(0) def write_numa_next(db: SQL.Database, hostname: String, numa_next: Int): Unit = { db.execute_statement(Node_Info.table.delete(sql = Node_Info.hostname.where_equal(hostname))) db.execute_statement(Node_Info.table.insert(), body = { stmt => stmt.string(1) = hostname stmt.int(2) = numa_next }) } object Info { val hostname = SQL.Column.string("hostname").make_primary_key val numa_info = SQL.Column.string("numa_info") val num_cpus = SQL.Column.int("num_cpus") val benchmark_score = SQL.Column.double("benchmark_score") val table = make_table(List(hostname, numa_info, num_cpus, benchmark_score), name = "info") } def write_info(db: SQL.Database, info: Info): Unit = { db.execute_statement(Info.table.delete(sql = Info.hostname.where_equal(info.hostname))) db.execute_statement(Info.table.insert(), body = { stmt => stmt.string(1) = info.hostname stmt.string(2) = info.numa_nodes.mkString(",") stmt.int(3) = info.num_cpus stmt.double(4) = info.benchmark_score }) } def read_info(db: SQL.Database, hostname: String): Option[Host.Info] = db.execute_query_statementO[Host.Info]( Info.table.select(Info.table.columns.tail, sql = Info.hostname.where_equal(hostname)), { res => val numa_info = res.string(Info.numa_info) val num_cpus = res.int(Info.num_cpus) val benchmark_score = res.get_double(Info.benchmark_score) Host.Info(hostname, parse_numa_info(numa_info), num_cpus, benchmark_score) }) } def next_numa_node( db: SQL.Database, hostname: String, available_nodes: List[Int], used_nodes: => Set[Int] ): Option[Int] = if (available_nodes.isEmpty) None else { val available = available_nodes.zipWithIndex val used = used_nodes private_data.transaction_lock(db, create = true, label = "Host.next_numa_node") { val numa_next = private_data.read_numa_next(db, hostname) val numa_index = available.collectFirst({ case (n, i) if n == numa_next => i }).getOrElse(0) val candidates = available.drop(numa_index) ::: available.take(numa_index) val (n, i) = candidates.find({ case (n, i) => i == numa_index && !used(n) }) orElse candidates.find({ case (n, _) => !used(n) }) getOrElse candidates.head val numa_next1 = available_nodes((i + 1) % 
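/* round-robin sketch: with available_nodes = List(0, 1) and persisted numa_next = 0, an unused node 0 is allocated and numa_next becomes 1, so the next caller starts its search at node 1; nodes in used_nodes are skipped */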
available_nodes.length) if (numa_next != numa_next1) private_data.write_numa_next(db, hostname, numa_next1) Some(n) } } def write_info(db: SQL.Database, info: Info): Unit = private_data.transaction_lock(db, create = true, label = "Host.write_info") { private_data.write_info(db, info) } def read_info(db: SQL.Database, hostname: String): Option[Host.Info] = private_data.transaction_lock(db, create = true, label = "Host.read_info") { private_data.read_info(db, hostname) } } diff --git a/src/Pure/System/progress.scala b/src/Pure/System/progress.scala --- a/src/Pure/System/progress.scala +++ b/src/Pure/System/progress.scala @@ -1,540 +1,541 @@ /* Title: Pure/System/progress.scala Author: Makarius Progress context for system processes. */ package isabelle import java.util.{Map => JMap} import java.io.{File => JFile} import scala.collection.immutable.SortedMap object Progress { /* output */ sealed abstract class Output { def message: Message } enum Kind { case writeln, warning, error_message } sealed case class Message( kind: Kind, text: String, verbose: Boolean = false ) extends Output { override def message: Message = this def output_text: String = kind match { case Kind.writeln => Output.writeln_text(text) case Kind.warning => Output.warning_text(text) case Kind.error_message => Output.error_message_text(text) } override def toString: String = output_text } sealed case class Theory( theory: String, session: String = "", percentage: Option[Int] = None ) extends Output { def message: Message = Message(Kind.writeln, print_session + print_theory + print_percentage, verbose = true) def print_session: String = if_proper(session, session + ": ") def print_theory: String = "theory " + theory def print_percentage: String = percentage match { case None => "" case Some(p) => " " + p + "%" } } /* SQL data model */ object private_data extends SQL.Data("isabelle_progress") { val database: Path = Path.explode("$ISABELLE_HOME_USER/progress.db") - override lazy val tables = SQL.Tables(Base.table, Agents.table, Messages.table) + override lazy val tables: SQL.Tables = + SQL.Tables(Base.table, Agents.table, Messages.table) object Base { val context_uuid = SQL.Column.string("context_uuid").make_primary_key val context = SQL.Column.long("context").make_primary_key val stopped = SQL.Column.bool("stopped") val table = make_table(List(context_uuid, context, stopped)) } object Agents { val agent_uuid = SQL.Column.string("agent_uuid").make_primary_key val context_uuid = SQL.Column.string("context_uuid").make_primary_key val kind = SQL.Column.string("kind") val hostname = SQL.Column.string("hostname") val java_pid = SQL.Column.long("java_pid") val java_start = SQL.Column.date("java_start") val start = SQL.Column.date("start") val stamp = SQL.Column.date("stamp") val stop = SQL.Column.date("stop") val seen = SQL.Column.long("seen") val table = make_table( List(agent_uuid, context_uuid, kind, hostname, java_pid, java_start, start, stamp, stop, seen), name = "agents") } object Messages { type T = SortedMap[Long, Message] val empty: T = SortedMap.empty val context = SQL.Column.long("context").make_primary_key val serial = SQL.Column.long("serial").make_primary_key val kind = SQL.Column.int("kind") val text = SQL.Column.string("text") val verbose = SQL.Column.bool("verbose") val table = make_table(List(context, serial, kind, text, verbose), name = "messages") } val channel: String = Base.table.name val channel_ping: SQL.Notification = SQL.Notification(channel, payload = "ping") val channel_output: SQL.Notification = 
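/* notification protocol: all agents share a single channel named after Base.table; payload "ping" signals a changed stopped flag, payload "output" signals freshly written messages (see sync_database and the consumer thread below) */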
SQL.Notification(channel, payload = "output") def read_progress_context(db: SQL.Database, context_uuid: String): Option[Long] = db.execute_query_statementO( Base.table.select(List(Base.context), sql = Base.context_uuid.where_equal(context_uuid)), _.long(Base.context)) def next_progress_context(db: SQL.Database): Long = db.execute_query_statementO( Base.table.select(List(Base.context.max)), _.long(Base.context)).getOrElse(0L) + 1L def read_progress_stopped(db: SQL.Database, context: Long): Boolean = db.execute_query_statementO( Base.table.select(List(Base.stopped), sql = Base.context.where_equal(context)), _.bool(Base.stopped) ).getOrElse(false) def write_progress_stopped(db: SQL.Database, context: Long, stopped: Boolean): Unit = db.execute_statement( Base.table.update(List(Base.stopped), sql = Base.context.where_equal(context)), body = { stmt => stmt.bool(1) = stopped }) def update_agent( db: SQL.Database, agent_uuid: String, seen: Long, stop_now: Boolean = false ): Unit = { val sql = Agents.agent_uuid.where_equal(agent_uuid) val stop = db.execute_query_statementO( Agents.table.select(List(Agents.stop), sql = sql), _.get_date(Agents.stop)).flatten db.execute_statement( Agents.table.update(List(Agents.stamp, Agents.stop, Agents.seen), sql = sql), body = { stmt => val now = db.now() stmt.date(1) = now stmt.date(2) = if (stop_now) Some(now) else stop stmt.long(3) = seen }) } def read_messages_serial(db: SQL.Database, context: Long): Long = db.execute_query_statementO( Messages.table.select( List(Messages.serial.max), sql = Base.context.where_equal(context)), _.long(Messages.serial) ).getOrElse(0L) def read_messages(db: SQL.Database, context: Long, seen: Long = 0): Messages.T = db.execute_query_statement( Messages.table.select( List(Messages.serial, Messages.kind, Messages.text, Messages.verbose), sql = SQL.where_and( Messages.context.ident + " = " + context, if (seen <= 0) "" else Messages.serial.ident + " > " + seen)), SortedMap.from[Long, Message], { res => val serial = res.long(Messages.serial) val kind = Kind.fromOrdinal(res.int(Messages.kind)) val text = res.string(Messages.text) val verbose = res.bool(Messages.verbose) serial -> Message(kind, text, verbose = verbose) } ) def write_messages( db: SQL.Database, context: Long, messages: List[(Long, Message)] ): Unit = { db.execute_batch_statement(Messages.table.insert(), batch = for ((serial, message) <- messages) yield { (stmt: SQL.Statement) => stmt.long(1) = context stmt.long(2) = serial stmt.int(3) = message.kind.ordinal stmt.string(4) = message.text stmt.bool(5) = message.verbose }) } } } class Progress { def now(): Date = Date.now() val start: Date = now() def verbose: Boolean = false final def do_output(message: Progress.Message): Boolean = !message.verbose || verbose def output(message: Progress.Message): Unit = {} final def echo(msg: String, verbose: Boolean = false): Unit = output(Progress.Message(Progress.Kind.writeln, msg, verbose = verbose)) final def echo_warning(msg: String, verbose: Boolean = false): Unit = output(Progress.Message(Progress.Kind.warning, msg, verbose = verbose)) final def echo_error_message(msg: String, verbose: Boolean = false): Unit = output(Progress.Message(Progress.Kind.error_message, msg, verbose = verbose)) final def echo_if(cond: Boolean, msg: String): Unit = if (cond) echo(msg) def theory(theory: Progress.Theory): Unit = output(theory.message) def nodes_status(nodes_status: Document_Status.Nodes_Status): Unit = {} final def capture[A](e: => A, msg: String, err: Throwable => String): Exn.Result[A] = { 
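/* capture: echo msg, evaluate e, and turn any exception into an Exn.Exn result after reporting err(exn) via echo_error_message */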
echo(msg) try { Exn.Res(e) } catch { case exn: Throwable => echo_error_message(err(exn)); Exn.Exn[A](exn) } } final def timeit[A]( body: => A, message: Exn.Result[A] => String = null, enabled: Boolean = true ): A = Timing.timeit(body, message = message, enabled = enabled, output = echo(_)) @volatile private var is_stopped = false def stop(): Unit = { is_stopped = true } def stopped: Boolean = { if (Thread.interrupted()) is_stopped = true is_stopped } def stopped_local: Boolean = false final def interrupt_handler[A](e: => A): A = POSIX_Interrupt.handler { stop() } { e } final def expose_interrupt(): Unit = if (stopped) throw Exn.Interrupt() override def toString: String = if (stopped) "Progress(stopped)" else "Progress" final def bash(script: String, cwd: JFile = null, env: JMap[String, String] = Isabelle_System.settings(), redirect: Boolean = false, echo: Boolean = false, watchdog: Time = Time.zero, strict: Boolean = true ): Process_Result = { val result = Isabelle_System.bash(script, cwd = cwd, env = env, redirect = redirect, progress_stdout = echo_if(echo, _), progress_stderr = echo_if(echo, _), watchdog = if (watchdog.is_zero) None else Some((watchdog, _ => stopped)), strict = strict) if (strict && stopped) throw Exn.Interrupt() else result } } class Console_Progress(override val verbose: Boolean = false, stderr: Boolean = false) extends Progress { override def output(message: Progress.Message): Unit = synchronized { if (do_output(message)) { Output.output(message.output_text, stdout = !stderr, include_empty = true) } } override def toString: String = super.toString + ": console" } class File_Progress(path: Path, override val verbose: Boolean = false) extends Progress { override def output(message: Progress.Message): Unit = synchronized { if (do_output(message)) File.append(path, message.output_text + "\n") } override def toString: String = super.toString + ": " + path.toString } /* database progress */ class Database_Progress( db: SQL.Database, base_progress: Progress, input_messages: Boolean = false, output_stopped: Boolean = false, kind: String = "progress", hostname: String = Isabelle_System.hostname(), context_uuid: String = UUID.random_string(), timeout: Option[Time] = None, tick_expire: Int = 50) extends Progress { database_progress => override def now(): Date = db.now() override val start: Date = now() if (UUID.unapply(context_uuid).isEmpty) { error("Bad Database_Progress.context_uuid: " + quote(context_uuid)) } private var _tick: Long = 0 private var _agent_uuid: String = "" private var _context: Long = -1 private var _serial: Long = 0 private var _stopped_db: Boolean = false private var _consumer: Consumer_Thread[Progress.Output] = null def agent_uuid: String = synchronized { _agent_uuid } private def init(): Unit = synchronized { db.listen(Progress.private_data.channel) Progress.private_data.transaction_lock(db, create = true, label = "Database_Progress.init") { Progress.private_data.read_progress_context(db, context_uuid) match { case Some(context) => _context = context _agent_uuid = UUID.random_string() case None => _context = Progress.private_data.next_progress_context(db) _agent_uuid = context_uuid db.execute_statement(Progress.private_data.Base.table.insert(), { stmt => stmt.string(1) = context_uuid stmt.long(2) = _context stmt.bool(3) = false }) } db.execute_statement(Progress.private_data.Agents.table.insert(), { stmt => val java = ProcessHandle.current() val java_pid = java.pid val java_start = Date.instant(java.info.startInstant.get) stmt.string(1) = _agent_uuid 
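/* remaining parameters follow the Agents column order: context_uuid, kind, hostname, java_pid, java_start, start, stamp (initialized to start), stop (initially NULL), seen (initially 0) */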
stmt.string(2) = context_uuid stmt.string(3) = kind stmt.string(4) = hostname stmt.long(5) = java_pid stmt.date(6) = java_start stmt.date(7) = start stmt.date(8) = start stmt.date(9) = None stmt.long(10) = 0L }) } if (context_uuid == _agent_uuid) db.vacuum(Progress.private_data.tables.list) def consume(bulk_output: List[Progress.Output]): List[Exn.Result[Unit]] = { val expired = synchronized { _tick += 1; _tick % tick_expire == 0 } val received = db.receive(n => n.channel == Progress.private_data.channel) val ok = bulk_output.nonEmpty || expired || base_progress.stopped && output_stopped || received.isEmpty || received.get.contains(Progress.private_data.channel_ping) || input_messages && received.get.contains(Progress.private_data.channel_output) if (ok) { sync_database { if (bulk_output.nonEmpty) { for (out <- bulk_output) { out match { case message: Progress.Message => if (do_output(message)) base_progress.output(message) case theory: Progress.Theory => base_progress.theory(theory) } } val messages = for ((out, i) <- bulk_output.zipWithIndex) yield (_serial + i + 1) -> out.message Progress.private_data.write_messages(db, _context, messages) _serial = messages.last._1 db.send(Progress.private_data.channel_output) } bulk_output.map(_ => Exn.Res(())) } } else Nil } _consumer = Consumer_Thread.fork_bulk[Progress.Output](name = "Database_Progress.consumer")( bulk = _ => true, timeout = timeout, consume = { bulk_output => val results = if (bulk_output.isEmpty) consume(Nil) else bulk_output.grouped(200).toList.flatMap(consume) (results, true) }) } def close(): Unit = synchronized { if (_context > 0) { _consumer.shutdown() _consumer = null Progress.private_data.transaction_lock(db, label = "Database_Progress.exit") { Progress.private_data.update_agent(db, _agent_uuid, _serial, stop_now = true) } _context = 0 } db.close() } private def sync_context[A](body: => A): A = synchronized { if (_context < 0) throw new IllegalStateException("Database_Progress before init") if (_context == 0) throw new IllegalStateException("Database_Progress after exit") body } private def sync_database[A](body: => A): A = synchronized { Progress.private_data.transaction_lock(db, label = "Database_Progress.sync_database") { _stopped_db = Progress.private_data.read_progress_stopped(db, _context) if (_stopped_db && !base_progress.stopped) base_progress.stop() if (!_stopped_db && base_progress.stopped && output_stopped) { Progress.private_data.write_progress_stopped(db, _context, true) db.send(Progress.private_data.channel_ping) } val serial0 = _serial if (input_messages) { val messages = Progress.private_data.read_messages(db, _context, seen = _serial) for ((message_serial, message) <- messages) { if (base_progress.do_output(message)) base_progress.output(message) _serial = _serial max message_serial } } else { _serial = _serial max Progress.private_data.read_messages_serial(db, _context) } val res = body if (_serial != serial0) Progress.private_data.update_agent(db, _agent_uuid, _serial) res } } private def sync(): Unit = sync_database {} override def output(message: Progress.Message): Unit = sync_context { _consumer.send(message) } override def theory(theory: Progress.Theory): Unit = sync_context { _consumer.send(theory) } override def nodes_status(nodes_status: Document_Status.Nodes_Status): Unit = base_progress.nodes_status(nodes_status) override def verbose: Boolean = base_progress.verbose override def stop(): Unit = sync_context { base_progress.stop(); sync() } override def stopped: Boolean = sync_context { 
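/* stop propagation: sync_database mirrors the database's stopped flag into base_progress and, with output_stopped, propagates a local stop back to the database together with a "ping"; stopped_local below distinguishes a stop that originated locally */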
base_progress.stopped } override def stopped_local: Boolean = sync_context { base_progress.stopped && !_stopped_db } override def toString: String = super.toString + ": database " + db init() sync() } /* structured program progress */ object Program_Progress { class Program private[Program_Progress](heading: String, title: String) { private val output_buffer = new StringBuffer(256) // synchronized def output(message: Progress.Message): Unit = synchronized { if (output_buffer.length() > 0) output_buffer.append('\n') output_buffer.append(message.output_text) } val start_time: Time = Time.now() private var stop_time: Option[Time] = None def stop_now(): Unit = synchronized { stop_time = Some(Time.now()) } def output(): (Command.Results, XML.Body) = synchronized { val output_text = output_buffer.toString val elapsed_time = stop_time.map(t => t - start_time) val message_prefix = heading + " " val message_suffix = elapsed_time match { case None => " ..." case Some(t) => " ... (" + t.message + " elapsed time)" } val (results, message) = if (output_text.isEmpty) { (Command.Results.empty, XML.string(message_prefix + title + message_suffix)) } else { val i = Document_ID.make() val results = Command.Results.make(List(i -> Protocol.writeln_message(output_text))) val message = XML.string(message_prefix) ::: List(XML.Elem(Markup(Markup.WRITELN, Markup.Serial(i)), XML.string(title))) ::: XML.string(message_suffix) (results, message) } (results, List(XML.elem(Markup.TRACING_MESSAGE, message))) } } } abstract class Program_Progress( default_heading: String = "Running", default_title: String = "program", override val verbose: Boolean = false ) extends Progress { private var _finished_programs: List[Program_Progress.Program] = Nil private var _running_program: Option[Program_Progress.Program] = None def output(): (Command.Results, XML.Body) = synchronized { val programs = (_running_program.toList ::: _finished_programs).reverse val programs_output = programs.map(_.output()) val results = Command.Results.merge(programs_output.map(_._1)) val body = Library.separate(Pretty.Separator, programs_output.map(_._2)).flatten (results, body) } private def start_program(heading: String, title: String): Unit = synchronized { _running_program = Some(new Program_Progress.Program(heading, title)) } def stop_program(): Unit = synchronized { _running_program match { case Some(program) => program.stop_now() _finished_programs ::= program _running_program = None case None => } } def detect_program(s: String): Option[String] override def output(message: Progress.Message): Unit = synchronized { val writeln_msg = if (message.kind == Progress.Kind.writeln) message.text else "" detect_program(writeln_msg).map(Word.explode) match { case Some(a :: bs) => stop_program() start_program(a, Word.implode(bs)) case _ => if (_running_program.isEmpty) start_program(default_heading, default_title) if (do_output(message)) _running_program.get.output(message) } } } diff --git a/src/Pure/Thy/document_build.scala b/src/Pure/Thy/document_build.scala --- a/src/Pure/Thy/document_build.scala +++ b/src/Pure/Thy/document_build.scala @@ -1,617 +1,617 @@ /* Title: Pure/Thy/document_build.scala Author: Makarius Build theory document (PDF) from session database. 
*/ package isabelle object Document_Build { /* document variants */ abstract class Document_Name { def name: String def path: Path = Path.basic(name) override def toString: String = name } object Document_Variant { def parse(opt: String): Document_Variant = space_explode('=', opt) match { case List(name) => Document_Variant(name, Latex.Tags.empty) case List(name, tags) => Document_Variant(name, Latex.Tags(tags)) case _ => error("Malformed document variant: " + quote(opt)) } } sealed case class Document_Variant(name: String, tags: Latex.Tags) extends Document_Name { def print: String = if (tags.toString.isEmpty) name else name + "=" + tags.toString } sealed case class Document_Input(name: String, sources: SHA1.Shasum) extends Document_Name { override def toString: String = name } sealed case class Document_Output(name: String, sources: SHA1.Shasum, log_xz: Bytes, pdf: Bytes) extends Document_Name { override def toString: String = name def log: String = log_xz.uncompress().text def log_lines: List[String] = split_lines(log) def write(db: SQL.Database, session_name: String): Unit = write_document(db, session_name, this) def write(dir: Path): Path = { val path = dir + Path.basic(name).pdf Isabelle_System.make_directory(path.expand.dir) Bytes.write(path, pdf) path } } /* SQL data model */ object private_data extends SQL.Data("isabelle_documents") { - override lazy val tables = SQL.Tables(Base.table) + override lazy val tables: SQL.Tables = SQL.Tables(Base.table) object Base { val session_name = SQL.Column.string("session_name").make_primary_key val name = SQL.Column.string("name").make_primary_key val sources = SQL.Column.string("sources") val log_xz = SQL.Column.bytes("log_xz") val pdf = SQL.Column.bytes("pdf") val table = make_table(List(session_name, name, sources, log_xz, pdf)) } def where_equal(session_name: String, name: String = ""): SQL.Source = SQL.where_and( Base.session_name.equal(session_name), if_proper(name, Base.name.equal(name))) def clean_session(db: SQL.Database, session_name: String): Unit = db.execute_statement(Base.table.delete(sql = Base.session_name.where_equal(session_name))) def read_document( db: SQL.Database, session_name: String, name: String ): Option[Document_Output] = { db.execute_query_statementO[Document_Output]( Base.table.select(sql = where_equal(session_name, name = name)), { res => val name = res.string(Base.name) val sources = res.string(Base.sources) val log_xz = res.bytes(Base.log_xz) val pdf = res.bytes(Base.pdf) Document_Output(name, SHA1.fake_shasum(sources), log_xz, pdf) } ) } def write_document(db: SQL.Database, session_name: String, doc: Document_Output): Unit = db.execute_statement(Base.table.insert(), body = { stmt => stmt.string(1) = session_name stmt.string(2) = doc.name stmt.string(3) = doc.sources.toString stmt.bytes(4) = doc.log_xz stmt.bytes(5) = doc.pdf }) } def clean_session(db: SQL.Database, session_name: String): Unit = private_data.transaction_lock(db, create = true, label = "Document_Build.clean_session") { private_data.clean_session(db, session_name) } def read_document(db: SQL.Database, session_name: String, name: String): Option[Document_Output] = private_data.transaction_lock(db, label = "Document_Build.read_document") { private_data.read_document(db, session_name, name) } def write_document(db: SQL.Database, session_name: String, doc: Document_Output): Unit = private_data.transaction_lock(db, label = "Document_Build.write_document") { private_data.write_document(db, session_name, doc) } /* background context */ def 
session_background( options: Options, session: String, dirs: List[Path] = Nil ): Sessions.Background = { Sessions.load_structure(options + "document", dirs = dirs). selection_deps(Sessions.Selection.session(session)).background(session) } /* document context */ val texinputs: Path = Path.explode("~~/lib/texinputs") val isabelle_styles: List[Path] = List("isabelle.sty", "isabellesym.sty", "pdfsetup.sty", "railsetup.sty"). map(name => texinputs + Path.basic(name)) def program_start(title: String): String = "PROGRAM START \"" + title + "\" ..." def program_running_script(title: String): String = "echo " + Bash.string(program_start("Running " + title)) + ";" def detect_program_start(s: String): Option[String] = for { s1 <- Library.try_unprefix("PROGRAM START \"", s) s2 <- Library.try_unsuffix("\" ...", s1) } yield s2 sealed case class Document_Latex( name: Document.Node.Name, body: XML.Body, line_pos: Properties.T => Option[Int] ) { def content: File.Content_XML = File.content(Path.basic(tex_name(name)), body) def file_pos: String = File.symbolic_path(name.path) def write(latex_output: Latex.Output, dir: Path): Unit = content.output(latex_output.make(_, file_pos = file_pos, line_pos = line_pos)) .write(dir) } def context( session_context: Export.Session_Context, document_session: Option[Sessions.Base] = None, document_selection: String => Boolean = _ => true, progress: Progress = new Progress ): Context = new Context(session_context, document_session, document_selection, progress) final class Context private[Document_Build]( val session_context: Export.Session_Context, document_session: Option[Sessions.Base], document_selection: String => Boolean, val progress: Progress ) { context => /* session info */ private val base = document_session getOrElse session_context.session_base private val info = session_context.sessions_structure(base.session_name) def session: String = info.name def options: Options = info.options override def toString: String = session val classpath: List[File.Content] = session_context.classpath() def document_bibliography: Boolean = options.bool("document_bibliography") def document_logo: Option[String] = options.string("document_logo") match { case "" => None case "_" => Some("") case name => Some(name) } def document_build: String = options.string("document_build") def get_engine(): Engine = { val name = document_build Classpath(jar_contents = classpath).make_services(classOf[Engine]) .find(_.name == name).getOrElse(error("Bad document_build engine " + quote(name))) } /* document content */ def documents: List[Document_Variant] = info.documents def session_document_theories: List[Document.Node.Name] = base.proper_session_theories def all_document_theories: List[Document.Node.Name] = base.all_document_theories lazy val isabelle_logo: Option[File.Content] = { document_logo.map(logo_name => Isabelle_System.with_tmp_file("logo", ext = "pdf") { tmp_path => Logo.create_logo(logo_name, output_file = tmp_path, quiet = true) val path = Path.basic("isabelle_logo.pdf") val content = Bytes.read(tmp_path) File.content(path, content) }) } lazy val session_graph: File.Content = { val path = Browser_Info.session_graph_path val content = graphview.Graph_File.make_pdf(options, base.session_graph_display) File.content(path, content) } lazy val session_tex: File.Content = { val path = Path.basic("session.tex") val content = terminate_lines( session_document_theories.map(name => "\\input{" + tex_name(name) + "}")) File.content(path, content) } lazy val document_latex: List[Document_Latex] = 
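/* selected theories use their exported LaTeX body; unselected theories keep only the \nocite variants of their citations, so bibliography generation still sees every cite command without typesetting the theory itself */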
for (name <- all_document_theories) yield { val selected = document_selection(name.theory) val body = if (selected) { val entry = session_context(name.theory, Export.DOCUMENT_LATEX, permissive = true) YXML.parse_body(entry.text) } else { val text = proper_string(session_context.theory_source(name.theory)) getOrElse File.read(name.path) (for { outer <- Bibtex.Cite.parse(Bibtex.cite_commands(options), text) inner <- outer.parse } yield inner.nocite.latex_text).flatten } def line_pos(props: Properties.T): Option[Int] = Position.Line.unapply(props) orElse { if (selected) { for { snapshot <- session_context.document_snapshot id <- Position.Id.unapply(props) offset <- Position.Offset.unapply(props) line <- snapshot.find_command_line(id, offset) } yield line } else None } Document_Latex(name, body, line_pos) } /* document directory */ def make_directory(dir: Path, doc: Document_Variant): Path = Isabelle_System.make_directory(dir + Path.basic(doc.name)) def prepare_directory( dir: Path, doc: Document_Variant, latex_output: Latex.Output, verbose: Boolean ): Directory = { val doc_dir = make_directory(dir, doc) if (verbose) progress.echo(program_start("Creating directory")) /* actual sources: with SHA1 digest */ isabelle_styles.foreach(Latex.copy_file(_, doc_dir)) val comment_latex = latex_output.options.bool("document_comment_latex") if (!comment_latex) Latex.copy_file(texinputs + Path.basic("comment.sty"), doc_dir) doc.tags.sty(comment_latex).write(doc_dir) for ((base_dir, src) <- info.document_files) { Latex.copy_file_base(info.dir + base_dir, src, doc_dir) } session_tex.write(doc_dir) document_latex.foreach(_.write(latex_output, doc_dir)) val root_name1 = "root_" + doc.name val root_name = if ((doc_dir + Path.explode(root_name1).tex).is_file) root_name1 else "root" val document_prefs = latex_output.options.make_prefs(filter = _.for_document) val meta_info = SHA1.shasum_meta_info( SHA1.digest( List(doc.print, document_logo.toString, document_build, document_prefs).toString)) val manifest = SHA1.shasum_sorted( for (file <- File.find_files(doc_dir.file, follow_links = true)) yield SHA1.digest(file) -> File.path(doc_dir.java_path.relativize(file.toPath)).implode) val sources = meta_info ::: manifest /* derived material: without SHA1 digest */ isabelle_logo.foreach(_.write(doc_dir)) session_graph.write(doc_dir) if (verbose) { progress.bash("ls -alR", echo = true, cwd = doc_dir.file).check progress match { case program_progress: Program_Progress => program_progress.stop_program() case _ => } } Directory(doc_dir, doc, root_name, sources) } def old_document(directory: Directory): Option[Document_Output] = for { db <- session_context.session_db() old_doc <- read_document(db, session, directory.doc.name) if old_doc.sources == directory.sources } yield old_doc } sealed case class Directory( doc_dir: Path, doc: Document_Variant, root_name: String, sources: SHA1.Shasum ) { def root_name_script(ext: String = ""): String = Bash.string(if (ext.isEmpty) root_name else root_name + "." 
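/* conditional_script emits a shell fragment of the form "if [ -f root.EXT ] then EXE root ... AFTER fi", used below to run bibtex/makeindex only if the corresponding auxiliary file exists, optionally followed by another latex pass */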
+ ext) def conditional_script( ext: String, exe: String, title: String = "", after: String = "" ): String = { "if [ -f " + root_name_script(ext) + " ]\n" + "then\n" + " " + (if (title.nonEmpty) program_running_script(title) else "") + exe + " " + root_name_script() + "\n" + if_proper(after, " " + after) + "fi\n" } def log_errors(): List[String] = Latex.latex_errors(doc_dir, root_name) ::: Bibtex.bibtex_errors(doc_dir, root_name) def make_document(log: List[String], errors: List[String]): Document_Output = { val root_pdf = Path.basic(root_name).pdf val result_pdf = doc_dir + root_pdf if (errors.nonEmpty) { val message = "Failed to build document " + quote(doc.name) throw new Build_Error(log, errors ::: List(message)) } else if (!result_pdf.is_file) { val message = "Bad document result: expected to find " + root_pdf throw new Build_Error(log, List(message)) } else { val log_xz = Bytes(cat_lines(log)).compress() val pdf = Bytes.read(result_pdf) Document_Output(doc.name, sources, log_xz, pdf) } } } /* build engines */ abstract class Engine(val name: String) extends Isabelle_System.Service { override def toString: String = name def prepare_directory(context: Context, dir: Path, doc: Document_Variant, verbose: Boolean): Directory def build_document(context: Context, directory: Directory, verbose: Boolean): Document_Output } abstract class Bash_Engine(name: String) extends Engine(name) { def prepare_directory(context: Context, dir: Path, doc: Document_Variant, verbose: Boolean): Directory = context.prepare_directory(dir, doc, new Latex.Output(context.options), verbose) def use_pdflatex: Boolean = false def running_latex: String = program_running_script(if (use_pdflatex) "pdflatex" else "lualatex") def latex_script(context: Context, directory: Directory): String = running_latex + (if (use_pdflatex) "$ISABELLE_PDFLATEX" else "$ISABELLE_LUALATEX") + " " + directory.root_name_script() + "\n" def bibtex_script(context: Context, directory: Directory, latex: Boolean = false): String = { val ext = if (context.document_bibliography) "aux" else "bib" directory.conditional_script(ext, "$ISABELLE_BIBTEX", title = "bibtex", after = if (latex) latex_script(context, directory) else "") } def makeindex_script(context: Context, directory: Directory, latex: Boolean = false): String = directory.conditional_script("idx", "$ISABELLE_MAKEINDEX", title = "makeindex", after = if (latex) latex_script(context, directory) else "") def use_build_script: Boolean = false def build_script(context: Context, directory: Directory): String = { val has_build_script = (directory.doc_dir + Path.explode("build")).is_file if (!use_build_script && has_build_script) { error("Unexpected document build script for option document_build=" + quote(context.document_build)) } else if (use_build_script && !has_build_script) error("Missing document build script") else if (has_build_script) "./build pdf " + Bash.string(directory.doc.name) else { "set -e\n" + latex_script(context, directory) + bibtex_script(context, directory, latex = true) + makeindex_script(context, directory) + latex_script(context, directory) + makeindex_script(context, directory, latex = true) } } def build_document( context: Context, directory: Directory, verbose: Boolean ): Document_Output = { val progress = context.progress val result = progress.bash( build_script(context, directory), cwd = directory.doc_dir.file, echo = verbose, watchdog = Time.seconds(0.5)) val log = result.out_lines ::: result.err_lines val err = result.err val errors1 = directory.log_errors() val 
errors2 = if (result.ok) errors1 else if (err.nonEmpty) err :: errors1 else if (errors1.nonEmpty) errors1 else List("Error") directory.make_document(log, errors2) } } class LuaLaTeX_Engine extends Bash_Engine("lualatex") class PDFLaTeX_Engine extends Bash_Engine("pdflatex") { override def use_pdflatex: Boolean = true } class Build_Engine extends Bash_Engine("build") { override def use_build_script: Boolean = true } class LIPIcs_Engine(name: String) extends Bash_Engine(name) { def lipics_options(options: Options): Options = options + "document_heading_prefix=" + "document_comment_latex" override def use_pdflatex: Boolean = true override def prepare_directory( context: Context, dir: Path, doc: Document_Variant, verbose: Boolean ): Directory = { val doc_dir = context.make_directory(dir, doc) Component_LIPIcs.document_files.foreach(Latex.copy_file(_, doc_dir)) val latex_output = new Latex.Output(lipics_options(context.options)) context.prepare_directory(dir, doc, latex_output, verbose) } } class LIPIcs_LuaLaTeX_Engine extends LIPIcs_Engine("lipics") class LIPIcs_PDFLaTeX_Engine extends LIPIcs_Engine("lipics_pdflatex") { override def use_pdflatex: Boolean = true } /* build documents */ def tex_name(name: Document.Node.Name): String = name.theory_base_name + ".tex" class Build_Error(val log_lines: List[String], val log_errors: List[String]) extends Exn.User_Error(Exn.cat_message(log_errors: _*)) def build_documents( context: Context, output_sources: Option[Path] = None, output_pdf: Option[Path] = None, verbose: Boolean = false ): List[Document_Output] = { val progress = context.progress val engine = context.get_engine() val documents = for (doc <- context.documents) yield { Isabelle_System.with_tmp_dir("document") { tmp_dir => progress.echo("Preparing " + context.session + "/" + doc.name + " ...") val start = Time.now() output_sources.foreach(engine.prepare_directory(context, _, doc, false)) val directory = engine.prepare_directory(context, tmp_dir, doc, verbose) val document = context.old_document(directory) getOrElse engine.build_document(context, directory, verbose) val stop = Time.now() val timing = stop - start progress.echo("Finished " + context.session + "/" + doc.name + " (" + timing.message_hms + " elapsed time)") document } } for (dir <- output_pdf; doc <- documents) { val path = doc.write(dir) progress.echo("Document at " + path.absolute) } documents } /* Isabelle tool wrapper */ val isabelle_tool = Isabelle_Tool("document", "prepare session theory document", Scala_Project.here, { args => var output_sources: Option[Path] = None var output_pdf: Option[Path] = None var verbose_latex = false var dirs: List[Path] = Nil var options = Options.init() var verbose_build = false val getopts = Getopts(""" Usage: isabelle document [OPTIONS] SESSION Options are: -O DIR output directory for LaTeX sources and resulting PDF -P DIR output directory for resulting PDF -S DIR output directory for LaTeX sources -V verbose latex -d DIR include session directory -o OPTION override Isabelle system OPTION (via NAME=VAL or NAME) -v verbose build Prepare the theory document of a session. 
""", "O:" -> (arg => { val dir = Path.explode(arg) output_sources = Some(dir) output_pdf = Some(dir) }), "P:" -> (arg => { output_pdf = Some(Path.explode(arg)) }), "S:" -> (arg => { output_sources = Some(Path.explode(arg)) }), "V" -> (_ => verbose_latex = true), "d:" -> (arg => dirs = dirs ::: List(Path.explode(arg))), "o:" -> (arg => options = options + arg), "v" -> (_ => verbose_build = true)) val more_args = getopts(args) val session = more_args match { case List(a) => a case _ => getopts.usage() } val progress = new Console_Progress(verbose = verbose_build) progress.interrupt_handler { val build_results = Build.build(options, selection = Sessions.Selection.session(session), dirs = dirs, progress = progress) if (!build_results.ok) error("Failed to build session " + quote(session)) if (output_sources.isEmpty && output_pdf.isEmpty) { progress.echo_warning("No output directory") } val session_background = Document_Build.session_background(options, session, dirs = dirs) using(Export.open_session_context(build_results.store, session_background)) { session_context => build_documents( context(session_context, progress = progress), output_sources = output_sources, output_pdf = output_pdf, verbose = verbose_latex) } } }) } diff --git a/src/Pure/Tools/server.scala b/src/Pure/Tools/server.scala --- a/src/Pure/Tools/server.scala +++ b/src/Pure/Tools/server.scala @@ -1,585 +1,585 @@ /* Title: Pure/Tools/server.scala Author: Makarius Resident Isabelle servers. Message formats: - short message (single line): NAME ARGUMENT - long message (multiple lines): BYTE_LENGTH NAME ARGUMENT Argument formats: - Unit as empty string - XML.Elem in YXML notation - JSON.T in standard notation */ package isabelle import java.io.{BufferedInputStream, BufferedOutputStream, InputStreamReader, OutputStreamWriter, IOException} import java.net.{Socket, SocketException, SocketTimeoutException, ServerSocket, InetAddress} object Server { /* message argument */ object Argument { def is_name_char(c: Char): Boolean = Symbol.is_ascii_letter(c) || Symbol.is_ascii_digit(c) || c == '_' || c == '.' def split(msg: String): (String, String) = { val name = msg.takeWhile(is_name_char) val argument = msg.substring(name.length).dropWhile(Symbol.is_ascii_blank) (name, argument) } def print(arg: Any): String = arg match { case () => "" case t: XML.Elem => YXML.string_of_tree(t) case t: JSON.T => JSON.Format(t) } def parse(argument: String): Any = if (argument == "") () else if (YXML.detect_elem(argument)) YXML.parse_elem(argument) else JSON.parse(argument, strict = false) def unapply(argument: String): Option[Any] = try { Some(parse(argument)) } catch { case ERROR(_) => None } } /* input command */ type Command_Body = PartialFunction[(Context, Any), Any] abstract class Command(val command_name: String) { def command_body: Command_Body override def toString: String = command_name } class Commands(commands: Command*) extends Isabelle_System.Service { def entries: List[Command] = commands.toList } private lazy val command_table: Map[String, Command] = Isabelle_System.make_services(classOf[Commands]).flatMap(_.entries). 


  /* input command */

  type Command_Body = PartialFunction[(Context, Any), Any]

  abstract class Command(val command_name: String) {
    def command_body: Command_Body
    override def toString: String = command_name
  }

  class Commands(commands: Command*) extends Isabelle_System.Service {
    def entries: List[Command] = commands.toList
  }

  private lazy val command_table: Map[String, Command] =
    Isabelle_System.make_services(classOf[Commands]).flatMap(_.entries).
      foldLeft(Map.empty[String, Command]) {
        case (cmds, cmd) =>
          val name = cmd.command_name
          if (cmds.isDefinedAt(name)) error("Duplicate Isabelle server command: " + quote(name))
          else cmds + (name -> cmd)
      }
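
  // Illustration only -- hypothetical command, not part of this patch: a
  // Commands service contributes entries to command_table; the body is a
  // partial function on (Context, argument).  A real service class would
  // additionally have to be registered with the Isabelle system's service
  // discovery.
  class Example_Commands extends Commands(Example_Commands.ping)
  object Example_Commands {
    object ping extends Command("ping") {
      override val command_body: Command_Body =
        { case (_, ()) => JSON.Object("pong" -> true) }
    }
  }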
" " + argument) } def reply_ok(arg: Any): Unit = reply(Reply.OK, arg) def reply_error(arg: Any): Unit = reply(Reply.ERROR, arg) def reply_error_message(message: String, more: JSON.Object.Entry*): Unit = reply_error(Reply.error_message(message) ++ more) def notify(arg: Any): Unit = reply(Reply.NOTE, arg) } /* context with output channels */ class Context private[Server](val server: Server, connection: Connection) extends AutoCloseable { context => def command_list: List[String] = command_table.keys.toList.sorted def reply(r: Reply, arg: Any): Unit = connection.reply(r, arg) def notify(arg: Any): Unit = connection.notify(arg) def message(kind: String, msg: String, more: JSON.Object.Entry*): Unit = notify(Reply.message(msg, kind = kind) ++ more) def writeln(msg: String, more: JSON.Object.Entry*): Unit = message(Markup.WRITELN, msg, more:_*) def warning(msg: String, more: JSON.Object.Entry*): Unit = message(Markup.WARNING, msg, more:_*) def error_message(msg: String, more: JSON.Object.Entry*): Unit = message(Markup.ERROR, msg, more:_*) def progress(more: JSON.Object.Entry*): Connection_Progress = new Connection_Progress(context, more:_*) override def toString: String = connection.toString /* asynchronous tasks */ private val _tasks = Synchronized(Set.empty[Task]) def make_task(body: Task => JSON.Object.T): Task = { val task = new Task(context, body) _tasks.change(_ + task) task } def remove_task(task: Task): Unit = _tasks.change(_ - task) def cancel_task(id: UUID.T): Unit = _tasks.change(tasks => { tasks.find(task => task.id == id).foreach(_.cancel()); tasks }) def close(): Unit = { while(_tasks.change_result(tasks => { tasks.foreach(_.cancel()); (tasks.nonEmpty, tasks) })) { _tasks.value.foreach(_.join()) } } } class Connection_Progress private[Server](context: Context, more: JSON.Object.Entry*) extends Progress { override def verbose: Boolean = true override def output(message: Progress.Message): Unit = { val more1 = ("verbose" -> message.verbose.toString) :: more.toList message.kind match { case Progress.Kind.writeln => context.writeln(message.text, more1:_*) case Progress.Kind.warning => context.warning(message.text, more1:_*) case Progress.Kind.error_message => context.error_message(message.text, more1:_*) } } override def theory(theory: Progress.Theory): Unit = { val entries: List[JSON.Object.Entry] = List("theory" -> theory.theory, "session" -> theory.session) ::: (theory.percentage match { case None => Nil case Some(p) => List("percentage" -> p) }) context.writeln(theory.message.text, entries ::: more.toList:_*) } override def nodes_status(nodes_status: Document_Status.Nodes_Status): Unit = { val json = for ((name, node_status) <- nodes_status.present() if !node_status.is_empty) yield name.json + ("status" -> node_status.json) context.notify(JSON.Object(Markup.KIND -> Markup.NODES_STATUS, Markup.NODES_STATUS -> json)) } override def toString: String = super.toString + ": " + context.toString } class Task private[Server](val context: Context, body: Task => JSON.Object.T) { task => val id: UUID.T = UUID.random() val ident: JSON.Object.Entry = ("task" -> id.toString) val progress: Connection_Progress = context.progress(ident) def cancel(): Unit = progress.stop() private lazy val thread = Isabelle_Thread.fork(name = "server_task") { Exn.capture { body(task) } match { case Exn.Res(res) => context.reply(Reply.FINISHED, res + ident) case Exn.Exn(exn) => val err = json_error(exn) if (err.isEmpty) throw exn else context.reply(Reply.FAILED, err + ident) } progress.stop() 


  /* server info */

  val localhost_name: String = "127.0.0.1"
  def localhost: InetAddress = InetAddress.getByName(localhost_name)

  def print_address(port: Int): String = localhost_name + ":" + port

  def print(port: Int, password: String): String =
    print_address(port) + " (password " + quote(password) + ")"

  object Info {
    private val Pattern =
      ("""server "([^"]*)" = \Q""" + localhost_name + """\E:(\d+) \(password "([^"]*)"\)""").r

    def parse(s: String): Option[Info] =
      s match {
        case Pattern(name, Value.Int(port), password) => Some(Info(name, port, password))
        case _ => None
      }

    def apply(name: String, port: Int, password: String): Info =
      new Info(name, port, password)
  }

  class Info private(val name: String, val port: Int, val password: String) {
    def address: String = print_address(port)

    override def toString: String =
      "server " + quote(name) + " = " + print(port, password)

    def connection(): Connection = {
      val connection = Connection(new Socket(localhost, port))
      connection.write_line_message(password)
      connection
    }

    def active: Boolean =
      try {
        using(connection()) { connection =>
          connection.set_timeout(Time.seconds(2.0))
          connection.read_line_message() match {
            case Some(Reply(Reply.OK, _)) => true
            case _ => false
          }
        }
      }
      catch { case _: IOException => false }
  }
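
  // Round-trip sketch (illustration only, made-up port and password):
  // Info.parse inverts the format produced by Info.toString above.
  private def info_example(): Unit = {
    val line = """server "isabelle" = 127.0.0.1:4711 (password "XYZ")"""
    assert(Info.parse(line).map(_.port) == Some(4711))
  }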


  /* per-user servers */

  val default_name = "isabelle"

  object private_data extends SQL.Data() {
    val database = Path.explode("$ISABELLE_HOME_USER/servers.db")

-    override lazy val tables = SQL.Tables(Base.table)
+    override lazy val tables: SQL.Tables = SQL.Tables(Base.table)

    object Base {
      val name = SQL.Column.string("name").make_primary_key
      val port = SQL.Column.int("port")
      val password = SQL.Column.string("password")

      val table = SQL.Table("isabelle_servers", List(name, port, password))
    }

    def list(db: SQLite.Database): List[Info] =
      if (db.exists_table(Base.table)) {
        db.execute_query_statement(Base.table.select(),
          List.from[Info],
          { res =>
            val name = res.string(Base.name)
            val port = res.int(Base.port)
            val password = res.string(Base.password)
            Info(name, port, password)
          }
        ).sortBy(_.name)
      }
      else Nil

    def find(db: SQLite.Database, name: String): Option[Info] =
      list(db).find(server_info => server_info.name == name && server_info.active)
  }

  def init(
    name: String = default_name,
    port: Int = 0,
    existing_server: Boolean = false,
    log: Logger = new Logger
  ): (Info, Option[Server]) = {
    using(SQLite.open_database(private_data.database, restrict = true)) { db =>
      private_data.transaction_lock(db, create = true) {
        private_data.list(db).filterNot(_.active).foreach(server_info =>
          db.execute_statement(
            private_data.Base.table.delete(
              sql = private_data.Base.name.where_equal(server_info.name))))
      }
      private_data.transaction_lock(db) {
        private_data.find(db, name) match {
          case Some(server_info) => (server_info, None)
          case None =>
            if (existing_server) error("Isabelle server " + quote(name) + " not running")

            val server = new Server(port, log)
            val server_info = Info(name, server.port, server.password)

            db.execute_statement(
              private_data.Base.table.delete(sql = private_data.Base.name.where_equal(name)))
            db.execute_statement(private_data.Base.table.insert(), body =
              { stmt =>
                stmt.string(1) = server_info.name
                stmt.int(2) = server_info.port
                stmt.string(3) = server_info.password
              })

            server.start()
            (server_info, Some(server))
        }
      }
    }
  }

  def exit(name: String = default_name): Boolean = {
    using(SQLite.open_database(private_data.database)) { db =>
      private_data.transaction_lock(db) {
        private_data.find(db, name) match {
          case Some(server_info) =>
            using(server_info.connection())(_.write_line_message("shutdown"))
            while (server_info.active) { Time.seconds(0.05).sleep() }
            true
          case None => false
        }
      }
    }
  }
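
  // Illustration only (not part of this patch): programmatic use of init/exit,
  // mirroring what the tool wrapper below does.
  private def init_exit_example(): Unit = {
    val (info, server) = init(name = "example")
    Output.writeln(info.toString)  // server "example" = 127.0.0.1:PORT (password "...")
    exit("example")                // sends "shutdown"; true if the server was running
    server.foreach(_.join())
  }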


  /* Isabelle tool wrapper */

  val isabelle_tool =
    Isabelle_Tool("server", "manage resident Isabelle servers", Scala_Project.here,
      { args =>
        var console = false
        var log_file: Option[Path] = None
        var operation_list = false
        var operation_exit = false
        var name = default_name
        var port = 0
        var existing_server = false

        val getopts = Getopts(
          """
Usage: isabelle server [OPTIONS]

  Options are:
    -L FILE      logging on FILE
    -c           console interaction with specified server
    -l           list servers (alternative operation)
    -n NAME      explicit server name (default: """ + default_name + """)
    -p PORT      explicit server port
    -s           assume existing server, no implicit startup
    -x           exit specified server (alternative operation)

  Manage resident Isabelle servers.
""",
          "L:" -> (arg => log_file = Some(Path.explode(File.standard_path(arg)))),
          "c" -> (_ => console = true),
          "l" -> (_ => operation_list = true),
          "n:" -> (arg => name = arg),
          "p:" -> (arg => port = Value.Int.parse(arg)),
          "s" -> (_ => existing_server = true),
          "x" -> (_ => operation_exit = true))

        val more_args = getopts(args)
        if (more_args.nonEmpty) getopts.usage()

        if (operation_list) {
          for {
            server_info <- using(SQLite.open_database(private_data.database))(private_data.list)
            if server_info.active
          } Output.writeln(server_info.toString, stdout = true)
        }
        else if (operation_exit) {
          val ok = Server.exit(name)
          sys.exit(if (ok) Process_Result.RC.ok else Process_Result.RC.failure)
        }
        else {
          val log = Logger.make_file(log_file)
          val (server_info, server) =
            init(name, port = port, existing_server = existing_server, log = log)
          Output.writeln(server_info.toString, stdout = true)

          if (console) {
            using(server_info.connection())(connection => connection.tty_loop().join())
          }
          server.foreach(_.join())
        }
      })
}

class Server private(port0: Int, val log: Logger) extends Server.Handler(port0) {
  server =>

  private val _sessions = Synchronized(Map.empty[UUID.T, Headless.Session])

  def err_session(id: UUID.T): Nothing =
    error("No session " + Library.single_quote(id.toString))

  def the_session(id: UUID.T): Headless.Session =
    _sessions.value.getOrElse(id, err_session(id))

  def add_session(entry: (UUID.T, Headless.Session)): Unit =
    _sessions.change(_ + entry)

  def remove_session(id: UUID.T): Headless.Session = {
    _sessions.change_result(sessions =>
      sessions.get(id) match {
        case Some(session) => (session, sessions - id)
        case None => err_session(id)
      })
  }

  def shutdown(): Unit = {
    server.socket.close()

    val sessions = _sessions.change_result(sessions => (sessions, Map.empty))
    for ((_, session) <- sessions) {
      try {
        val result = session.stop()
        if (!result.ok) log("Session shutdown failed: " + result.print_rc)
      }
      catch { case ERROR(msg) => log("Session shutdown failed: " + msg) }
    }
  }

  override def join(): Unit = { super.join(); shutdown() }

  override def handle(connection: Server.Connection): Unit = {
    using(new Server.Context(server, connection)) { context =>
      connection.reply_ok(
        JSON.Object(
          "isabelle_id" -> Isabelle_System.isabelle_id(),
          "isabelle_name" -> Isabelle_System.isabelle_name()))

      var finished = false
      while (!finished) {
        connection.read_line_message() match {
          case None => finished = true
          case Some("") => context.notify("Command 'help' provides list of commands")
          case Some(msg) =>
            val (name, argument) = Server.Argument.split(msg)
            Server.command_table.get(name) match {
              case Some(cmd) =>
                argument match {
                  case Server.Argument(arg) =>
                    if (cmd.command_body.isDefinedAt((context, arg))) {
                      Exn.capture { cmd.command_body((context, arg)) } match {
                        case Exn.Res(task: Server.Task) =>
                          connection.reply_ok(JSON.Object(task.ident))
                          task.start()
                        case Exn.Res(res) => connection.reply_ok(res)
                        case Exn.Exn(exn) =>
                          val err = Server.json_error(exn)
                          if (err.isEmpty) throw exn else connection.reply_error(err)
                      }
                    }
                    else {
                      connection.reply_error_message(
                        "Bad argument for command " + Library.single_quote(name),
                        "argument" -> argument)
                    }
                  case _ =>
                    connection.reply_error_message(
                      "Malformed argument for command " + Library.single_quote(name),
                      "argument" -> argument)
                }
              case None => connection.reply_error("Bad command " + Library.single_quote(name))
            }
        }
      }
    }
  }
}
diff --git a/src/Pure/library.scala b/src/Pure/library.scala
--- a/src/Pure/library.scala
+++ b/src/Pure/library.scala
@@ -1,326 +1,351 @@
/*  Title:      Pure/library.scala
    Author:     Makarius

Basic library.
*/

package isabelle

import scala.annotation.tailrec
import scala.collection.mutable
import scala.util.matching.Regex


object Library {
  /* resource management */

  def using[A <: AutoCloseable, B](a: A)(f: A => B): B = {
    try { f(a) }
    finally { if (a != null) a.close() }
  }

  def using_option[A <: AutoCloseable, B](opt: Option[A])(f: A => B): Option[B] =
    opt.map(a => using(a)(f))

  def using_optional[A <: AutoCloseable, B](opt: Option[A])(f: Option[A] => B): B = {
    try { f(opt) }
    finally {
      opt match {
        case Some(a) if a != null => a.close()
        case _ =>
      }
    }
  }


  /* integers */

  private val small_int = 10000
  private lazy val small_int_table = {
    val array = new Array[String](small_int)
    for (i <- 0 until small_int) array(i) = i.toString
    array
  }

  def is_small_int(s: String): Boolean = {
    val len = s.length
    1 <= len && len <= 4 &&
    s.forall(c => '0' <= c && c <= '9') &&
    (len == 1 || s(0) != '0')
  }

  def signed_string_of_long(i: Long): String =
    if (0 <= i && i < small_int) small_int_table(i.toInt)
    else i.toString

  def signed_string_of_int(i: Int): String =
    if (0 <= i && i < small_int) small_int_table(i)
    else i.toString


  /* separated chunks */

  def separate[A](s: A, list: List[A]): List[A] = {
    val result = new mutable.ListBuffer[A]
    var first = true
    for (x <- list) {
      if (first) {
        first = false
        result += x
      }
      else {
        result += s
        result += x
      }
    }
    result.toList
  }

  def separated_chunks(sep: Char => Boolean, source: CharSequence): Iterator[CharSequence] =
    new Iterator[CharSequence] {
      private val end = source.length
      private def next_chunk(i: Int): Option[(CharSequence, Int)] = {
        if (i < end) {
          var j = i
          while ({
            j += 1
            j < end && !sep(source.charAt(j))
          }) ()
          Some((source.subSequence(i + 1, j), j))
        }
        else None
      }
      private var state: Option[(CharSequence, Int)] = if (end == 0) None else next_chunk(-1)

      def hasNext: Boolean = state.isDefined
      def next(): CharSequence =
        state match {
          case Some((s, i)) => state = next_chunk(i); s
          case None => Iterator.empty.next()
        }
    }

  def space_explode(sep: Char, str: String): List[String] =
    separated_chunks(_ == sep, str).map(_.toString).toList
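
  // Behavior sketch (illustration only; results checked against the
  // definitions above): empty chunks between and after separators are kept,
  // and the empty string yields no chunks at all.
  private def space_explode_examples(): Unit = {
    assert(space_explode(':', "a:b") == List("a", "b"))
    assert(space_explode(':', "a::b:") == List("a", "", "b", ""))
    assert(space_explode(':', "") == Nil)
  }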


  /* lines */

  def count_newlines(str: String): Int = str.count(_ == '\n')

  def terminate_lines(lines: IterableOnce[String]): String = {
    val it = lines.iterator
    if (it.isEmpty) "" else it.mkString("", "\n", "\n")
  }

  def cat_lines(lines: IterableOnce[String]): String =
    lines.iterator.mkString("\n")

  def split_lines(str: String): List[String] = space_explode('\n', str)

  def prefix_lines(prfx: String, str: String): String =
    isabelle.setup.Library.prefix_lines(prfx, str)

  def indent_lines(n: Int, str: String): String =
    prefix_lines(Symbol.spaces(n), str)

  def first_line(source: CharSequence): String = {
    val lines = separated_chunks(_ == '\n', source)
    if (lines.hasNext) lines.next().toString
    else ""
  }

  def trim_line(s: String): String =
    isabelle.setup.Library.trim_line(s)

  def trim_split_lines(s: String): List[String] =
    split_lines(trim_line(s)).map(trim_line)

  def encode_lines(s: String): String = s.replace('\n', '\u000b')
  def decode_lines(s: String): String = s.replace('\u000b', '\n')


  /* strings */

  def make_string(f: StringBuilder => Unit, capacity: Int = 16): String = {
    val s = new StringBuilder(capacity)
    f(s)
    s.toString
  }

  def try_unprefix(prfx: String, s: String): Option[String] =
    if (s.startsWith(prfx)) Some(s.substring(prfx.length)) else None

  def try_unsuffix(sffx: String, s: String): Option[String] =
    if (s.endsWith(sffx)) Some(s.substring(0, s.length - sffx.length)) else None

  def perhaps_unprefix(prfx: String, s: String): String = try_unprefix(prfx, s) getOrElse s
  def perhaps_unsuffix(sffx: String, s: String): String = try_unsuffix(sffx, s) getOrElse s

  def isolate_substring(s: String): String = new String(s.toCharArray)

  def strip_ansi_color(s: String): String =
    s.replaceAll("\u001b\\[\\d+m", "")


  /* quote */

  def single_quote(s: String): String = "'" + s + "'"

  def quote(s: String): String = "\"" + s + "\""

  def try_unquote(s: String): Option[String] =
    if (s.startsWith("\"") && s.endsWith("\"")) Some(s.substring(1, s.length - 1))
    else None

  def perhaps_unquote(s: String): String = try_unquote(s) getOrElse s

  def commas(ss: Iterable[String]): String = ss.iterator.mkString(", ")
  def commas_quote(ss: Iterable[String]): String = ss.iterator.map(quote).mkString(", ")


  /* CharSequence */

  class Reverse(text: CharSequence, start: Int, end: Int) extends CharSequence {
    require(0 <= start && start <= end && end <= text.length, "bad reverse range")

    def this(text: CharSequence) = this(text, 0, text.length)

    def length: Int = end - start
    def charAt(i: Int): Char = text.charAt(end - i - 1)

    def subSequence(i: Int, j: Int): CharSequence =
      if (0 <= i && i <= j && j <= length) new Reverse(text, end - j, end - i)
      else throw new IndexOutOfBoundsException

    override def toString: String = {
      val buf = new StringBuilder(length)
      for (i <- 0 until length) buf.append(charAt(i))
      buf.toString
    }
  }

  class Line_Termination(text: CharSequence) extends CharSequence {
    def length: Int = text.length + 1
    def charAt(i: Int): Char = if (i == text.length) '\n' else text.charAt(i)
    def subSequence(i: Int, j: Int): CharSequence =
      if (j == text.length + 1) new Line_Termination(text.subSequence(i, j - 1))
      else text.subSequence(i, j)
    override def toString: String = text.toString + "\n"
  }


  /* regular expressions */

  def make_regex(s: String): Option[Regex] =
    try { Some(new Regex(s)) } catch { case ERROR(_) => None }

  def is_regex_meta(c: Char): Boolean = """()[]{}\^$|?*+.<>-=!""".contains(c)

  def escape_regex(s: String): String =
    if (s.exists(is_regex_meta)) {
      (for (c <- s.iterator)
        yield { if (is_regex_meta(c)) "\\" + c.toString else c.toString }).mkString
    }
    else s


  /* lists */

  def take_prefix[A](pred: A => Boolean, xs: List[A]): (List[A], List[A]) =
    (xs.takeWhile(pred), xs.dropWhile(pred))

  def take_suffix[A](pred: A => Boolean, xs: List[A]): (List[A], List[A]) = {
    val rev_xs = xs.reverse
    (rev_xs.dropWhile(pred).reverse, rev_xs.takeWhile(pred).reverse)
  }

  def trim[A](pred: A => Boolean, xs: List[A]): List[A] =
    take_suffix(pred, take_prefix(pred, xs)._2)._1

  def member[A, B](xs: List[A])(x: B): Boolean = xs.contains(x)
  def insert[A](x: A)(xs: List[A]): List[A] = if (xs.contains(x)) xs else x :: xs
  def remove[A, B](x: B)(xs: List[A]): List[A] = if (member(xs)(x)) xs.filterNot(_ == x) else xs
  def update[A](x: A)(xs: List[A]): List[A] = x :: remove(x)(xs)

  def merge[A](xs: List[A], ys: List[A]): List[A] =
    if (xs.eq(ys)) xs
    else if (xs.isEmpty) ys
    else ys.foldRight(xs)(Library.insert(_)(_))

  def distinct[A](xs: List[A], eq: (A, A) => Boolean = (x: A, y: A) => x == y): List[A] = {
    val result = new mutable.ListBuffer[A]
    xs.foreach(x => if (!result.exists(y => eq(x, y))) result += x)
    result.toList
  }

  def duplicates[A](lst: List[A], eq: (A, A) => Boolean = (x: A, y: A) => x == y): List[A] = {
    val result = new mutable.ListBuffer[A]
    @tailrec def dups(rest: List[A]): Unit =
      rest match {
        case Nil =>
        case x :: xs =>
          if (!result.exists(y => eq(x, y)) && xs.exists(y => eq(x, y))) result += x
          dups(xs)
      }
    dups(lst)
    result.toList
  }

  def replicate[A](n: Int, a: A): List[A] =
    if (n < 0) throw new IllegalArgumentException
    else if (n == 0) Nil
    else {
      val res = new mutable.ListBuffer[A]
      (1 to n).foreach(_ => res += a)
      res.toList
    }

  def the_single[A](xs: List[A], message: => String = "Single argument expected"): A =
    xs match {
      case List(x) => x
      case _ => error(message)
    }

-  def symmetric_difference[A](xs: List[A], ys: List[A]): (List[A], List[A]) =
-    (xs.filterNot(ys.toSet), ys.filterNot(xs.toSet))
+
+  /* data update */
+
+  object Update {
+    type Data[A] = Map[String, A]
+
+    val empty: Update = Update()
+
+    def make[A](a: Data[A], b: Data[A], kind: Int = 0): Update =
+      if (a eq b) empty
+      else {
+        val delete = List.from(for ((x, y) <- a.iterator if !b.get(x).contains(y)) yield x)
+        val insert = List.from(for ((x, y) <- b.iterator if !a.get(x).contains(y)) yield x)
+        Update(delete = delete, insert = insert, kind = kind)
+      }
+  }
+
+  sealed case class Update(
+    delete: List[String] = Nil,
+    insert: List[String] = Nil,
+    kind: Int = 0
+  ) {
+    def deletes: Boolean = delete.nonEmpty
+    def inserts: Boolean = insert.nonEmpty
+    def defined: Boolean = deletes || inserts
+    lazy val domain: Set[String] = delete.toSet ++ insert
+  }
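
  // Behavior sketch for the new Update.make (illustration only, made-up data):
  // an entry whose value changed appears in both delete and insert, i.e. an
  // update is a deletion followed by an insertion.
  private def update_example(): Unit = {
    val a = Map("x" -> 1, "y" -> 2)
    val b = Map("y" -> 3, "z" -> 4)
    val upd = Update.make(a, b)
    assert(upd.delete.toSet == Set("x", "y"))  // "x" removed, "y" changed
    assert(upd.insert.toSet == Set("y", "z"))  // "y" changed, "z" added
    assert(upd.domain == Set("x", "y", "z"))
  }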


  /* proper values */

  def proper_bool(b: Boolean): Option[Boolean] = if (!b) None else Some(b)

  def proper_string(s: String): Option[String] =
    if (s == null || s == "") None else Some(s)

  def proper_list[A](list: List[A]): Option[List[A]] =
    if (list == null || list.isEmpty) None else Some(list)

  def if_proper[A](x: Iterable[A], body: => String): String =
    if (x == null || x.isEmpty) "" else body

  def if_proper(b: Boolean, body: => String): String =
    if (!b) "" else body


  /* reflection */

  def is_subclass[A, B](a: Class[A], b: Class[B]): Boolean = {
    import scala.language.existentials
    @tailrec def subclass(c: Class[_]): Boolean = {
      c == b || { val d = c.getSuperclass; d != null && subclass(d) }
    }
    subclass(a)
  }

  def as_subclass[C](c: Class[C])(x: AnyRef): Option[C] =
    if (x == null || is_subclass(x.getClass, c)) Some(x.asInstanceOf[C]) else None


  /* named items */

  trait Named { def name: String }
}
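
// Behavior sketch (illustration only): the "proper" helpers map empty or null
// values to None resp. "":
//   Library.proper_string("")          == None
//   Library.proper_string("x")         == Some("x")
//   Library.if_proper(Nil, "text")     == ""
//   Library.if_proper(List(1), "text") == "text"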