diff --git a/src/Pure/Admin/build_log.scala b/src/Pure/Admin/build_log.scala --- a/src/Pure/Admin/build_log.scala +++ b/src/Pure/Admin/build_log.scala @@ -1,1117 +1,1117 @@ /* Title: Pure/Admin/build_log.scala Author: Makarius Management of build log files and database storage. */ package isabelle import java.io.{File => JFile} import java.time.format.{DateTimeFormatter, DateTimeParseException} import java.util.Locale import scala.collection.immutable.SortedMap import scala.collection.mutable import scala.util.matching.Regex object Build_Log { /** content **/ /* properties */ object Prop { val build_tags = SQL.Column.string("build_tags") // lines val build_args = SQL.Column.string("build_args") // lines val build_group_id = SQL.Column.string("build_group_id") val build_id = SQL.Column.string("build_id") val build_engine = SQL.Column.string("build_engine") val build_host = SQL.Column.string("build_host") val build_start = SQL.Column.date("build_start") val build_end = SQL.Column.date("build_end") val isabelle_version = SQL.Column.string("isabelle_version") val afp_version = SQL.Column.string("afp_version") val all_props: List[SQL.Column] = List(build_tags, build_args, build_group_id, build_id, build_engine, build_host, build_start, build_end, isabelle_version, afp_version) } /* settings */ object Settings { val ISABELLE_BUILD_OPTIONS = SQL.Column.string("ISABELLE_BUILD_OPTIONS") val ML_PLATFORM = SQL.Column.string("ML_PLATFORM") val ML_HOME = SQL.Column.string("ML_HOME") val ML_SYSTEM = SQL.Column.string("ML_SYSTEM") val ML_OPTIONS = SQL.Column.string("ML_OPTIONS") val ml_settings = List(ML_PLATFORM, ML_HOME, ML_SYSTEM, ML_OPTIONS) val all_settings = ISABELLE_BUILD_OPTIONS :: ml_settings type Entry = (String, String) type T = List[Entry] object Entry { def unapply(s: String): Option[Entry] = for { (a, b) <- Properties.Eq.unapply(s) } yield (a, Library.perhaps_unquote(b)) def getenv(a: String): String = Properties.Eq(a, quote(Isabelle_System.getenv(a))) } def show(): String = cat_lines( List(Entry.getenv("ISABELLE_TOOL_JAVA_OPTIONS"), Entry.getenv(ISABELLE_BUILD_OPTIONS.name), "") ::: ml_settings.map(c => Entry.getenv(c.name))) } /* file names */ def log_date(date: Date): String = String.format(Locale.ROOT, "%s.%05d", DateTimeFormatter.ofPattern("yyyy-MM-dd").format(date.rep), java.lang.Long.valueOf((date.time - date.midnight.time).ms / 1000)) def log_subdir(date: Date): Path = Path.explode("log") + Path.explode(date.rep.getYear.toString) def log_filename(engine: String, date: Date, more: List[String] = Nil): Path = Path.explode((engine :: log_date(date) :: more).mkString("", "_", ".log")) /** log file **/ def print_date(date: Date): String = Log_File.Date_Format(date) object Log_File { /* log file */ def plain_name(name: String): String = { List(".log", ".log.gz", ".log.xz", ".gz", ".xz").find(name.endsWith) match { case Some(s) => Library.try_unsuffix(s, name).get case None => name } } def apply(name: String, lines: List[String]): Log_File = new Log_File(plain_name(name), lines.map(Library.trim_line)) def apply(name: String, text: String): Log_File = new Log_File(plain_name(name), Library.trim_split_lines(text)) def apply(file: JFile): Log_File = { val name = file.getName val text = if (File.is_gz(name)) File.read_gzip(file) else if (File.is_xz(name)) File.read_xz(file) else File.read(file) apply(name, text) } def apply(path: Path): Log_File = apply(path.file) /* log file collections */ def is_log(file: JFile, prefixes: List[String] = List(Build_History.log_prefix, Identify.log_prefix, Identify.log_prefix2, Isatest.log_prefix, AFP_Test.log_prefix), suffixes: List[String] = List(".log", ".log.gz", ".log.xz") ): Boolean = { val name = file.getName prefixes.exists(name.startsWith) && suffixes.exists(name.endsWith) && name != "isatest.log" && name != "afp-test.log" && name != "main.log" } /* date format */ val Date_Format = { val fmts = Date.Formatter.variants( List("EEE MMM d HH:mm:ss O yyyy", "EEE MMM d HH:mm:ss VV yyyy"), List(Locale.ENGLISH, Locale.GERMAN)) ::: List( DateTimeFormatter.RFC_1123_DATE_TIME, Date.Formatter.pattern("EEE MMM d HH:mm:ss yyyy").withZone(Date.timezone_berlin)) def tune_timezone(s: String): String = s match { case "CET" | "MET" => "GMT+1" case "CEST" | "MEST" => "GMT+2" case "EST" => "Europe/Berlin" case _ => s } def tune_weekday(s: String): String = s match { case "Die" => "Di" case "Mit" => "Mi" case "Don" => "Do" case "Fre" => "Fr" case "Sam" => "Sa" case "Son" => "So" case _ => s } def tune(s: String): String = Word.implode( Word.explode(s) match { case a :: "M\uFFFDr" :: bs => tune_weekday(a) :: "Mär" :: bs.map(tune_timezone) case a :: bs => tune_weekday(a) :: bs.map(tune_timezone) case Nil => Nil } ) Date.Format.make(fmts, tune) } } class Log_File private(val name: String, val lines: List[String]) { log_file => override def toString: String = name def text: String = cat_lines(lines) def err(msg: String): Nothing = error("Error in log file " + quote(name) + ": " + msg) /* date format */ object Strict_Date { def unapply(s: String): Some[Date] = try { Some(Log_File.Date_Format.parse(s)) } catch { case exn: DateTimeParseException => log_file.err(exn.getMessage) } } /* inlined text */ def filter(Marker: Protocol_Message.Marker): List[String] = for (Marker(text) <- lines) yield text def find(Marker: Protocol_Message.Marker): Option[String] = lines.collectFirst({ case Marker(text) => text }) def find_match(regexes: List[Regex]): Option[String] = regexes match { case Nil => None case regex :: rest => lines.iterator.map(regex.unapplySeq(_)).find(res => res.isDefined && res.get.length == 1). map(res => res.get.head) orElse find_match(rest) } /* settings */ def get_setting(name: String): Option[Settings.Entry] = lines.collectFirst({ case Settings.Entry(a, b) if a == name => a -> b }) def get_all_settings: Settings.T = for { c <- Settings.all_settings; entry <- get_setting(c.name) } yield entry /* properties (YXML) */ val cache: XML.Cache = XML.Cache.make() def parse_props(text: String): Properties.T = try { cache.props(XML.Decode.properties(YXML.parse_body(text))) } catch { case _: XML.Error => log_file.err("malformed properties") } def filter_props(marker: Protocol_Message.Marker): List[Properties.T] = for (text <- filter(marker) if YXML.detect(text)) yield parse_props(text) def find_props(marker: Protocol_Message.Marker): Option[Properties.T] = for (text <- find(marker) if YXML.detect(text)) yield parse_props(text) /* parse various formats */ def parse_meta_info(): Meta_Info = Build_Log.parse_meta_info(log_file) def parse_build_info(ml_statistics: Boolean = false): Build_Info = Build_Log.parse_build_info(log_file, ml_statistics) def parse_session_info( command_timings: Boolean = false, theory_timings: Boolean = false, ml_statistics: Boolean = false, task_statistics: Boolean = false): Session_Info = Build_Log.parse_session_info( log_file, command_timings, theory_timings, ml_statistics, task_statistics) } /** digested meta info: produced by Admin/build_other in log.xz file **/ object Meta_Info { val empty: Meta_Info = Meta_Info(Nil, Nil) } sealed case class Meta_Info(props: Properties.T, settings: Settings.T) { def is_empty: Boolean = props.isEmpty && settings.isEmpty def get(c: SQL.Column): Option[String] = Properties.get(props, c.name) orElse Properties.get(settings, c.name) def get_date(c: SQL.Column): Option[Date] = get(c).map(Log_File.Date_Format.parse) } object Identify { val log_prefix = "isabelle_identify_" val log_prefix2 = "plain_identify_" def engine(log_file: Log_File): String = if (log_file.name.startsWith(log_prefix2)) "plain_identify" else "identify" def content(date: Date, isabelle_version: Option[String], afp_version: Option[String]): String = terminate_lines( List("isabelle_identify: " + Build_Log.print_date(date), "") ::: isabelle_version.map("Isabelle version: " + _).toList ::: afp_version.map("AFP version: " + _).toList) val Start = new Regex("""^isabelle_identify: (.+)$""") val No_End = new Regex("""$.""") val Isabelle_Version = List(new Regex("""^Isabelle version: (\S+)$""")) val AFP_Version = List(new Regex("""^AFP version: (\S+)$""")) } object Isatest { val log_prefix = "isatest-makeall-" val engine = "isatest" val Start = new Regex("""^------------------- starting test --- (.+) --- (.+)$""") val End = new Regex("""^------------------- test (?:successful|FAILED) --- (.+) --- .*$""") val Isabelle_Version = List(new Regex("""^Isabelle version: (\S+)$""")) } object AFP_Test { val log_prefix = "afp-test-devel-" val engine = "afp-test" val Start = new Regex("""^Start test(?: for .+)? at ([^,]+), (.*)$""") val Start_Old = new Regex("""^Start test(?: for .+)? at ([^,]+)$""") val End = new Regex("""^End test on (.+), .+, elapsed time:.*$""") val Isabelle_Version = List(new Regex("""^Isabelle version: .* -- hg id (\S+)$""")) val AFP_Version = List(new Regex("""^AFP version: .* -- hg id (\S+)$""")) val Bad_Init = new Regex("""^cp:.*: Disc quota exceeded$""") } object Jenkins { val log_prefix = "jenkins_" val engine = "jenkins" val Host = new Regex("""^Building remotely on (\S+) \((\S+)\).*$""") val Start = new Regex("""^(?:Started by an SCM change|Started from command line by admin|).*$""") val Start_Date = new Regex("""^Build started at (.+)$""") val No_End = new Regex("""$.""") val Isabelle_Version = List(new Regex("""^(?:Build for Isabelle id|Isabelle id) (\w+).*$"""), new Regex("""^ISABELLE_CI_REPO_ID="(\w+)".*$"""), new Regex("""^(\w{12}) tip.*$""")) val AFP_Version = List(new Regex("""^(?:Build for AFP id|AFP id) (\w+).*$"""), new Regex("""^ISABELLE_CI_AFP_ID="(\w+)".*$""")) val CONFIGURATION = "=== CONFIGURATION ===" val BUILD = "=== BUILD ===" } private def parse_meta_info(log_file: Log_File): Meta_Info = { def parse(engine: String, host: String, start: Date, End: Regex, Isabelle_Version: List[Regex], AFP_Version: List[Regex] ): Meta_Info = { val build_id = { val prefix = proper_string(host) orElse proper_string(engine) getOrElse "build" prefix + ":" + start.time.ms } val build_engine = if (engine == "") Nil else List(Prop.build_engine.name -> engine) val build_host = if (host == "") Nil else List(Prop.build_host.name -> host) val start_date = List(Prop.build_start.name -> print_date(start)) val end_date = log_file.lines.last match { case End(log_file.Strict_Date(end_date)) => List(Prop.build_end.name -> print_date(end_date)) case _ => Nil } val isabelle_version = log_file.find_match(Isabelle_Version).map(Prop.isabelle_version.name -> _) val afp_version = log_file.find_match(AFP_Version).map(Prop.afp_version.name -> _) Meta_Info((Prop.build_id.name -> build_id) :: build_engine ::: build_host ::: start_date ::: end_date ::: isabelle_version.toList ::: afp_version.toList, log_file.get_all_settings) } log_file.lines match { case line :: _ if Protocol.Meta_Info_Marker.test_yxml(line) => Meta_Info(log_file.find_props(Protocol.Meta_Info_Marker).get, log_file.get_all_settings) case Identify.Start(log_file.Strict_Date(start)) :: _ => parse(Identify.engine(log_file), "", start, Identify.No_End, Identify.Isabelle_Version, Identify.AFP_Version) case Isatest.Start(log_file.Strict_Date(start), host) :: _ => parse(Isatest.engine, host, start, Isatest.End, Isatest.Isabelle_Version, Nil) case AFP_Test.Start(log_file.Strict_Date(start), host) :: _ => parse(AFP_Test.engine, host, start, AFP_Test.End, AFP_Test.Isabelle_Version, AFP_Test.AFP_Version) case AFP_Test.Start_Old(log_file.Strict_Date(start)) :: _ => parse(AFP_Test.engine, "", start, AFP_Test.End, AFP_Test.Isabelle_Version, AFP_Test.AFP_Version) case line :: _ if line.startsWith("\u0000") => Meta_Info.empty case List(Isatest.End(_)) => Meta_Info.empty case _ :: AFP_Test.Bad_Init() :: _ => Meta_Info.empty case Nil => Meta_Info.empty case _ => log_file.err("cannot detect log file format") } } /** build info: toplevel output of isabelle build or Admin/build_other **/ val SESSION_NAME = "session_name" object Session_Status extends Enumeration { val existing, finished, failed, cancelled = Value } sealed case class Session_Entry( chapter: String = "", groups: List[String] = Nil, threads: Option[Int] = None, timing: Timing = Timing.zero, ml_timing: Timing = Timing.zero, sources: Option[String] = None, heap_size: Option[Space] = None, status: Option[Session_Status.Value] = None, errors: List[String] = Nil, theory_timings: Map[String, Timing] = Map.empty, ml_statistics: List[Properties.T] = Nil ) { def proper_groups: Option[String] = if (groups.isEmpty) None else Some(cat_lines(groups)) def finished: Boolean = status == Some(Session_Status.finished) def failed: Boolean = status == Some(Session_Status.failed) } object Build_Info { val sessions_dummy: Map[String, Session_Entry] = Map("" -> Session_Entry(theory_timings = Map("" -> Timing.zero))) } sealed case class Build_Info(sessions: Map[String, Session_Entry]) { def finished_sessions: List[String] = for ((a, b) <- sessions.toList if b.finished) yield a def failed_sessions: List[String] = for ((a, b) <- sessions.toList if b.failed) yield a } private def parse_build_info(log_file: Log_File, parse_ml_statistics: Boolean): Build_Info = { object Chapter_Name { def unapply(s: String): Some[(String, String)] = space_explode('/', s) match { case List(chapter, name) => Some((chapter, name)) case _ => Some(("", s)) } } val Session_No_Groups = new Regex("""^Session (\S+)$""") val Session_Groups = new Regex("""^Session (\S+) \((.*)\)$""") val Session_Finished1 = new Regex("""^Finished (\S+) \((\d+):(\d+):(\d+) elapsed time, (\d+):(\d+):(\d+) cpu time.*$""") val Session_Finished2 = new Regex("""^Finished ([^\s/]+) \((\d+):(\d+):(\d+) elapsed time.*$""") val Session_Timing = new Regex("""^Timing (\S+) \((\d+) threads, (\d+\.\d+)s elapsed time, (\d+\.\d+)s cpu time, (\d+\.\d+)s GC time.*$""") val Session_Started1 = new Regex("""^(?:Running|Building) (\S+) \.\.\.$""") val Session_Started2 = new Regex("""^(?:Running|Building) (\S+) on \S+ \.\.\.$""") val Sources = new Regex("""^Sources (\S+) (\S{""" + SHA1.digest_length + """})$""") val Heap = new Regex("""^Heap (\S+) \((\d+) bytes\)$""") object Theory_Timing { def unapply(line: String): Option[(String, (String, Timing))] = Protocol.Theory_Timing_Marker.unapply(line.replace('~', '-')).map(log_file.parse_props) match { case Some((SESSION_NAME, session) :: props) => for (theory <- Markup.Name.unapply(props)) yield (session, theory -> Markup.Timing_Properties.get(props)) case _ => None } } var chapter = Map.empty[String, String] var groups = Map.empty[String, List[String]] var threads = Map.empty[String, Int] var timing = Map.empty[String, Timing] var ml_timing = Map.empty[String, Timing] var started = Set.empty[String] var sources = Map.empty[String, String] var heap_sizes = Map.empty[String, Space] var theory_timings = Map.empty[String, Map[String, Timing]] var ml_statistics = Map.empty[String, List[Properties.T]] var errors = Map.empty[String, List[String]] def all_sessions: Set[String] = chapter.keySet ++ groups.keySet ++ threads.keySet ++ timing.keySet ++ ml_timing.keySet ++ started ++ sources.keySet ++ heap_sizes.keySet ++ theory_timings.keySet ++ ml_statistics.keySet for (line <- log_file.lines) { line match { case Session_No_Groups(Chapter_Name(chapt, name)) => chapter += (name -> chapt) groups += (name -> Nil) case Session_Groups(Chapter_Name(chapt, name), grps) => chapter += (name -> chapt) groups += (name -> Word.explode(grps)) case Session_Started1(name) => started += name case Session_Started2(name) => started += name case Session_Finished1(name, Value.Int(e1), Value.Int(e2), Value.Int(e3), Value.Int(c1), Value.Int(c2), Value.Int(c3)) => val elapsed = Time.hms(e1, e2, e3) val cpu = Time.hms(c1, c2, c3) timing += (name -> Timing(elapsed, cpu, Time.zero)) case Session_Finished2(name, Value.Int(e1), Value.Int(e2), Value.Int(e3)) => val elapsed = Time.hms(e1, e2, e3) timing += (name -> Timing(elapsed, Time.zero, Time.zero)) case Session_Timing(name, Value.Int(t), Value.Double(e), Value.Double(c), Value.Double(g)) => val elapsed = Time.seconds(e) val cpu = Time.seconds(c) val gc = Time.seconds(g) ml_timing += (name -> Timing(elapsed, cpu, gc)) threads += (name -> t) case Sources(name, s) => sources += (name -> s) case Heap(name, Value.Long(size)) => heap_sizes += (name -> Space.bytes(size)) case _ if Protocol.Theory_Timing_Marker.test_yxml(line) => line match { case Theory_Timing(name, theory_timing) => theory_timings += (name -> (theory_timings.getOrElse(name, Map.empty) + theory_timing)) case _ => log_file.err("malformed theory_timing " + quote(line)) } case _ if parse_ml_statistics && Protocol.ML_Statistics_Marker.test_yxml(line) => Protocol.ML_Statistics_Marker.unapply(line).map(log_file.parse_props) match { case Some((SESSION_NAME, name) :: props) => ml_statistics += (name -> (props :: ml_statistics.getOrElse(name, Nil))) case _ => log_file.err("malformed ML_statistics " + quote(line)) } case _ if Protocol.Error_Message_Marker.test_yxml(line) => Protocol.Error_Message_Marker.unapply(line).map(log_file.parse_props) match { case Some(List((SESSION_NAME, name), (Markup.CONTENT, msg))) => errors += (name -> (msg :: errors.getOrElse(name, Nil))) case _ => log_file.err("malformed error message " + quote(line)) } case _ => } } val sessions = Map( (for (name <- all_sessions.toList) yield { val status = if (timing.isDefinedAt(name) || ml_timing.isDefinedAt(name)) Session_Status.finished else if (started(name)) Session_Status.failed else Session_Status.existing val entry = Session_Entry( chapter = chapter.getOrElse(name, ""), groups = groups.getOrElse(name, Nil), threads = threads.get(name), timing = timing.getOrElse(name, Timing.zero), ml_timing = ml_timing.getOrElse(name, Timing.zero), sources = sources.get(name), heap_size = heap_sizes.get(name), status = Some(status), errors = errors.getOrElse(name, Nil).reverse, theory_timings = theory_timings.getOrElse(name, Map.empty), ml_statistics = ml_statistics.getOrElse(name, Nil).reverse) (name -> entry) }):_*) Build_Info(sessions) } /** session info: produced by isabelle build as session database **/ sealed case class Session_Info( session_timing: Properties.T, command_timings: List[Properties.T], theory_timings: List[Properties.T], ml_statistics: List[Properties.T], task_statistics: List[Properties.T], errors: List[String] ) { def error(s: String): Session_Info = copy(errors = errors ::: List(s)) } private def parse_session_info( log_file: Log_File, command_timings: Boolean, theory_timings: Boolean, ml_statistics: Boolean, task_statistics: Boolean ): Session_Info = { Session_Info( session_timing = log_file.find_props(Protocol.Session_Timing_Marker) getOrElse Nil, command_timings = if (command_timings) log_file.filter_props(Protocol.Command_Timing_Marker) else Nil, theory_timings = if (theory_timings) log_file.filter_props(Protocol.Theory_Timing_Marker) else Nil, ml_statistics = if (ml_statistics) log_file.filter_props(Protocol.ML_Statistics_Marker) else Nil, task_statistics = if (task_statistics) log_file.filter_props(Protocol.Task_Statistics_Marker) else Nil, errors = log_file.filter(Protocol.Error_Message_Marker)) } def compress_errors( errors: List[String], cache: Compress.Cache = Compress.Cache.none ): Option[Bytes] = if (errors.isEmpty) None else { Some(Bytes(YXML.string_of_body(XML.Encode.list(XML.Encode.string)(errors))). compress(cache = cache)) } def uncompress_errors(bytes: Bytes, cache: XML.Cache = XML.Cache.make()): List[String] = if (bytes.is_empty) Nil else { XML.Decode.list(YXML.string_of_body)( YXML.parse_body(bytes.uncompress(cache = cache.compress).text, cache = cache)) } /** persistent store **/ /* SQL data model */ object Data { def build_log_table(name: String, columns: List[SQL.Column], body: String = ""): SQL.Table = SQL.Table("isabelle_build_log_" + name, columns, body) /* main content */ val log_name = SQL.Column.string("log_name").make_primary_key val session_name = SQL.Column.string("session_name").make_primary_key val theory_name = SQL.Column.string("theory_name").make_primary_key val chapter = SQL.Column.string("chapter") val groups = SQL.Column.string("groups") val threads = SQL.Column.int("threads") val timing_elapsed = SQL.Column.long("timing_elapsed") val timing_cpu = SQL.Column.long("timing_cpu") val timing_gc = SQL.Column.long("timing_gc") val timing_factor = SQL.Column.double("timing_factor") val ml_timing_elapsed = SQL.Column.long("ml_timing_elapsed") val ml_timing_cpu = SQL.Column.long("ml_timing_cpu") val ml_timing_gc = SQL.Column.long("ml_timing_gc") val ml_timing_factor = SQL.Column.double("ml_timing_factor") val theory_timing_elapsed = SQL.Column.long("theory_timing_elapsed") val theory_timing_cpu = SQL.Column.long("theory_timing_cpu") val theory_timing_gc = SQL.Column.long("theory_timing_gc") val heap_size = SQL.Column.long("heap_size") val status = SQL.Column.string("status") val errors = SQL.Column.bytes("errors") val sources = SQL.Column.string("sources") val ml_statistics = SQL.Column.bytes("ml_statistics") val known = SQL.Column.bool("known") val meta_info_table = build_log_table("meta_info", log_name :: Prop.all_props ::: Settings.all_settings) val sessions_table = build_log_table("sessions", List(log_name, session_name, chapter, groups, threads, timing_elapsed, timing_cpu, timing_gc, timing_factor, ml_timing_elapsed, ml_timing_cpu, ml_timing_gc, ml_timing_factor, heap_size, status, errors, sources)) val theories_table = build_log_table("theories", List(log_name, session_name, theory_name, theory_timing_elapsed, theory_timing_cpu, theory_timing_gc)) val ml_statistics_table = build_log_table("ml_statistics", List(log_name, session_name, ml_statistics)) /* AFP versions */ val isabelle_afp_versions_table: SQL.Table = { val version1 = Prop.isabelle_version val version2 = Prop.afp_version build_log_table("isabelle_afp_versions", List(version1.make_primary_key, version2), SQL.select(List(version1, version2), distinct = true) + meta_info_table + SQL.where(SQL.and(version1.defined, version2.defined))) } /* earliest pull date for repository version (PostgreSQL queries) */ def pull_date(afp: Boolean = false): SQL.Column = if (afp) SQL.Column.date("afp_pull_date") else SQL.Column.date("pull_date") def pull_date_table(afp: Boolean = false): SQL.Table = { val (name, versions) = if (afp) ("afp_pull_date", List(Prop.isabelle_version, Prop.afp_version)) else ("pull_date", List(Prop.isabelle_version)) build_log_table(name, versions.map(_.make_primary_key) ::: List(pull_date(afp)), "SELECT " + versions.mkString(", ") + ", min(" + Prop.build_start + ") AS " + pull_date(afp) + " FROM " + meta_info_table + " WHERE " + SQL.AND((versions ::: List(Prop.build_start)).map(_.defined)) + " GROUP BY " + versions.mkString(", ")) } /* recent entries */ def recent_time(days: Int): PostgreSQL.Source = "now() - INTERVAL '" + days.max(0) + " days'" def recent_pull_date_table( days: Int, rev: String = "", afp_rev: Option[String] = None ): SQL.Table = { val afp = afp_rev.isDefined val rev2 = afp_rev.getOrElse("") val table = pull_date_table(afp) val eq_rev = if_proper(rev, Prop.isabelle_version(table).equal(rev)) val eq_rev2 = if_proper(rev2, Prop.afp_version(table).equal(rev2)) SQL.Table("recent_pull_date", table.columns, table.select(table.columns, sql = SQL.where( SQL.or(pull_date(afp)(table).ident + " > " + recent_time(days), SQL.and(eq_rev, eq_rev2))))) } def select_recent_log_names(days: Int): PostgreSQL.Source = { val table1 = meta_info_table val table2 = recent_pull_date_table(days) table1.select(List(log_name), distinct = true, sql = SQL.join_inner + table2.query_named + " ON " + Prop.isabelle_version(table1) + " = " + Prop.isabelle_version(table2)) } def select_recent_versions( days: Int, rev: String = "", afp_rev: Option[String] = None, sql: PostgreSQL.Source = "" ): PostgreSQL.Source = { val afp = afp_rev.isDefined val version = Prop.isabelle_version val table1 = recent_pull_date_table(days, rev = rev, afp_rev = afp_rev) val table2 = meta_info_table val aux_table = SQL.Table("aux", table2.columns, table2.select(sql = sql)) val columns = table1.columns.map(c => c(table1)) ::: List(known.copy(expr = log_name(aux_table).defined)) SQL.select(columns, distinct = true) + table1.query_named + SQL.join_outer + aux_table.query_named + " ON " + version(table1) + " = " + version(aux_table) + SQL.order_by(List(pull_date(afp)(table1)), descending = true) } /* universal view on main data */ val universal_table: SQL.Table = { val afp_pull_date = pull_date(afp = true) val version1 = Prop.isabelle_version val version2 = Prop.afp_version val table1 = meta_info_table val table2 = pull_date_table(afp = true) val table3 = pull_date_table() val a_columns = log_name :: afp_pull_date :: table1.columns.tail val a_table = SQL.Table("a", a_columns, SQL.select(List(log_name, afp_pull_date) ::: table1.columns.tail.map(_.apply(table1))) + table1 + SQL.join_outer + table2 + " ON " + SQL.and( version1(table1).ident + " = " + version1(table2).ident, version2(table1).ident + " = " + version2(table2).ident)) val b_columns = log_name :: pull_date() :: a_columns.tail val b_table = SQL.Table("b", b_columns, SQL.select( List(log_name(a_table), pull_date()(table3)) ::: a_columns.tail.map(_.apply(a_table))) + a_table.query_named + SQL.join_outer + table3 + " ON " + version1(a_table) + " = " + version1(table3)) val c_columns = b_columns ::: sessions_table.columns.tail val c_table = SQL.Table("c", c_columns, SQL.select(log_name(b_table) :: c_columns.tail) + b_table.query_named + SQL.join_inner + sessions_table + " ON " + log_name(b_table) + " = " + log_name(sessions_table)) SQL.Table("isabelle_build_log", c_columns ::: List(ml_statistics), { SQL.select(c_columns.map(_.apply(c_table)) ::: List(ml_statistics)) + c_table.query_named + SQL.join_outer + ml_statistics_table + " ON " + SQL.and( log_name(c_table).ident + " = " + log_name(ml_statistics_table).ident, session_name(c_table).ident + " = " + session_name(ml_statistics_table).ident) }) } } /* database access */ def store(options: Options, cache: XML.Cache = XML.Cache.make()): Store = new Store(options, cache) class Store private[Build_Log](options: Options, val cache: XML.Cache) { def open_database( user: String = options.string("build_log_database_user"), password: String = options.string("build_log_database_password"), database: String = options.string("build_log_database_name"), host: String = options.string("build_log_database_host"), port: Int = options.int("build_log_database_port"), ssh_host: String = options.string("build_log_ssh_host"), ssh_user: String = options.string("build_log_ssh_user"), ssh_port: Int = options.int("build_log_ssh_port") ): PostgreSQL.Database = { PostgreSQL.open_database( user = user, password = password, database = database, host = host, port = port, ssh = if (ssh_host == "") None else Some(SSH.open_session(options, host = ssh_host, user = ssh_user, port = ssh_port)), ssh_close = true) } def update_database( db: PostgreSQL.Database, dirs: List[Path], ml_statistics: Boolean = false): Unit = { val log_files = dirs.flatMap(dir => File.find_files(dir.file, pred = Log_File.is_log(_), follow_links = true)) write_info(db, log_files, ml_statistics = ml_statistics) db.create_view(Data.pull_date_table()) db.create_view(Data.pull_date_table(afp = true)) db.create_view(Data.universal_table) } def snapshot_database( db: PostgreSQL.Database, sqlite_database: Path, days: Int = 100, ml_statistics: Boolean = false ): Unit = { Isabelle_System.make_directory(sqlite_database.dir) sqlite_database.file.delete using(SQLite.open_database(sqlite_database)) { db2 => db.transaction { db2.transaction { // main content db2.create_table(Data.meta_info_table) db2.create_table(Data.sessions_table) db2.create_table(Data.theories_table) db2.create_table(Data.ml_statistics_table) val recent_log_names = db.execute_query_statement( Data.select_recent_log_names(days), List.from[String], res => res.string(Data.log_name)) for (log_name <- recent_log_names) { read_meta_info(db, log_name).foreach(meta_info => update_meta_info(db2, log_name, meta_info)) update_sessions(db2, log_name, read_build_info(db, log_name)) if (ml_statistics) { update_ml_statistics(db2, log_name, read_build_info(db, log_name, ml_statistics = true)) } } // pull_date for (afp <- List(false, true)) { val afp_rev = if (afp) Some("") else None val table = Data.pull_date_table(afp) db2.create_table(table) db2.using_statement(table.insert()) { stmt2 => db.using_statement( Data.recent_pull_date_table(days, afp_rev = afp_rev).query) { stmt => val res = stmt.execute_query() while (res.next()) { for ((c, i) <- table.columns.zipWithIndex) { stmt2.string(i + 1) = res.get_string(c) } stmt2.execute() } } } } // full view db2.create_view(Data.universal_table) } } - db2.rebuild() + db2.vacuum() } } def domain(db: SQL.Database, table: SQL.Table, column: SQL.Column): Set[String] = db.execute_query_statement( table.select(List(column), distinct = true), Set.from[String], res => res.string(column)) def update_meta_info(db: SQL.Database, log_name: String, meta_info: Meta_Info): Unit = db.execute_statement(db.insert_permissive(Data.meta_info_table), body = { stmt => stmt.string(1) = log_name for ((c, i) <- Data.meta_info_table.columns.tail.zipWithIndex) { if (c.T == SQL.Type.Date) stmt.date(i + 2) = meta_info.get_date(c) else stmt.string(i + 2) = meta_info.get(c) } }) def update_sessions(db: SQL.Database, log_name: String, build_info: Build_Info): Unit = db.execute_statement(db.insert_permissive(Data.sessions_table), body = { stmt => val sessions = if (build_info.sessions.isEmpty) Build_Info.sessions_dummy else build_info.sessions for ((session_name, session) <- sessions) { stmt.string(1) = log_name stmt.string(2) = session_name stmt.string(3) = proper_string(session.chapter) stmt.string(4) = session.proper_groups stmt.int(5) = session.threads stmt.long(6) = session.timing.elapsed.proper_ms stmt.long(7) = session.timing.cpu.proper_ms stmt.long(8) = session.timing.gc.proper_ms stmt.double(9) = session.timing.factor stmt.long(10) = session.ml_timing.elapsed.proper_ms stmt.long(11) = session.ml_timing.cpu.proper_ms stmt.long(12) = session.ml_timing.gc.proper_ms stmt.double(13) = session.ml_timing.factor stmt.long(14) = session.heap_size.map(_.bytes) stmt.string(15) = session.status.map(_.toString) stmt.bytes(16) = compress_errors(session.errors, cache = cache.compress) stmt.string(17) = session.sources } }) def update_theories(db: SQL.Database, log_name: String, build_info: Build_Info): Unit = db.execute_statement(db.insert_permissive(Data.theories_table), body = { stmt => val sessions = if (build_info.sessions.forall({ case (_, session) => session.theory_timings.isEmpty })) Build_Info.sessions_dummy else build_info.sessions for { (session_name, session) <- sessions (theory_name, timing) <- session.theory_timings } { stmt.string(1) = log_name stmt.string(2) = session_name stmt.string(3) = theory_name stmt.long(4) = timing.elapsed.ms stmt.long(5) = timing.cpu.ms stmt.long(6) = timing.gc.ms } }) def update_ml_statistics(db: SQL.Database, log_name: String, build_info: Build_Info): Unit = db.execute_statement(db.insert_permissive(Data.ml_statistics_table), body = { stmt => val ml_stats: List[(String, Option[Bytes])] = Par_List.map[(String, Session_Entry), (String, Option[Bytes])]( { case (a, b) => (a, Properties.compress(b.ml_statistics, cache = cache.compress).proper) }, build_info.sessions.iterator.filter(p => p._2.ml_statistics.nonEmpty).toList) val entries = if (ml_stats.nonEmpty) ml_stats else List("" -> None) for ((session_name, ml_statistics) <- entries) { stmt.string(1) = log_name stmt.string(2) = session_name stmt.bytes(3) = ml_statistics } }) def write_info(db: SQL.Database, files: List[JFile], ml_statistics: Boolean = false): Unit = { abstract class Table_Status(table: SQL.Table) { db.create_table(table) private var known: Set[String] = domain(db, table, Data.log_name) def required(file: JFile): Boolean = !known(Log_File.plain_name(file.getName)) def update_db(db: SQL.Database, log_file: Log_File): Unit def update(log_file: Log_File): Unit = { if (!known(log_file.name)) { update_db(db, log_file) known += log_file.name } } } val status = List( new Table_Status(Data.meta_info_table) { override def update_db(db: SQL.Database, log_file: Log_File): Unit = update_meta_info(db, log_file.name, log_file.parse_meta_info()) }, new Table_Status(Data.sessions_table) { override def update_db(db: SQL.Database, log_file: Log_File): Unit = update_sessions(db, log_file.name, log_file.parse_build_info()) }, new Table_Status(Data.theories_table) { override def update_db(db: SQL.Database, log_file: Log_File): Unit = update_theories(db, log_file.name, log_file.parse_build_info()) }, new Table_Status(Data.ml_statistics_table) { override def update_db(db: SQL.Database, log_file: Log_File): Unit = if (ml_statistics) { update_ml_statistics(db, log_file.name, log_file.parse_build_info(ml_statistics = true)) } }) for (file_group <- files.filter(file => status.exists(_.required(file))). grouped(options.int("build_log_transaction_size") max 1)) { val log_files = Par_List.map[JFile, Log_File](Log_File.apply, file_group) db.transaction { log_files.foreach(log_file => status.foreach(_.update(log_file))) } } } def read_meta_info(db: SQL.Database, log_name: String): Option[Meta_Info] = { val table = Data.meta_info_table val columns = table.columns.tail db.execute_query_statementO[Meta_Info]( table.select(columns, sql = Data.log_name.where_equal(log_name)), { res => val results = columns.map(c => c.name -> (if (c.T == SQL.Type.Date) res.get_date(c).map(Log_File.Date_Format(_)) else res.get_string(c))) val n = Prop.all_props.length val props = for ((x, Some(y)) <- results.take(n)) yield (x, y) val settings = for ((x, Some(y)) <- results.drop(n)) yield (x, y) Meta_Info(props, settings) } ) } def read_build_info( db: SQL.Database, log_name: String, session_names: List[String] = Nil, ml_statistics: Boolean = false ): Build_Info = { val table1 = Data.sessions_table val table2 = Data.ml_statistics_table val columns1 = table1.columns.tail.map(_.apply(table1)) val (columns, from) = if (ml_statistics) { val columns = columns1 ::: List(Data.ml_statistics(table2)) val join = table1.ident + SQL.join_outer + table2.ident + " ON " + SQL.and( Data.log_name(table1).ident + " = " + Data.log_name(table2).ident, Data.session_name(table1).ident + " = " + Data.session_name(table2).ident) (columns, SQL.enclose(join)) } else (columns1, table1.ident) val where = SQL.where( SQL.and( Data.log_name(table1).equal(log_name), Data.session_name(table1).ident + " <> ''", if_proper(session_names, Data.session_name(table1).member(session_names)))) val sessions = db.execute_query_statement( SQL.select(columns, sql = from + where), Map.from[String, Session_Entry], { res => val session_name = res.string(Data.session_name) val session_entry = Session_Entry( chapter = res.string(Data.chapter), groups = split_lines(res.string(Data.groups)), threads = res.get_int(Data.threads), timing = res.timing( Data.timing_elapsed, Data.timing_cpu, Data.timing_gc), ml_timing = res.timing( Data.ml_timing_elapsed, Data.ml_timing_cpu, Data.ml_timing_gc), sources = res.get_string(Data.sources), heap_size = res.get_long(Data.heap_size).map(Space.bytes), status = res.get_string(Data.status).map(Session_Status.withName), errors = uncompress_errors(res.bytes(Data.errors), cache = cache), ml_statistics = if (ml_statistics) { Properties.uncompress(res.bytes(Data.ml_statistics), cache = cache) } else Nil) session_name -> session_entry } ) Build_Info(sessions) } } } diff --git a/src/Pure/General/sql.scala b/src/Pure/General/sql.scala --- a/src/Pure/General/sql.scala +++ b/src/Pure/General/sql.scala @@ -1,619 +1,625 @@ /* Title: Pure/General/sql.scala Author: Makarius Support for SQL databases: SQLite and PostgreSQL. */ package isabelle import java.time.OffsetDateTime import java.sql.{DriverManager, Connection, PreparedStatement, ResultSet} import org.sqlite.jdbc4.JDBC4Connection import org.postgresql.{PGConnection, PGNotification} import scala.collection.mutable object SQL { /** SQL language **/ type Source = String /* concrete syntax */ def escape_char(c: Char): String = c match { case '\u0000' => "\\0" case '\'' => "\\'" case '\"' => "\\\"" case '\b' => "\\b" case '\n' => "\\n" case '\r' => "\\r" case '\t' => "\\t" case '\u001a' => "\\Z" case '\\' => "\\\\" case _ => c.toString } def string(s: String): Source = s.iterator.map(escape_char).mkString("'", "", "'") def ident(s: String): Source = Long_Name.implode(Long_Name.explode(s).map(a => quote(a.replace("\"", "\"\"")))) def enclose(s: Source): Source = "(" + s + ")" def enclosure(ss: Iterable[Source]): Source = ss.mkString("(", ", ", ")") def separate(sql: Source): Source = (if (sql.isEmpty || sql.startsWith(" ")) "" else " ") + sql def select(columns: List[Column] = Nil, distinct: Boolean = false, sql: Source = ""): Source = "SELECT " + (if (distinct) "DISTINCT " else "") + (if (columns.isEmpty) "*" else commas(columns.map(_.ident))) + " FROM " + sql val join_outer: Source = " LEFT OUTER JOIN " val join_inner: Source = " INNER JOIN " def join(outer: Boolean): Source = if (outer) join_outer else join_inner def infix(op: Source, args: Iterable[Source]): Source = { val body = args.iterator.filter(_.nonEmpty).mkString(" " + op + " ") if_proper(body, enclose(body)) } def AND(args: Iterable[Source]): Source = infix("AND", args) def OR(args: Iterable[Source]): Source = infix("OR", args) def and(args: Source*): Source = AND(args) def or(args: Source*): Source = OR(args) val TRUE: Source = "TRUE" val FALSE: Source = "FALSE" def equal(sql: Source, s: String): Source = sql + " = " + string(s) def member(sql: Source, set: Iterable[String]): Source = if (set.isEmpty) FALSE else OR(set.iterator.map(equal(sql, _)).toList) def where(sql: Source): Source = if_proper(sql, " WHERE " + sql) /* types */ object Type extends Enumeration { val Boolean = Value("BOOLEAN") val Int = Value("INTEGER") val Long = Value("BIGINT") val Double = Value("DOUBLE PRECISION") val String = Value("TEXT") val Bytes = Value("BLOB") val Date = Value("TIMESTAMP WITH TIME ZONE") } def sql_type_default(T: Type.Value): Source = T.toString def sql_type_sqlite(T: Type.Value): Source = if (T == Type.Boolean) "INTEGER" else if (T == Type.Date) "TEXT" else sql_type_default(T) def sql_type_postgresql(T: Type.Value): Source = if (T == Type.Bytes) "BYTEA" else sql_type_default(T) /* columns */ object Column { def bool(name: String, strict: Boolean = false, primary_key: Boolean = false): Column = Column(name, Type.Boolean, strict, primary_key) def int(name: String, strict: Boolean = false, primary_key: Boolean = false): Column = Column(name, Type.Int, strict, primary_key) def long(name: String, strict: Boolean = false, primary_key: Boolean = false): Column = Column(name, Type.Long, strict, primary_key) def double(name: String, strict: Boolean = false, primary_key: Boolean = false): Column = Column(name, Type.Double, strict, primary_key) def string(name: String, strict: Boolean = false, primary_key: Boolean = false): Column = Column(name, Type.String, strict, primary_key) def bytes(name: String, strict: Boolean = false, primary_key: Boolean = false): Column = Column(name, Type.Bytes, strict, primary_key) def date(name: String, strict: Boolean = false, primary_key: Boolean = false): Column = Column(name, Type.Date, strict, primary_key) } sealed case class Column( name: String, T: Type.Value, strict: Boolean = false, primary_key: Boolean = false, expr: SQL.Source = "" ) { def make_primary_key: Column = copy(primary_key = true) def apply(table: Table): Column = Column(Long_Name.qualify(table.name, name), T, strict = strict, primary_key = primary_key) def ident: Source = if (expr == "") SQL.ident(name) else enclose(expr) + " AS " + SQL.ident(name) def decl(sql_type: Type.Value => Source): Source = ident + " " + sql_type(T) + (if (strict || primary_key) " NOT NULL" else "") def defined: String = ident + " IS NOT NULL" def undefined: String = ident + " IS NULL" def equal(s: String): Source = SQL.equal(ident, s) def member(set: Iterable[String]): Source = SQL.member(ident, set) def where_equal(s: String): Source = SQL.where(equal(s)) def where_member(set: Iterable[String]): Source = SQL.where(member(set)) override def toString: Source = ident } def order_by(columns: List[Column], descending: Boolean = false): Source = " ORDER BY " + columns.mkString(", ") + (if (descending) " DESC" else "") /* tables */ sealed case class Table(name: String, columns: List[Column], body: Source = "") { Library.duplicates(columns.map(_.name)) match { case Nil => case bad => error("Duplicate column names " + commas_quote(bad) + " for table " + quote(name)) } def ident: Source = SQL.ident(name) def query: Source = if (body == "") error("Missing SQL body for table " + quote(name)) else SQL.enclose(body) def query_named: Source = query + " AS " + SQL.ident(name) def create(strict: Boolean, sql_type: Type.Value => Source): Source = { val primary_key = columns.filter(_.primary_key).map(_.name) match { case Nil => Nil case keys => List("PRIMARY KEY " + enclosure(keys)) } "CREATE TABLE " + (if (strict) "" else "IF NOT EXISTS ") + ident + " " + enclosure(columns.map(_.decl(sql_type)) ::: primary_key) } def create_index(index_name: String, index_columns: List[Column], strict: Boolean = false, unique: Boolean = false): Source = "CREATE " + (if (unique) "UNIQUE " else "") + "INDEX " + (if (strict) "" else "IF NOT EXISTS ") + SQL.ident(index_name) + " ON " + ident + " " + enclosure(index_columns.map(_.name)) def insert_cmd(cmd: Source = "INSERT", sql: Source = ""): Source = cmd + " INTO " + ident + " VALUES " + enclosure(columns.map(_ => "?")) + SQL.separate(sql) def insert(sql: Source = ""): Source = insert_cmd(sql = sql) def delete(sql: Source = ""): Source = "DELETE FROM " + ident + SQL.separate(sql) def update(update_columns: List[Column] = Nil, sql: Source = ""): Source = "UPDATE " + ident + " SET " + commas(update_columns.map(c => c.ident + " = ?")) + SQL.separate(sql) def select( select_columns: List[Column] = Nil, distinct: Boolean = false, sql: Source = "" ): Source = SQL.select(select_columns, distinct = distinct, sql = ident + SQL.separate(sql)) override def toString: Source = ident } /* table groups */ object Tables { def list(list: List[Table]): Tables = new Tables(list) + val empty: Tables = list(Nil) def apply(args: Table*): Tables = list(args.toList) } final class Tables private(val list: List[Table]) extends Iterable[Table] { override def toString: String = list.mkString("SQL.Tables(", ", ", ")") def iterator: Iterator[Table] = list.iterator // requires transaction def create_lock(db: Database): Unit = { foreach(db.create_table(_)) lock(db) } // requires transaction def lock(db: Database): Unit = { val sql = db.lock_tables(list) if (sql.nonEmpty) db.execute_statement(sql) } } /** SQL database operations **/ /* statements */ class Statement private[SQL](val db: Database, val rep: PreparedStatement) extends AutoCloseable { stmt => object bool { def update(i: Int, x: Boolean): Unit = rep.setBoolean(i, x) def update(i: Int, x: Option[Boolean]): Unit = { if (x.isDefined) update(i, x.get) else rep.setNull(i, java.sql.Types.BOOLEAN) } } object int { def update(i: Int, x: Int): Unit = rep.setInt(i, x) def update(i: Int, x: Option[Int]): Unit = { if (x.isDefined) update(i, x.get) else rep.setNull(i, java.sql.Types.INTEGER) } } object long { def update(i: Int, x: Long): Unit = rep.setLong(i, x) def update(i: Int, x: Option[Long]): Unit = { if (x.isDefined) update(i, x.get) else rep.setNull(i, java.sql.Types.BIGINT) } } object double { def update(i: Int, x: Double): Unit = rep.setDouble(i, x) def update(i: Int, x: Option[Double]): Unit = { if (x.isDefined) update(i, x.get) else rep.setNull(i, java.sql.Types.DOUBLE) } } object string { def update(i: Int, x: String): Unit = rep.setString(i, x) def update(i: Int, x: Option[String]): Unit = update(i, x.orNull) } object bytes { def update(i: Int, bytes: Bytes): Unit = { if (bytes == null) rep.setBytes(i, null) else rep.setBinaryStream(i, bytes.stream(), bytes.length) } def update(i: Int, bytes: Option[Bytes]): Unit = update(i, bytes.orNull) } object date { def update(i: Int, date: Date): Unit = db.update_date(stmt, i, date) def update(i: Int, date: Option[Date]): Unit = update(i, date.orNull) } def execute(): Boolean = rep.execute() def execute_query(): Result = new Result(this, rep.executeQuery()) def close(): Unit = rep.close() } /* results */ class Result private[SQL](val stmt: Statement, val rep: ResultSet) { res => def next(): Boolean = rep.next() def iterator[A](get: Result => A): Iterator[A] = new Iterator[A] { private var _next: Boolean = res.next() def hasNext: Boolean = _next def next(): A = { val x = get(res); _next = res.next(); x } } def bool(column: Column): Boolean = rep.getBoolean(column.name) def int(column: Column): Int = rep.getInt(column.name) def long(column: Column): Long = rep.getLong(column.name) def double(column: Column): Double = rep.getDouble(column.name) def string(column: Column): String = { val s = rep.getString(column.name) if (s == null) "" else s } def bytes(column: Column): Bytes = { val bs = rep.getBytes(column.name) if (bs == null) Bytes.empty else Bytes(bs) } def date(column: Column): Date = stmt.db.date(res, column) def timing(c1: Column, c2: Column, c3: Column): Timing = Timing(Time.ms(long(c1)), Time.ms(long(c2)), Time.ms(long(c3))) def get[A](column: Column, f: Column => A): Option[A] = { val x = f(column) if (rep.wasNull || x == null) None else Some(x) } def get_bool(column: Column): Option[Boolean] = get(column, bool) def get_int(column: Column): Option[Int] = get(column, int) def get_long(column: Column): Option[Long] = get(column, long) def get_double(column: Column): Option[Double] = get(column, double) def get_string(column: Column): Option[String] = get(column, string) def get_bytes(column: Column): Option[Bytes] = get(column, bytes) def get_date(column: Column): Option[Date] = get(column, date) } /* database */ trait Database extends AutoCloseable { db => def is_sqlite: Boolean = isInstanceOf[SQLite.Database] def is_postgresql: Boolean = isInstanceOf[PostgreSQL.Database] - def rebuild(): Unit = () + def vacuum(tables: SQL.Tables = SQL.Tables.empty): Unit def now(): Date /* types */ def sql_type(T: Type.Value): Source /* connection */ def connection: Connection def sqlite_connection: Option[JDBC4Connection] = connection match { case conn: JDBC4Connection => Some(conn) case _ => None } def postgresql_connection: Option[PGConnection] = connection match { case conn: PGConnection => Some(conn) case _ => None } def the_sqlite_connection: JDBC4Connection = sqlite_connection getOrElse error("SQLite connection expected, but found " + connection.getClass.getName) def the_postgresql_connection: PGConnection = postgresql_connection getOrElse error("PostgreSQL connection expected, but found " + connection.getClass.getName) def close(): Unit = connection.close() def transaction[A](body: => A): A = { val auto_commit = connection.getAutoCommit() try { connection.setAutoCommit(false) val savepoint = connection.setSavepoint() try { val result = body connection.commit() result } catch { case exn: Throwable => connection.rollback(savepoint); throw exn } } finally { connection.setAutoCommit(auto_commit) } } def transaction_lock[A](tables: Tables)(body: => A): A = transaction { tables.lock(db); body } def lock_tables(tables: List[Table]): Source = "" // PostgreSQL only /* statements and results */ def statement(sql: Source): Statement = new Statement(db, connection.prepareStatement(sql)) def using_statement[A](sql: Source)(f: Statement => A): A = using(statement(sql))(f) def execute_statement(sql: Source, body: Statement => Unit = _ => ()): Unit = using_statement(sql) { stmt => body(stmt); stmt.execute() } def execute_query_statement[A, B]( sql: Source, make_result: Iterator[A] => B, get: Result => A ): B = using_statement(sql)(stmt => make_result(stmt.execute_query().iterator(get))) def execute_query_statementO[A](sql: Source, get: Result => A): Option[A] = execute_query_statement[A, Option[A]](sql, _.nextOption, get) def execute_query_statementB(sql: Source): Boolean = using_statement(sql)(stmt => stmt.execute_query().next()) def update_date(stmt: Statement, i: Int, date: Date): Unit def date(res: Result, column: Column): Date def insert_permissive(table: Table, sql: Source = ""): Source /* tables and views */ def tables: List[String] = { val result = new mutable.ListBuffer[String] val rs = connection.getMetaData.getTables(null, null, "%", null) while (rs.next) { result += rs.getString(3) } result.toList } def create_table(table: Table, strict: Boolean = false, sql: Source = ""): Unit = execute_statement(table.create(strict, sql_type) + SQL.separate(sql)) def create_index(table: Table, name: String, columns: List[Column], strict: Boolean = false, unique: Boolean = false): Unit = execute_statement(table.create_index(name, columns, strict, unique)) def create_view(table: Table, strict: Boolean = false): Unit = { if (strict || !tables.contains(table.name)) { execute_statement("CREATE VIEW " + table + " AS " + { table.query; table.body }) } } } } /** SQLite **/ object SQLite { // see https://www.sqlite.org/lang_datefunc.html val date_format: Date.Format = Date.Format("uuuu-MM-dd HH:mm:ss.SSS x") lazy val init_jdbc: Unit = { val lib_path = Path.explode("$ISABELLE_SQLITE_HOME/" + Platform.jvm_platform) val lib_name = File.get_file(lib_path).file_name System.setProperty("org.sqlite.lib.path", File.platform_path(lib_path)) System.setProperty("org.sqlite.lib.name", lib_name) Class.forName("org.sqlite.JDBC") } def open_database(path: Path): Database = { init_jdbc val path0 = path.expand val s0 = File.platform_path(path0) val s1 = if (Platform.is_windows) s0.replace('\\', '/') else s0 val connection = DriverManager.getConnection("jdbc:sqlite:" + s1) new Database(path0.toString, connection) } class Database private[SQLite](name: String, val connection: Connection) extends SQL.Database { override def toString: String = name - override def rebuild(): Unit = execute_statement("VACUUM") + + override def vacuum(tables: SQL.Tables = SQL.Tables.empty): Unit = + execute_statement("VACUUM") // always FULL override def now(): Date = Date.now() def sql_type(T: SQL.Type.Value): SQL.Source = SQL.sql_type_sqlite(T) def update_date(stmt: SQL.Statement, i: Int, date: Date): Unit = if (date == null) stmt.string(i) = (null: String) else stmt.string(i) = date_format(date) def date(res: SQL.Result, column: SQL.Column): Date = proper_string(res.string(column)) match { case None => null case Some(s) => date_format.parse(s) } def insert_permissive(table: SQL.Table, sql: SQL.Source = ""): SQL.Source = table.insert_cmd(cmd = "INSERT OR IGNORE", sql = sql) } } /** PostgreSQL **/ object PostgreSQL { type Source = SQL.Source val default_port = 5432 lazy val init_jdbc: Unit = Class.forName("org.postgresql.Driver") def open_database( user: String, password: String, database: String = "", host: String = "", port: Int = 0, ssh: Option[SSH.Session] = None, ssh_close: Boolean = false, // see https://www.postgresql.org/docs/current/transaction-iso.html transaction_isolation: Int = Connection.TRANSACTION_SERIALIZABLE ): Database = { init_jdbc if (user == "") error("Undefined database user") val db_host = proper_string(host) getOrElse "localhost" val db_port = if (port > 0 && port != default_port) ":" + port else "" val db_name = "/" + (proper_string(database) getOrElse user) val (url, name, port_forwarding) = ssh match { case None => val spec = db_host + db_port + db_name val url = "jdbc:postgresql://" + spec val name = user + "@" + spec (url, name, None) case Some(ssh) => val fw = ssh.port_forwarding(remote_host = db_host, remote_port = if (port > 0) port else default_port, ssh_close = ssh_close) val url = "jdbc:postgresql://localhost:" + fw.port + db_name val name = user + "@" + fw + db_name + " via ssh " + ssh (url, name, Some(fw)) } try { val connection = DriverManager.getConnection(url, user, password) connection.setTransactionIsolation(transaction_isolation) new Database(name, connection, port_forwarding) } catch { case exn: Throwable => port_forwarding.foreach(_.close()); throw exn } } class Database private[PostgreSQL]( name: String, val connection: Connection, port_forwarding: Option[SSH.Port_Forwarding] ) extends SQL.Database { override def toString: String = name + override def vacuum(tables: SQL.Tables = SQL.Tables.empty): Unit = + execute_statement("VACUUM" + if_proper(tables.list, " " + commas(tables.list.map(_.ident)))) + override def now(): Date = { val now = SQL.Column.date("now") execute_query_statementO[Date]("SELECT NOW() as " + now.ident, res => res.date(now)) .getOrElse(error("Failed to get current date/time from database server " + toString)) } def sql_type(T: SQL.Type.Value): SQL.Source = SQL.sql_type_postgresql(T) // see https://jdbc.postgresql.org/documentation/head/8-date-time.html def update_date(stmt: SQL.Statement, i: Int, date: Date): Unit = if (date == null) stmt.rep.setObject(i, null) else stmt.rep.setObject(i, OffsetDateTime.from(date.to(Date.timezone_utc).rep)) def date(res: SQL.Result, column: SQL.Column): Date = { val obj = res.rep.getObject(column.name, classOf[OffsetDateTime]) if (obj == null) null else Date.instant(obj.toInstant) } def insert_permissive(table: SQL.Table, sql: SQL.Source = ""): SQL.Source = table.insert_cmd(sql = if_proper(sql, sql + " ") + "ON CONFLICT DO NOTHING") /* explicit locking: only applicable to PostgreSQL within transaction context */ // see https://www.postgresql.org/docs/current/sql-lock.html // see https://www.postgresql.org/docs/current/explicit-locking.html override def lock_tables(tables: List[SQL.Table]): PostgreSQL.Source = "LOCK TABLE " + tables.mkString(", ") + " IN ACCESS EXCLUSIVE MODE" /* notifications: IPC via database server */ // see https://www.postgresql.org/docs/current/sql-notify.html def listen(name: String): Unit = execute_statement("LISTEN " + SQL.ident(name)) def unlisten(name: String = "*"): Unit = execute_statement("UNLISTEN " + (if (name == "*") name else SQL.ident(name))) def notify(name: String, payload: String = ""): Unit = execute_statement("NOTIFY " + SQL.ident(name) + if_proper(payload, ", " + SQL.string(payload))) def get_notifications(): List[PGNotification] = the_postgresql_connection.getNotifications() match { case null => Nil case array => array.toList } override def close(): Unit = { super.close(); port_forwarding.foreach(_.close()) } } } diff --git a/src/Pure/Tools/build_process.scala b/src/Pure/Tools/build_process.scala --- a/src/Pure/Tools/build_process.scala +++ b/src/Pure/Tools/build_process.scala @@ -1,1228 +1,1228 @@ /* Title: Pure/Tools/build_process.scala Author: Makarius Build process for sessions, with build database, optional heap, and optional presentation. */ package isabelle import scala.collection.immutable.SortedMap import scala.math.Ordering import scala.annotation.tailrec object Build_Process { /** static context **/ object Context { def apply( store: Sessions.Store, build_deps: Sessions.Deps, progress: Progress = new Progress, ml_platform: String = Isabelle_System.getenv("ML_PLATFORM"), hostname: String = Isabelle_System.hostname(), numa_shuffling: Boolean = false, build_heap: Boolean = false, max_jobs: Int = 1, fresh_build: Boolean = false, no_build: Boolean = false, session_setup: (String, Session) => Unit = (_, _) => (), build_uuid: String = UUID.random().toString, master: Boolean = false, ): Context = { val sessions_structure = build_deps.sessions_structure val build_graph = sessions_structure.build_graph val sessions = Map.from( for ((name, (info, _)) <- build_graph.iterator) yield { val deps = info.parent.toList val ancestors = sessions_structure.build_requirements(deps) val sources_shasum = build_deps.sources_shasum(name) val session_context = Build_Job.Session_Context.load( build_uuid, name, deps, ancestors, info.session_prefs, sources_shasum, info.timeout, store, progress = progress) name -> session_context }) val sessions_time = { val maximals = build_graph.maximals.toSet def descendants_time(name: String): Double = { if (maximals.contains(name)) sessions(name).old_time.seconds else { val descendants = build_graph.all_succs(List(name)).toSet val g = build_graph.restrict(descendants) (0.0 :: g.maximals.flatMap { desc => val ps = g.all_preds(List(desc)) if (ps.exists(p => !sessions.isDefinedAt(p))) None else Some(ps.map(p => sessions(p).old_time.seconds).sum) }).max } } Map.from( for (name <- sessions.keysIterator) yield name -> descendants_time(name)).withDefaultValue(0.0) } val ordering = new Ordering[String] { def compare(name1: String, name2: String): Int = sessions_time(name2) compare sessions_time(name1) match { case 0 => sessions(name2).timeout compare sessions(name1).timeout match { case 0 => name1 compare name2 case ord => ord } case ord => ord } } val numa_nodes = Host.numa_nodes(enabled = numa_shuffling) new Context(store, build_deps, sessions, ordering, ml_platform, hostname, numa_nodes, build_heap = build_heap, max_jobs = max_jobs, fresh_build = fresh_build, no_build = no_build, session_setup, build_uuid = build_uuid, master = master) } } final class Context private( val store: Sessions.Store, val build_deps: Sessions.Deps, val sessions: State.Sessions, val ordering: Ordering[String], val ml_platform: String, val hostname: String, val numa_nodes: List[Int], val build_heap: Boolean, val max_jobs: Int, val fresh_build: Boolean, val no_build: Boolean, val session_setup: (String, Session) => Unit, val build_uuid: String, val master: Boolean ) { override def toString: String = "Build_Process.Context(build_uuid = " + quote(build_uuid) + if_proper(master, ", master = true") + ")" def build_options: Options = store.options def sessions_structure: Sessions.Structure = build_deps.sessions_structure def sources_shasum(name: String): SHA1.Shasum = sessions(name).sources_shasum def old_command_timings(name: String): List[Properties.T] = sessions.get(name) match { case Some(session_context) => Properties.uncompress(session_context.old_command_timings_blob, cache = store.cache) case None => Nil } def prepare_database(): Unit = { using_option(store.open_build_database()) { db => db.transaction { Data.all_tables.create_lock(db) Data.clean_build(db) } - db.rebuild() + db.vacuum(Data.all_tables) } } def store_heap(name: String): Boolean = build_heap || Sessions.is_pure(name) || sessions.valuesIterator.exists(_.ancestors.contains(name)) def worker_active: Boolean = max_jobs > 0 } /** dynamic state **/ type Progress_Messages = SortedMap[Long, Progress.Message] val progress_messages_empty: Progress_Messages = SortedMap.empty case class Build( build_uuid: String, ml_platform: String, options: String, start: Date, stop: Option[Date], progress_stopped: Boolean ) case class Worker( worker_uuid: String, build_uuid: String, hostname: String, java_pid: Long, java_start: Date, start: Date, stamp: Date, stop: Option[Date], serial: Long ) case class Task( name: String, deps: List[String], info: JSON.Object.T, build_uuid: String ) { def is_ready: Boolean = deps.isEmpty def resolve(dep: String): Task = if (deps.contains(dep)) copy(deps = deps.filterNot(_ == dep)) else this } case class Job( name: String, worker_uuid: String, build_uuid: String, node_info: Host.Node_Info, build: Option[Build_Job] ) extends Library.Named { def no_build: Job = copy(build = None) } case class Result( name: String, worker_uuid: String, build_uuid: String, node_info: Host.Node_Info, process_result: Process_Result, output_shasum: SHA1.Shasum, current: Boolean ) extends Library.Named { def ok: Boolean = process_result.ok } sealed case class Snapshot( progress_messages: Progress_Messages, builds: List[Build], // available build configurations workers: List[Worker], // available worker processes sessions: State.Sessions, // static build targets pending: State.Pending, // dynamic build "queue" running: State.Running, // presently running jobs results: State.Results) // finished results object State { type Sessions = Map[String, Build_Job.Session_Context] type Pending = List[Task] type Running = Map[String, Job] type Results = Map[String, Result] def inc_serial(serial: Long): Long = { require(serial < java.lang.Long.MAX_VALUE, "serial overflow") serial + 1 } } sealed case class State( serial: Long = 0, progress_seen: Long = 0, numa_next: Int = 0, sessions: State.Sessions = Map.empty, pending: State.Pending = Nil, running: State.Running = Map.empty, results: State.Results = Map.empty ) { require(serial >= 0, "serial underflow") def inc_serial: State = copy(serial = State.inc_serial(serial)) def set_serial(i: Long): State = { require(serial <= i, "non-monotonic change of serial") copy(serial = i) } def progress_serial(message_serial: Long = serial): State = if (message_serial > progress_seen) copy(progress_seen = message_serial) else error("Bad serial " + message_serial + " for progress output (already seen)") def next_numa_node(numa_nodes: List[Int]): (Option[Int], State) = if (numa_nodes.isEmpty) (None, this) else { val available = numa_nodes.zipWithIndex val used = Set.from(for (job <- running.valuesIterator; i <- job.node_info.numa_node) yield i) val numa_index = available.collectFirst({ case (n, i) if n == numa_next => i }).getOrElse(0) val candidates = available.drop(numa_index) ::: available.take(numa_index) val (n, i) = candidates.find({ case (n, i) => i == numa_index && !used(n) }) orElse candidates.find({ case (n, _) => !used(n) }) getOrElse candidates.head (Some(n), copy(numa_next = numa_nodes((i + 1) % numa_nodes.length))) } def finished: Boolean = pending.isEmpty def remove_pending(name: String): State = copy(pending = pending.flatMap( entry => if (entry.name == name) None else Some(entry.resolve(name)))) def is_running(name: String): Boolean = running.isDefinedAt(name) def stop_running(): Unit = for (job <- running.valuesIterator; build <- job.build) build.cancel() def finished_running(): List[Job] = List.from( for (job <- running.valuesIterator; build <- job.build if build.is_finished) yield job) def add_running(job: Job): State = copy(running = running + (job.name -> job)) def remove_running(name: String): State = copy(running = running - name) def make_result( result_name: (String, String, String), process_result: Process_Result, output_shasum: SHA1.Shasum, node_info: Host.Node_Info = Host.Node_Info.none, current: Boolean = false ): State = { val (name, worker_uuid, build_uuid) = result_name val result = Result(name, worker_uuid, build_uuid, node_info, process_result, output_shasum, current) copy(results = results + (name -> result)) } } /** SQL data model **/ object Data { def make_table(name: String, columns: List[SQL.Column], body: String = ""): SQL.Table = SQL.Table("isabelle_build" + if_proper(name, "_" + name), columns, body = body) def pull_data[A <: Library.Named]( data_domain: Set[String], data_iterator: Set[String] => Iterator[A], old_data: Map[String, A] ): Map[String, A] = { val dom = data_domain -- old_data.keysIterator val data = old_data -- old_data.keysIterator.filterNot(dom) if (dom.isEmpty) data else data_iterator(dom).foldLeft(data) { case (map, a) => map + (a.name -> a) } } def pull0[A <: Library.Named]( new_data: Map[String, A], old_data: Map[String, A] ): Map[String, A] = { pull_data(new_data.keySet, dom => new_data.valuesIterator.filter(a => dom(a.name)), old_data) } def pull1[A <: Library.Named]( data_domain: Set[String], data_base: Set[String] => Map[String, A], old_data: Map[String, A] ): Map[String, A] = { pull_data(data_domain, dom => data_base(dom).valuesIterator, old_data) } object Generic { val build_uuid = SQL.Column.string("build_uuid") val worker_uuid = SQL.Column.string("worker_uuid") val name = SQL.Column.string("name") def sql( build_uuid: String = "", worker_uuid: String = "", names: Iterable[String] = Nil ): SQL.Source = SQL.and( if_proper(build_uuid, Generic.build_uuid.equal(build_uuid)), if_proper(worker_uuid, Generic.worker_uuid.equal(worker_uuid)), if_proper(names, Generic.name.member(names))) def sql_where( build_uuid: String = "", worker_uuid: String = "", names: Iterable[String] = Nil ): SQL.Source = { SQL.where(sql(build_uuid = build_uuid, worker_uuid = worker_uuid, names = names)) } } /* base table */ object Base { val build_uuid = Generic.build_uuid.make_primary_key val ml_platform = SQL.Column.string("ml_platform") val options = SQL.Column.string("options") val start = SQL.Column.date("start") val stop = SQL.Column.date("stop") val progress_stopped = SQL.Column.bool("progress_stopped") val table = make_table("", List(build_uuid, ml_platform, options, start, stop, progress_stopped)) } def read_builds(db: SQL.Database, build_uuid: String = ""): List[Build] = db.execute_query_statement( Base.table.select(sql = Generic.sql_where(build_uuid = build_uuid)), List.from[Build], { res => val build_uuid = res.string(Base.build_uuid) val ml_platform = res.string(Base.ml_platform) val options = res.string(Base.options) val start = res.date(Base.start) val stop = res.get_date(Base.stop) val progress_stopped = res.bool(Base.progress_stopped) Build(build_uuid, ml_platform, options, start, stop, progress_stopped) }) def start_build( db: SQL.Database, build_uuid: String, ml_platform: String, options: String, progress_stopped: Boolean ): Unit = { db.execute_statement(Base.table.insert(), body = { stmt => stmt.string(1) = build_uuid stmt.string(2) = ml_platform stmt.string(3) = options stmt.date(4) = db.now() stmt.date(5) = None stmt.bool(6) = progress_stopped }) } def stop_build(db: SQL.Database, build_uuid: String): Unit = db.execute_statement( Base.table.update(List(Base.stop), sql = Base.build_uuid.where_equal(build_uuid)), body = { stmt => stmt.date(1) = db.now() }) def clean_build(db: SQL.Database): Unit = { val old = db.execute_query_statement( Base.table.select(List(Base.build_uuid), sql = SQL.where(Base.stop.defined)), List.from[String], res => res.string(Base.build_uuid)) if (old.nonEmpty) { for (table <- build_uuid_tables) { db.execute_statement(table.delete(sql = Generic.build_uuid.where_member(old))) } } } /* sessions */ object Sessions { val name = Generic.name.make_primary_key val deps = SQL.Column.string("deps") val ancestors = SQL.Column.string("ancestors") val options = SQL.Column.string("options") val sources = SQL.Column.string("sources") val timeout = SQL.Column.long("timeout") val old_time = SQL.Column.long("old_time") val old_command_timings = SQL.Column.bytes("old_command_timings") val build_uuid = Generic.build_uuid val table = make_table("sessions", List(name, deps, ancestors, options, sources, timeout, old_time, old_command_timings, build_uuid)) } def read_sessions_domain(db: SQL.Database): Set[String] = db.execute_query_statement( Sessions.table.select(List(Sessions.name)), Set.from[String], res => res.string(Sessions.name)) def read_sessions(db: SQL.Database, names: Iterable[String] = Nil): State.Sessions = db.execute_query_statement( Sessions.table.select(sql = if_proper(names, Sessions.name.where_member(names))), Map.from[String, Build_Job.Session_Context], { res => val name = res.string(Sessions.name) val deps = split_lines(res.string(Sessions.deps)) val ancestors = split_lines(res.string(Sessions.ancestors)) val options = res.string(Sessions.options) val sources_shasum = SHA1.fake_shasum(res.string(Sessions.sources)) val timeout = Time.ms(res.long(Sessions.timeout)) val old_time = Time.ms(res.long(Sessions.old_time)) val old_command_timings_blob = res.bytes(Sessions.old_command_timings) val build_uuid = res.string(Sessions.build_uuid) name -> Build_Job.Session_Context(name, deps, ancestors, options, sources_shasum, timeout, old_time, old_command_timings_blob, build_uuid) } ) def update_sessions(db:SQL.Database, sessions: State.Sessions): Boolean = { val old_sessions = read_sessions_domain(db) val insert = sessions.iterator.filterNot(p => old_sessions.contains(p._1)).toList for ((name, session) <- insert) { db.execute_statement(Sessions.table.insert(), body = { stmt => stmt.string(1) = name stmt.string(2) = cat_lines(session.deps) stmt.string(3) = cat_lines(session.ancestors) stmt.string(4) = session.session_prefs stmt.string(5) = session.sources_shasum.toString stmt.long(6) = session.timeout.ms stmt.long(7) = session.old_time.ms stmt.bytes(8) = session.old_command_timings_blob stmt.string(9) = session.build_uuid }) } insert.nonEmpty } /* progress */ object Progress { val serial = SQL.Column.long("serial").make_primary_key val kind = SQL.Column.int("kind") val text = SQL.Column.string("text") val verbose = SQL.Column.bool("verbose") val build_uuid = Generic.build_uuid val table = make_table("progress", List(serial, kind, text, verbose, build_uuid)) } def read_progress(db: SQL.Database, seen: Long = 0, build_uuid: String = ""): Progress_Messages = db.execute_query_statement( Progress.table.select( sql = SQL.where( SQL.and( if (seen <= 0) "" else Progress.serial.ident + " > " + seen, Generic.sql(build_uuid = build_uuid)))), SortedMap.from[Long, isabelle.Progress.Message], { res => val serial = res.long(Progress.serial) val kind = isabelle.Progress.Kind(res.int(Progress.kind)) val text = res.string(Progress.text) val verbose = res.bool(Progress.verbose) serial -> isabelle.Progress.Message(kind, text, verbose = verbose) } ) def write_progress( db: SQL.Database, message_serial: Long, message: isabelle.Progress.Message, build_uuid: String ): Unit = { db.execute_statement(Progress.table.insert(), body = { stmt => stmt.long(1) = message_serial stmt.int(2) = message.kind.id stmt.string(3) = message.text stmt.bool(4) = message.verbose stmt.string(5) = build_uuid }) } def sync_progress( db: SQL.Database, seen: Long, build_uuid: String, build_progress: Progress ): (Progress_Messages, Boolean) = { require(build_uuid.nonEmpty) val messages = read_progress(db, seen = seen, build_uuid = build_uuid) val stopped_db = db.execute_query_statementO[Boolean]( Base.table.select(List(Base.progress_stopped), sql = SQL.where(Base.build_uuid.equal(build_uuid))), res => res.bool(Base.progress_stopped) ).getOrElse(false) def stop_db(): Unit = db.execute_statement( Base.table.update( List(Base.progress_stopped), sql = Base.build_uuid.where_equal(build_uuid)), body = { stmt => stmt.bool(1) = true }) val stopped = build_progress.stopped if (stopped_db && !stopped) build_progress.stop() if (stopped && !stopped_db) stop_db() (messages, messages.isEmpty && stopped_db == stopped) } /* workers */ object Workers { val worker_uuid = Generic.worker_uuid.make_primary_key val build_uuid = Generic.build_uuid val hostname = SQL.Column.string("hostname") val java_pid = SQL.Column.long("java_pid") val java_start = SQL.Column.date("java_start") val start = SQL.Column.date("start") val stamp = SQL.Column.date("stamp") val stop = SQL.Column.date("stop") val serial = SQL.Column.long("serial") val table = make_table("workers", List(worker_uuid, build_uuid, hostname, java_pid, java_start, start, stamp, stop, serial)) val serial_max = serial.copy(expr = "MAX(" + serial.ident + ")") } def read_serial(db: SQL.Database): Long = db.execute_query_statementO[Long]( Workers.table.select(List(Workers.serial_max)), _.long(Workers.serial)).getOrElse(0L) def read_workers( db: SQL.Database, build_uuid: String = "", worker_uuid: String = "" ): List[Worker] = { db.execute_query_statement( Workers.table.select( sql = Generic.sql_where(build_uuid = build_uuid, worker_uuid = worker_uuid)), List.from[Worker], { res => Worker( worker_uuid = res.string(Workers.worker_uuid), build_uuid = res.string(Workers.build_uuid), hostname = res.string(Workers.hostname), java_pid = res.long(Workers.java_pid), java_start = res.date(Workers.java_start), start = res.date(Workers.start), stamp = res.date(Workers.stamp), stop = res.get_date(Workers.stop), serial = res.long(Workers.serial)) }) } def start_worker( db: SQL.Database, worker_uuid: String, build_uuid: String, hostname: String, java_pid: Long, java_start: Date, serial: Long ): Unit = { def err(msg: String): Nothing = error("Cannot start worker " + worker_uuid + if_proper(msg, "\n" + msg)) val build_stop = db.execute_query_statementO( Base.table.select(List(Base.stop), sql = Base.build_uuid.where_equal(build_uuid)), res => res.get_date(Base.stop)) build_stop match { case Some(None) => case Some(Some(_)) => err("for already stopped build process " + build_uuid) case None => err("for unknown build process " + build_uuid) } db.execute_statement(Workers.table.insert(), body = { stmt => val now = db.now() stmt.string(1) = worker_uuid stmt.string(2) = build_uuid stmt.string(3) = hostname stmt.long(4) = java_pid stmt.date(5) = java_start stmt.date(6) = now stmt.date(7) = now stmt.date(8) = None stmt.long(9) = serial }) } def stamp_worker( db: SQL.Database, worker_uuid: String, serial: Long, stop: Boolean = false ): Unit = { val sql = Workers.table.update(List(Workers.stamp, Workers.stop, Workers.serial), sql = Workers.worker_uuid.where_equal(worker_uuid)) db.execute_statement(sql, body = { stmt => val now = db.now() stmt.date(1) = now stmt.date(2) = if (stop) Some(now) else None stmt.long(3) = serial }) } /* pending jobs */ object Pending { val name = Generic.name.make_primary_key val deps = SQL.Column.string("deps") val info = SQL.Column.string("info") val build_uuid = Generic.build_uuid val table = make_table("pending", List(name, deps, info, build_uuid)) } def read_pending(db: SQL.Database): List[Task] = db.execute_query_statement( Pending.table.select(sql = SQL.order_by(List(Pending.name))), List.from[Task], { res => val name = res.string(Pending.name) val deps = res.string(Pending.deps) val info = res.string(Pending.info) val build_uuid = res.string(Pending.build_uuid) Task(name, split_lines(deps), JSON.Object.parse(info), build_uuid) }) def update_pending(db: SQL.Database, pending: State.Pending): Boolean = { val old_pending = read_pending(db) val (delete, insert) = Library.symmetric_difference(old_pending, pending) if (delete.nonEmpty) { db.execute_statement( Pending.table.delete(sql = Generic.sql_where(names = delete.map(_.name)))) } for (task <- insert) { db.execute_statement(Pending.table.insert(), body = { stmt => stmt.string(1) = task.name stmt.string(2) = cat_lines(task.deps) stmt.string(3) = JSON.Format(task.info) stmt.string(4) = task.build_uuid }) } delete.nonEmpty || insert.nonEmpty } /* running jobs */ object Running { val name = Generic.name.make_primary_key val worker_uuid = Generic.worker_uuid val build_uuid = Generic.build_uuid val hostname = SQL.Column.string("hostname") val numa_node = SQL.Column.int("numa_node") val table = make_table("running", List(name, worker_uuid, build_uuid, hostname, numa_node)) } def read_running(db: SQL.Database): State.Running = db.execute_query_statement( Running.table.select(sql = SQL.order_by(List(Running.name))), Map.from[String, Job], { res => val name = res.string(Running.name) val worker_uuid = res.string(Running.worker_uuid) val build_uuid = res.string(Running.build_uuid) val hostname = res.string(Running.hostname) val numa_node = res.get_int(Running.numa_node) name -> Job(name, worker_uuid, build_uuid, Host.Node_Info(hostname, numa_node), None) } ) def update_running(db: SQL.Database, running: State.Running): Boolean = { val running0 = read_running(db).valuesIterator.toList val running1 = running.valuesIterator.map(_.no_build).toList val (delete, insert) = Library.symmetric_difference(running0, running1) if (delete.nonEmpty) { db.execute_statement( Running.table.delete(sql = Generic.sql_where(names = delete.map(_.name)))) } for (job <- insert) { db.execute_statement(Running.table.insert(), body = { stmt => stmt.string(1) = job.name stmt.string(2) = job.worker_uuid stmt.string(3) = job.build_uuid stmt.string(4) = job.node_info.hostname stmt.int(5) = job.node_info.numa_node }) } delete.nonEmpty || insert.nonEmpty } /* job results */ object Results { val name = Generic.name.make_primary_key val worker_uuid = Generic.worker_uuid val build_uuid = Generic.build_uuid val hostname = SQL.Column.string("hostname") val numa_node = SQL.Column.string("numa_node") val rc = SQL.Column.int("rc") val out = SQL.Column.string("out") val err = SQL.Column.string("err") val timing_elapsed = SQL.Column.long("timing_elapsed") val timing_cpu = SQL.Column.long("timing_cpu") val timing_gc = SQL.Column.long("timing_gc") val output_shasum = SQL.Column.string("output_shasum") val current = SQL.Column.bool("current") val table = make_table("results", List(name, worker_uuid, build_uuid, hostname, numa_node, rc, out, err, timing_elapsed, timing_cpu, timing_gc, output_shasum, current)) } def read_results_domain(db: SQL.Database): Set[String] = db.execute_query_statement( Results.table.select(List(Results.name)), Set.from[String], res => res.string(Results.name)) def read_results(db: SQL.Database, names: Iterable[String] = Nil): State.Results = db.execute_query_statement( Results.table.select(sql = if_proper(names, Results.name.where_member(names))), Map.from[String, Result], { res => val name = res.string(Results.name) val worker_uuid = res.string(Results.worker_uuid) val build_uuid = res.string(Results.build_uuid) val hostname = res.string(Results.hostname) val numa_node = res.get_int(Results.numa_node) val node_info = Host.Node_Info(hostname, numa_node) val rc = res.int(Results.rc) val out = res.string(Results.out) val err = res.string(Results.err) val timing = res.timing( Results.timing_elapsed, Results.timing_cpu, Results.timing_gc) val process_result = Process_Result(rc, out_lines = split_lines(out), err_lines = split_lines(err), timing = timing) val output_shasum = SHA1.fake_shasum(res.string(Results.output_shasum)) val current = res.bool(Results.current) name -> Result(name, worker_uuid, build_uuid, node_info, process_result, output_shasum, current) } ) def update_results(db: SQL.Database, results: State.Results): Boolean = { val old_results = read_results_domain(db) val insert = results.valuesIterator.filterNot(res => old_results.contains(res.name)).toList for (result <- insert) { val process_result = result.process_result db.execute_statement(Results.table.insert(), body = { stmt => stmt.string(1) = result.name stmt.string(2) = result.worker_uuid stmt.string(3) = result.build_uuid stmt.string(4) = result.node_info.hostname stmt.int(5) = result.node_info.numa_node stmt.int(6) = process_result.rc stmt.string(7) = cat_lines(process_result.out_lines) stmt.string(8) = cat_lines(process_result.err_lines) stmt.long(9) = process_result.timing.elapsed.ms stmt.long(10) = process_result.timing.cpu.ms stmt.long(11) = process_result.timing.gc.ms stmt.string(12) = result.output_shasum.toString stmt.bool(13) = result.current }) } insert.nonEmpty } /* collective operations */ val all_tables: SQL.Tables = SQL.Tables( Base.table, Workers.table, Progress.table, Sessions.table, Pending.table, Running.table, Results.table, Host.Data.Node_Info.table) val build_uuid_tables = all_tables.filter(table => table.columns.exists(column => column.name == Generic.build_uuid.name)) def pull_database( db: SQL.Database, worker_uuid: String, hostname: String, state: State ): State = { val serial_db = read_serial(db) if (serial_db == state.serial) state else { val serial = serial_db max state.serial stamp_worker(db, worker_uuid, serial) val numa_next = Host.Data.read_numa_next(db, hostname) val sessions = pull1(read_sessions_domain(db), read_sessions(db, _), state.sessions) val pending = read_pending(db) val running = pull0(read_running(db), state.running) val results = pull1(read_results_domain(db), read_results(db, _), state.results) state.copy(serial = serial, numa_next = numa_next, sessions = sessions, pending = pending, running = running, results = results) } } def update_database( db: SQL.Database, worker_uuid: String, build_uuid: String, hostname: String, state: State ): State = { val changed = List( update_sessions(db, state.sessions), update_pending(db, state.pending), update_running(db, state.running), update_results(db, state.results), Host.Data.update_numa_next(db, hostname, state.numa_next)) val serial0 = state.serial val serial = if (changed.exists(identity)) State.inc_serial(serial0) else serial0 stamp_worker(db, worker_uuid, serial) state.set_serial(serial) } } } /** main process **/ class Build_Process( protected final val build_context: Build_Process.Context, protected final val build_progress: Progress ) extends AutoCloseable { /* context */ protected final val store: Sessions.Store = build_context.store protected final val build_options: Options = store.options protected final val build_deps: Sessions.Deps = build_context.build_deps protected final val hostname: String = build_context.hostname protected final val build_uuid: String = build_context.build_uuid protected final val worker_uuid: String = UUID.random().toString override def toString: String = "Build_Process(worker_uuid = " + quote(worker_uuid) + ", build_uuid = " + quote(build_uuid) + if_proper(build_context.master, ", master = true") + ")" /* global state: internal var vs. external database */ private var _state: Build_Process.State = Build_Process.State() private val _database: Option[SQL.Database] = store.open_build_database() def close(): Unit = synchronized { _database.foreach(_.close()) } protected def synchronized_database[A](body: => A): A = synchronized { _database match { case None => body case Some(db) => def pull_database(): Unit = { _state = Build_Process.Data.pull_database(db, worker_uuid, hostname, _state) } def sync_database(): Unit = { _state = Build_Process.Data.update_database(db, worker_uuid, build_uuid, hostname, _state) } def attempt(): Either[A, Build_Process.Progress_Messages] = { val (messages, sync) = Build_Process.Data.sync_progress( db, _state.progress_seen, build_uuid, build_progress) if (sync) Left { pull_database(); val res = body; sync_database(); res } else Right(messages) } @tailrec def attempts(): A = { db.transaction_lock(Build_Process.Data.all_tables) { attempt() } match { case Left(res) => res case Right(messages) => for ((message_serial, message) <- messages) { _state = _state.progress_serial(message_serial = message_serial) if (build_progress.do_output(message)) build_progress.output(message) } attempts() } } attempts() } } /* progress backed by database */ private def progress_output(message: Progress.Message, build_progress_output: => Unit): Unit = { synchronized_database { _state = _state.inc_serial.progress_serial() for (db <- _database) { Build_Process.Data.write_progress(db, _state.serial, message, build_uuid) Build_Process.Data.stamp_worker(db, worker_uuid, _state.serial) } build_progress_output } } protected object progress extends Progress { override def verbose: Boolean = build_progress.verbose override def output(message: Progress.Message): Unit = progress_output(message, if (do_output(message)) build_progress.output(message)) override def theory(theory: Progress.Theory): Unit = progress_output(theory.message, build_progress.theory(theory)) override def nodes_status(nodes_status: Document_Status.Nodes_Status): Unit = build_progress.nodes_status(nodes_status) override def stop(): Unit = build_progress.stop() override def stopped: Boolean = build_progress.stopped } protected val log: Logger = Logger.make_system_log(progress, build_options) /* policy operations */ protected def init_state(state: Build_Process.State): Build_Process.State = { val sessions1 = build_context.sessions.foldLeft(state.sessions) { case (map, (name, session)) => if (state.sessions.isDefinedAt(name)) map else map + (name -> session) } val old_pending = state.pending.iterator.map(_.name).toSet val new_pending = List.from( for { (name, session_context) <- build_context.sessions.iterator if !old_pending(name) } yield Build_Process.Task(name, session_context.deps, JSON.Object.empty, build_uuid)) val pending1 = new_pending ::: state.pending state.copy(sessions = sessions1, pending = pending1) } protected def next_job(state: Build_Process.State): Option[String] = if (progress.stopped || state.running.size < build_context.max_jobs) { state.pending.filter(entry => entry.is_ready && !state.is_running(entry.name)) .sortBy(_.name)(build_context.ordering) .headOption.map(_.name) } else None protected def start_session(state: Build_Process.State, session_name: String): Build_Process.State = { val ancestor_results = for (a <- build_context.sessions(session_name).ancestors) yield state.results(a) val input_shasum = if (ancestor_results.isEmpty) ML_Process.bootstrap_shasum() else SHA1.flat_shasum(ancestor_results.map(_.output_shasum)) val store_heap = build_context.store_heap(session_name) val (current, output_shasum) = store.check_output(session_name, sources_shasum = build_context.sources_shasum(session_name), input_shasum = input_shasum, fresh_build = build_context.fresh_build, store_heap = store_heap) val all_current = current && ancestor_results.forall(_.current) val result_name = (session_name, worker_uuid, build_uuid) if (all_current) { state .remove_pending(session_name) .make_result(result_name, Process_Result.ok, output_shasum, current = true) } else if (build_context.no_build) { progress.echo("Skipping " + session_name + " ...", verbose = true) state. remove_pending(session_name). make_result(result_name, Process_Result.error, output_shasum) } else if (progress.stopped || !ancestor_results.forall(_.ok)) { progress.echo(session_name + " CANCELLED") state .remove_pending(session_name) .make_result(result_name, Process_Result.undefined, output_shasum) } else { val (numa_node, state1) = state.next_numa_node(build_context.numa_nodes) val node_info = Host.Node_Info(hostname, numa_node) progress.echo( (if (store_heap) "Building " else "Running ") + session_name + if_proper(node_info.numa_node, " on " + node_info) + " ...") store.init_output(session_name) val build = Build_Job.start_session(build_context, progress, log, build_deps.background(session_name), input_shasum, node_info) val job = Build_Process.Job(session_name, worker_uuid, build_uuid, node_info, Some(build)) state1.add_running(job) } } /* build process roles */ final def is_session_name(job_name: String): Boolean = !Long_Name.is_qualified(job_name) protected final def start_build(): Unit = synchronized_database { for (db <- _database) { Build_Process.Data.start_build(db, build_uuid, build_context.ml_platform, build_context.sessions_structure.session_prefs, progress.stopped) } } protected final def stop_build(): Unit = synchronized_database { for (db <- _database) { Build_Process.Data.stop_build(db, build_uuid) } } protected final def start_worker(): Unit = synchronized_database { for (db <- _database) { val java = ProcessHandle.current() val java_pid = java.pid val java_start = Date.instant(java.info.startInstant.get) _state = _state.inc_serial Build_Process.Data.start_worker( db, worker_uuid, build_uuid, hostname, java_pid, java_start, _state.serial) } } protected final def stop_worker(): Unit = synchronized_database { for (db <- _database) { Build_Process.Data.stamp_worker(db, worker_uuid, _state.serial, stop = true) } } /* run */ def run(): Map[String, Process_Result] = { if (build_context.master) synchronized_database { _state = init_state(_state) } def finished(): Boolean = synchronized_database { _state.finished } def sleep(): Unit = Isabelle_Thread.interrupt_handler(_ => progress.stop()) { build_options.seconds("editor_input_delay").sleep() } def start_job(): Boolean = synchronized_database { next_job(_state) match { case Some(name) => if (is_session_name(name)) { _state = start_session(_state, name) true } else error("Unsupported build job name " + quote(name)) case None => false } } if (finished()) { progress.echo_warning("Nothing to build") Map.empty[String, Process_Result] } else { if (build_context.master) start_build() start_worker() if (build_context.master && !build_context.worker_active) { progress.echo("Waiting for external workers ...") } try { while (!finished()) { synchronized_database { if (progress.stopped) _state.stop_running() for (job <- _state.finished_running()) { val result_name = (job.name, worker_uuid, build_uuid) val (process_result, output_shasum) = job.build.get.join _state = _state. remove_pending(job.name). remove_running(job.name). make_result(result_name, process_result, output_shasum, node_info = job.node_info) } } if (!start_job()) sleep() } } finally { stop_worker() if (build_context.master) stop_build() } synchronized_database { for ((name, result) <- _state.results) yield name -> result.process_result } } } /* snapshot */ def snapshot(): Build_Process.Snapshot = synchronized_database { val (progress_messages, builds, workers) = _database match { case None => (Build_Process.progress_messages_empty, Nil, Nil) case Some(db) => (Build_Process.Data.read_progress(db), Build_Process.Data.read_builds(db), Build_Process.Data.read_workers(db)) } Build_Process.Snapshot( progress_messages = progress_messages, builds = builds, workers = workers, sessions = _state.sessions, pending = _state.pending, running = _state.running, results = _state.results) } }