diff --git a/src/Pure/Concurrent/future.ML b/src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML +++ b/src/Pure/Concurrent/future.ML @@ -1,717 +1,716 @@ (* Title: Pure/Concurrent/future.ML Author: Makarius Value-oriented parallel execution via futures and promises. *) signature FUTURE = sig type task = Task_Queue.task type group = Task_Queue.group val new_group: group option -> group val worker_task: unit -> task option val worker_group: unit -> group option val the_worker_group: unit -> group val worker_subgroup: unit -> group type 'a future val task_of: 'a future -> task val peek: 'a future -> 'a Exn.result option val is_finished: 'a future -> bool val ML_statistics: bool Unsynchronized.ref val interruptible_task: ('a -> 'b) -> 'a -> 'b val cancel_group: group -> unit val cancel: 'a future -> unit val error_message: Position.T -> (serial * string) * string option -> unit val identify_result: Position.T -> 'a Exn.result -> 'a Exn.result type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool} val default_params: params val forks: params -> (unit -> 'a) list -> 'a future list val fork: (unit -> 'a) -> 'a future val get_finished: 'a future -> 'a val join_results: 'a future list -> 'a Exn.result list val join_result: 'a future -> 'a Exn.result val joins: 'a future list -> 'a list val join: 'a future -> 'a val forked_results: {name: string, deps: Task_Queue.task list} -> (unit -> 'a) list -> 'a Exn.result list val task_context: string -> group -> ('a -> 'b) -> 'a -> 'b val enabled: unit -> bool val relevant: 'a list -> bool val proofs_enabled: int -> bool val proofs_enabled_timing: Time.time -> bool val value_result: 'a Exn.result -> 'a future val value: 'a -> 'a future val cond_forks: params -> (unit -> 'a) list -> 'a future list val map: ('a -> 'b) -> 'a future -> 'b future val promise_name: string -> (unit -> unit) -> 'a future val promise: (unit -> unit) -> 'a future val fulfill_result: 'a future -> 'a Exn.result -> unit val fulfill: 'a future -> 'a -> unit val snapshot: group list -> task list val shutdown: unit -> unit end; structure Future: FUTURE = struct (** future values **) type task = Task_Queue.task; type group = Task_Queue.group; val new_group = Task_Queue.new_group; (* identifiers *) local val worker_task_var = Thread_Data.var () : task Thread_Data.var; in fun worker_task () = Thread_Data.get worker_task_var; fun setmp_worker_task task f x = Thread_Data.setmp worker_task_var (SOME task) f x; end; val worker_group = Option.map Task_Queue.group_of_task o worker_task; fun the_worker_group () = (case worker_group () of SOME group => group | NONE => raise Fail "Missing worker thread context"); fun worker_subgroup () = new_group (worker_group ()); fun worker_joining e = (case worker_task () of NONE => e () | SOME task => Task_Queue.joining task e); fun worker_waiting deps e = (case worker_task () of NONE => e () | SOME task => Task_Queue.waiting task deps e); (* datatype future *) type 'a result = 'a Exn.result Single_Assignment.var; datatype 'a future = Value of 'a Exn.result | Future of {promised: bool, task: task, result: 'a result}; fun task_of (Value _) = Task_Queue.dummy_task | task_of (Future {task, ...}) = task; fun peek (Value res) = SOME res | peek (Future {result, ...}) = Single_Assignment.peek result; fun is_finished x = is_some (peek x); val _ = ML_system_pp (fn depth => fn pretty => fn x => (case peek x of NONE => PolyML.PrettyString "" | SOME (Exn.Exn _) => PolyML.PrettyString "" | SOME (Exn.Res y) => pretty (y, depth))); (** scheduling **) (* synchronization *) val scheduler_event = ConditionVar.conditionVar (); val work_available = ConditionVar.conditionVar (); val work_finished = ConditionVar.conditionVar (); local val lock = Mutex.mutex (); in fun SYNCHRONIZED name = Multithreading.synchronized name lock; fun wait cond = (*requires SYNCHRONIZED*) Multithreading.sync_wait NONE cond lock; fun wait_timeout timeout cond = (*requires SYNCHRONIZED*) Multithreading.sync_wait (SOME (Time.now () + timeout)) cond lock; fun signal cond = (*requires SYNCHRONIZED*) ConditionVar.signal cond; fun broadcast cond = (*requires SYNCHRONIZED*) ConditionVar.broadcast cond; end; (* global state *) val queue = Unsynchronized.ref Task_Queue.empty; val next = Unsynchronized.ref 0; val scheduler = Unsynchronized.ref (NONE: Thread.thread option); val canceled = Unsynchronized.ref ([]: group list); val do_shutdown = Unsynchronized.ref false; val max_workers = Unsynchronized.ref 0; val max_active = Unsynchronized.ref 0; val status_ticks = Unsynchronized.ref 0; val last_round = Unsynchronized.ref Time.zeroTime; val next_round = seconds 0.05; datatype worker_state = Working | Waiting | Sleeping; val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list); fun count_workers state = (*requires SYNCHRONIZED*) fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0; (* status *) val ML_statistics = Unsynchronized.ref false; fun report_status () = (*requires SYNCHRONIZED*) if ! ML_statistics then let - val {ready, pending, running, passive, urgent, total} = Task_Queue.status (! queue); + val {ready, pending, running, passive, urgent} = Task_Queue.status (! queue); val workers_total = length (! workers); val workers_active = count_workers Working; val workers_waiting = count_workers Waiting; - val stats = - [("now", Value.print_real (Time.toReal (Time.now ()))), - ("tasks_ready", Value.print_int ready), - ("tasks_pending", Value.print_int pending), - ("tasks_running", Value.print_int running), - ("tasks_passive", Value.print_int passive), - ("tasks_urgent", Value.print_int urgent), - ("tasks_total", Value.print_int total), - ("workers_total", Value.print_int workers_total), - ("workers_active", Value.print_int workers_active), - ("workers_waiting", Value.print_int workers_waiting)] @ - ML_Statistics.get (); + val _ = + ML_Statistics.set + {tasks_ready = ready, + tasks_pending = pending, + tasks_running = running, + tasks_passive = passive, + tasks_urgent = urgent, + workers_total = workers_total, + workers_active = workers_active, + workers_waiting = workers_waiting}; + val stats = ML_Statistics.get (); in Output.try_protocol_message (Markup.ML_statistics :: stats) [] end else (); (* cancellation primitives *) fun cancel_now group = (*requires SYNCHRONIZED*) let val running = Task_Queue.cancel (! queue) group; val _ = running |> List.app (fn thread => if Isabelle_Thread.is_self thread then () else Isabelle_Thread.interrupt_unsynchronized thread); in running end; fun cancel_all () = (*requires SYNCHRONIZED*) let val (groups, threads) = Task_Queue.cancel_all (! queue); val _ = List.app Isabelle_Thread.interrupt_unsynchronized threads; in groups end; fun cancel_later group = (*requires SYNCHRONIZED*) (Unsynchronized.change canceled (insert Task_Queue.eq_group group); broadcast scheduler_event); fun interruptible_task f x = Thread_Attributes.with_attributes (if is_some (worker_task ()) then Thread_Attributes.private_interrupts else Thread_Attributes.public_interrupts) (fn _ => f x) before Thread_Attributes.expose_interrupt (); (* worker threads *) fun worker_exec (task, jobs) = let val group = Task_Queue.group_of_task task; val valid = not (Task_Queue.is_canceled group); val ok = Task_Queue.running task (fn () => setmp_worker_task task (fn () => fold (fn job => fn ok => job valid andalso ok) jobs true) ()); val _ = if ! Multithreading.trace >= 2 then Output.try_protocol_message (Markup.task_statistics :: Task_Queue.task_statistics task) [] else (); val _ = SYNCHRONIZED "finish" (fn () => let val maximal = Unsynchronized.change_result queue (Task_Queue.finish task); val test = Exn.capture Thread_Attributes.expose_interrupt (); val _ = if ok andalso not (Exn.is_interrupt_exn test) then () else if null (cancel_now group) then () else cancel_later group; val _ = broadcast work_finished; val _ = if maximal then () else signal work_available; in () end); in () end; fun worker_wait worker_state cond = (*requires SYNCHRONIZED*) (case AList.lookup Thread.equal (! workers) (Thread.self ()) of SOME state => Unsynchronized.setmp state worker_state wait cond | NONE => wait cond); fun worker_next () = (*requires SYNCHRONIZED*) if length (! workers) > ! max_workers then (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ())); signal work_available; NONE) else let val urgent_only = count_workers Working > ! max_active in (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ()) urgent_only) of NONE => (worker_wait Sleeping work_available; worker_next ()) | some => (signal work_available; some)) end; fun worker_loop name = (case SYNCHRONIZED name (fn () => worker_next ()) of NONE => () | SOME work => (worker_exec work; worker_loop name)); fun worker_start name = (*requires SYNCHRONIZED*) let val worker = Isabelle_Thread.fork {name = "worker", stack_limit = Isabelle_Thread.stack_limit (), interrupts = false} (fn () => worker_loop name); in Unsynchronized.change workers (cons (worker, Unsynchronized.ref Working)) end handle Fail msg => Multithreading.tracing 0 (fn () => "SCHEDULER: " ^ msg); (* scheduler *) fun scheduler_next () = (*requires SYNCHRONIZED*) let val now = Time.now (); val tick = ! last_round + next_round <= now; val _ = if tick then last_round := now else (); (* runtime status *) val _ = if tick then Unsynchronized.change status_ticks (fn i => i + 1) else (); val _ = if tick andalso ! status_ticks mod (if ! Multithreading.trace >= 1 then 2 else 10) = 0 then report_status () else (); val _ = if not tick orelse forall (Thread.isActive o #1) (! workers) then () else let val (alive, dead) = List.partition (Thread.isActive o #1) (! workers); val _ = workers := alive; in Multithreading.tracing 0 (fn () => "SCHEDULER: disposed " ^ string_of_int (length dead) ^ " dead worker threads") end; (* worker pool adjustments *) val max_active0 = ! max_active; val max_workers0 = ! max_workers; val m = if ! do_shutdown andalso Task_Queue.all_passive (! queue) then 0 else Multithreading.max_threads (); val _ = max_active := m; val _ = max_workers := 2 * m; val missing = ! max_workers - length (! workers); val _ = if missing > 0 then funpow missing (fn () => ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) () else (); val _ = if ! max_active = max_active0 andalso ! max_workers = max_workers0 then () else signal work_available; (* canceled groups *) val _ = if null (! canceled) then () else (Multithreading.tracing 1 (fn () => string_of_int (length (! canceled)) ^ " canceled groups"); Unsynchronized.change canceled (filter_out (null o cancel_now)); signal work_available); (* delay loop *) val _ = Exn.release (wait_timeout next_round scheduler_event); (* shutdown *) val continue = not (! do_shutdown andalso null (! workers)); val _ = if continue then () else (report_status (); scheduler := NONE); val _ = broadcast scheduler_event; in continue end handle exn => if Exn.is_interrupt exn then (Multithreading.tracing 1 (fn () => "SCHEDULER: Interrupt"); List.app cancel_later (cancel_all ()); signal work_available; true) else Exn.reraise exn; fun scheduler_loop () = (while Thread_Attributes.with_attributes (Thread_Attributes.sync_interrupts Thread_Attributes.public_interrupts) (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ())) do (); last_round := Time.zeroTime); fun scheduler_active () = (*requires SYNCHRONIZED*) (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread); fun scheduler_check () = (*requires SYNCHRONIZED*) (do_shutdown := false; if scheduler_active () then () else scheduler := SOME (Isabelle_Thread.fork {name = "scheduler", stack_limit = NONE, interrupts = false} scheduler_loop)); (** futures **) (* cancel *) fun cancel_group_unsynchronized group = (*requires SYNCHRONIZED*) let val _ = if null (cancel_now group) then () else cancel_later group; val _ = signal work_available; val _ = scheduler_check (); in () end; fun cancel_group group = SYNCHRONIZED "cancel_group" (fn () => cancel_group_unsynchronized group); fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x)); (* results *) fun error_message pos ((serial, msg), exec_id) = Position.setmp_thread_data pos (fn () => let val id = Position.get_id pos in if is_none id orelse is_none exec_id orelse id = exec_id then Output.error_message' (serial, msg) else () end) (); fun identify_result pos res = res |> Exn.map_exn (fn exn => let val exec_id = (case Position.get_id pos of NONE => [] | SOME id => [(Markup.exec_idN, id)]) in Par_Exn.identify exec_id exn end); fun assign_result group result res = let val _ = Single_Assignment.assign result res handle exn as Fail _ => (case Single_Assignment.peek result of SOME (Exn.Exn e) => Exn.reraise (if Exn.is_interrupt e then e else exn) | _ => Exn.reraise exn); val ok = (case the (Single_Assignment.peek result) of Exn.Exn exn => (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false) | Exn.Res _ => true); in ok end; (* future jobs *) fun future_job group atts (e: unit -> 'a) = let val result = Single_Assignment.var "future" : 'a result; val pos = Position.thread_data (); val context = Context.get_generic_context (); fun job ok = let val res = if ok then Exn.capture (fn () => Thread_Attributes.with_attributes atts (fn _ => (Position.setmp_thread_data pos o Context.setmp_generic_context context) e ())) () else Exn.interrupt_exn; in assign_result group result (identify_result pos res) end; in (result, job) end; (* fork *) type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool}; val default_params: params = {name = "", group = NONE, deps = [], pri = 0, interrupts = true}; fun forks ({name, group, deps, pri, interrupts}: params) es = if null es then [] else let val grp = (case group of NONE => worker_subgroup () | SOME grp => grp); fun enqueue e queue = let val atts = if interrupts then Thread_Attributes.private_interrupts else Thread_Attributes.no_interrupts; val (result, job) = future_job grp atts e; val (task, queue') = Task_Queue.enqueue name grp deps pri job queue; val future = Future {promised = false, task = task, result = result}; in (future, queue') end; in SYNCHRONIZED "enqueue" (fn () => let val (futures, queue') = fold_map enqueue es (! queue); val _ = queue := queue'; val minimal = forall (not o Task_Queue.known_task queue') deps; val _ = if minimal then signal work_available else (); val _ = scheduler_check (); in futures end) end; fun fork e = (singleton o forks) {name = "fork", group = NONE, deps = [], pri = 0, interrupts = true} e; (* join *) fun get_result x = (case peek x of NONE => Exn.Exn (Fail "Unfinished future") | SOME res => if Exn.is_interrupt_exn res then (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of [] => res | exns => Exn.Exn (Par_Exn.make exns)) else res); fun get_finished x = Exn.release (get_result x); local fun join_next atts deps = (*requires SYNCHRONIZED*) if null deps then NONE else (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of (NONE, []) => NONE | (NONE, deps') => (worker_waiting deps' (fn () => Thread_Attributes.with_attributes atts (fn _ => Exn.release (worker_wait Waiting work_finished))); join_next atts deps') | (SOME work, deps') => SOME (work, deps')); fun join_loop atts deps = (case SYNCHRONIZED "join" (fn () => join_next atts deps) of NONE => () | SOME (work, deps') => (worker_joining (fn () => worker_exec work); join_loop atts deps')); in fun join_results xs = let val _ = if forall is_finished xs then () else if is_some (worker_task ()) then Thread_Attributes.with_attributes Thread_Attributes.no_interrupts (fn orig_atts => join_loop orig_atts (map task_of xs)) else xs |> List.app (fn Value _ => () | Future {result, ...} => ignore (Single_Assignment.await result)); in map get_result xs end; end; fun join_result x = singleton join_results x; fun joins xs = Par_Exn.release_all (join_results xs); fun join x = Exn.release (join_result x); (* forked results: nested parallel evaluation *) fun forked_results {name, deps} es = Thread_Attributes.uninterruptible (fn restore_attributes => fn () => let val (group, pri) = (case worker_task () of SOME task => (new_group (SOME (Task_Queue.group_of_task task)), Task_Queue.pri_of_task task) | NONE => (new_group NONE, 0)); val futures = forks {name = name, group = SOME group, deps = deps, pri = pri, interrupts = true} es; in restore_attributes join_results futures handle exn => (if Exn.is_interrupt exn then cancel_group group else (); Exn.reraise exn) end) (); (* task context for running thread *) fun task_context name group f x = Thread_Attributes.with_attributes Thread_Attributes.no_interrupts (fn orig_atts => let val (result, job) = future_job group orig_atts (fn () => f x); val task = SYNCHRONIZED "enroll" (fn () => Unsynchronized.change_result queue (Task_Queue.enroll (Thread.self ()) name group)); val _ = worker_exec (task, [job]); in (case Single_Assignment.peek result of NONE => raise Fail "Missing task context result" | SOME res => Exn.release res) end); (* scheduling parameters *) fun enabled () = let val threads = Multithreading.max_threads () in threads > 1 andalso let val lim = threads * Options.default_int "parallel_limit" in lim <= 0 orelse SYNCHRONIZED "enabled" (fn () => Task_Queue.total_jobs (! queue) < lim) end end; val relevant = (fn [] => false | [_] => false | _ => enabled ()); fun proofs_enabled n = ! Multithreading.parallel_proofs >= n andalso is_some (worker_task ()) andalso enabled (); fun proofs_enabled_timing t = proofs_enabled 1 andalso Time.toReal t >= Options.default_real "parallel_subproofs_threshold"; (* fast-path operations -- bypass task queue if possible *) fun value_result (res: 'a Exn.result) = let val task = Task_Queue.dummy_task; val group = Task_Queue.group_of_task task; val result = Single_Assignment.var "value" : 'a result; val _ = assign_result group result (identify_result (Position.thread_data ()) res); in Future {promised = false, task = task, result = result} end; fun value x = value_result (Exn.Res x); fun cond_forks args es = if enabled () then forks args es else map (fn e => value_result (Exn.interruptible_capture e ())) es; fun map_future f x = if is_finished x then value_result (Exn.interruptible_capture (f o join) x) else let val task = task_of x; val group = Task_Queue.group_of_task task; val (result, job) = future_job group Thread_Attributes.private_interrupts (fn () => f (join x)); val extended = SYNCHRONIZED "extend" (fn () => (case Task_Queue.extend task job (! queue) of SOME queue' => (queue := queue'; true) | NONE => false)); in if extended then Future {promised = false, task = task, result = result} else (singleton o cond_forks) {name = "map_future", group = SOME group, deps = [task], pri = Task_Queue.pri_of_task task, interrupts = true} (fn () => f (join x)) end; (* promised futures -- fulfilled by external means *) fun promise_name name abort : 'a future = let val group = worker_subgroup (); val result = Single_Assignment.var "promise" : 'a result; fun assign () = assign_result group result Exn.interrupt_exn handle Fail _ => true | exn => if Exn.is_interrupt exn then raise Fail "Concurrent attempt to fulfill promise" else Exn.reraise exn; fun job () = Thread_Attributes.with_attributes Thread_Attributes.no_interrupts (fn _ => Exn.release (Exn.capture assign () before abort ())); val task = SYNCHRONIZED "enqueue_passive" (fn () => Unsynchronized.change_result queue (Task_Queue.enqueue_passive group name job)); in Future {promised = true, task = task, result = result} end; fun promise abort = promise_name "passive" abort; fun fulfill_result (Future {promised = true, task, result}) res = let val group = Task_Queue.group_of_task task; val pos = Position.thread_data (); fun job ok = assign_result group result (if ok then identify_result pos res else Exn.interrupt_exn); val _ = Thread_Attributes.with_attributes Thread_Attributes.no_interrupts (fn _ => let val passive_job = SYNCHRONIZED "fulfill_result" (fn () => Unsynchronized.change_result queue (Task_Queue.dequeue_passive (Thread.self ()) task)); in (case passive_job of SOME true => worker_exec (task, [job]) | SOME false => () | NONE => ignore (job (not (Task_Queue.is_canceled group)))) end); val _ = if is_some (Single_Assignment.peek result) then () else worker_waiting [task] (fn () => ignore (Single_Assignment.await result)); in () end | fulfill_result _ _ = raise Fail "Not a promised future"; fun fulfill x res = fulfill_result x (Exn.Res res); (* snapshot: current tasks of groups *) fun snapshot [] = [] | snapshot groups = SYNCHRONIZED "snapshot" (fn () => Task_Queue.group_tasks (! queue) groups); (* shutdown *) fun shutdown () = if is_some (worker_task ()) then raise Fail "Cannot shutdown while running as worker thread" else SYNCHRONIZED "shutdown" (fn () => while scheduler_active () do (do_shutdown := true; Multithreading.tracing 1 (fn () => "SHUTDOWN: wait"); wait scheduler_event)); (*final declarations of this structure!*) val map = map_future; end; type 'a future = 'a Future.future; diff --git a/src/Pure/Concurrent/task_queue.ML b/src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML +++ b/src/Pure/Concurrent/task_queue.ML @@ -1,437 +1,436 @@ (* Title: Pure/Concurrent/task_queue.ML Author: Makarius Ordered queue of grouped tasks. *) signature TASK_QUEUE = sig type group val new_group: group option -> group val group_id: group -> int val eq_group: group * group -> bool val cancel_group: group -> exn -> unit val is_canceled: group -> bool val group_status: group -> exn list val str_of_group: group -> string val str_of_groups: group -> string val urgent_pri: int type task val dummy_task: task val group_of_task: task -> group val name_of_task: task -> string val pri_of_task: task -> int val eq_task: task * task -> bool val str_of_task: task -> string val str_of_task_groups: task -> string val task_statistics: task -> Properties.T val running: task -> (unit -> 'a) -> 'a val joining: task -> (unit -> 'a) -> 'a val waiting: task -> task list -> (unit -> 'a) -> 'a type queue val empty: queue val group_tasks: queue -> group list -> task list val known_task: queue -> task -> bool val all_passive: queue -> bool val total_jobs: queue -> int - val status: queue -> - {ready: int, pending: int, running: int, passive: int, urgent: int, total: int} + val status: queue -> {ready: int, pending: int, running: int, passive: int, urgent: int} val cancel: queue -> group -> Thread.thread list val cancel_all: queue -> group list * Thread.thread list val finish: task -> queue -> bool * queue val enroll: Thread.thread -> string -> group -> queue -> task * queue val enqueue_passive: group -> string -> (unit -> bool) -> queue -> task * queue val enqueue: string -> group -> task list -> int -> (bool -> bool) -> queue -> task * queue val extend: task -> (bool -> bool) -> queue -> queue option val dequeue_passive: Thread.thread -> task -> queue -> bool option * queue val dequeue: Thread.thread -> bool -> queue -> (task * (bool -> bool) list) option * queue val dequeue_deps: Thread.thread -> task list -> queue -> (((task * (bool -> bool) list) option * task list) * queue) end; structure Task_Queue: TASK_QUEUE = struct val new_id = Counter.make (); (** nested groups of tasks **) (* groups *) abstype group = Group of {parent: group option, id: int, status: exn option Unsynchronized.ref} with fun make_group (parent, id, status) = Group {parent = parent, id = id, status = status}; fun new_group parent = make_group (parent, new_id (), Unsynchronized.ref NONE); fun group_id (Group {id, ...}) = id; fun eq_group (group1, group2) = group_id group1 = group_id group2; fun fold_groups f (g as Group {parent = NONE, ...}) a = f g a | fold_groups f (g as Group {parent = SOME group, ...}) a = fold_groups f group (f g a); (* group status *) local fun is_canceled_unsynchronized (Group {parent, status, ...}) = is_some (! status) orelse (case parent of NONE => false | SOME group => is_canceled_unsynchronized group); fun group_status_unsynchronized (Group {parent, status, ...}) = the_list (! status) @ (case parent of NONE => [] | SOME group => group_status_unsynchronized group); val lock = Mutex.mutex (); fun SYNCHRONIZED e = Multithreading.synchronized "group_status" lock e; in fun cancel_group (Group {status, ...}) exn = SYNCHRONIZED (fn () => Unsynchronized.change status (fn exns => SOME (Par_Exn.make (exn :: the_list exns)))); fun is_canceled group = SYNCHRONIZED (fn () => is_canceled_unsynchronized group); fun group_status group = SYNCHRONIZED (fn () => group_status_unsynchronized group); end; fun str_of_group group = (is_canceled group ? enclose "(" ")") (string_of_int (group_id group)); fun str_of_groups group = space_implode "/" (map str_of_group (rev (fold_groups cons group []))); end; (* tasks *) val urgent_pri = 1000; type timing = Time.time * Time.time * string list; (*run, wait, wait dependencies*) val timing_start = (Time.zeroTime, Time.zeroTime, []): timing; fun new_timing () = if ! Multithreading.trace < 2 then NONE else SOME (Synchronized.var "timing" timing_start); abstype task = Task of {group: group, name: string, id: int, pri: int option, timing: timing Synchronized.var option, pos: Position.T} with val dummy_task = Task {group = new_group NONE, name = "", id = 0, pri = NONE, timing = NONE, pos = Position.none}; fun new_task group name pri = Task {group = group, name = name, id = new_id (), pri = pri, timing = new_timing (), pos = Position.thread_data ()}; fun group_of_task (Task {group, ...}) = group; fun name_of_task (Task {name, ...}) = name; fun id_of_task (Task {id, ...}) = id; fun pri_of_task (Task {pri, ...}) = the_default 0 pri; fun eq_task (task1, task2) = id_of_task task1 = id_of_task task2; fun str_of_task (Task {name, id, ...}) = if name = "" then string_of_int id else string_of_int id ^ " (" ^ name ^ ")"; fun str_of_task_groups task = str_of_task task ^ " in " ^ str_of_groups (group_of_task task); fun update_timing update (Task {timing, ...}) e = Thread_Attributes.uninterruptible (fn restore_attributes => fn () => let val start = Time.now (); val result = Exn.capture (restore_attributes e) (); val t = Time.now () - start; val _ = (case timing of NONE => () | SOME var => Synchronized.change var (update t)); in Exn.release result end) (); fun task_ord (Task {id = id1, pri = pri1, ...}, Task {id = id2, pri = pri2, ...}) = prod_ord (rev_order o option_ord int_ord) int_ord ((pri1, id1), (pri2, id2)); fun task_statistics (Task {name, id, timing, pos, ...}) = let val (run, wait, wait_deps) = (case timing of NONE => timing_start | SOME var => Synchronized.value var); fun micros time = string_of_int (Time.toNanoseconds time div 1000); in [("now", Value.print_real (Time.toReal (Time.now ()))), ("task_name", name), ("task_id", Value.print_int id), ("run", micros run), ("wait", micros wait), ("wait_deps", commas wait_deps)] @ Position.properties_of pos end; end; structure Tasks = Table(type key = task val ord = task_ord); structure Task_Graph = Graph(type key = task val ord = task_ord); (* timing *) fun running task = update_timing (fn t => fn (a, b, ds) => (a + t, b, ds)) task; fun joining task = update_timing (fn t => fn (a, b, ds) => (a - t, b, ds)) task; fun waiting task deps = update_timing (fn t => fn (a, b, ds) => (a - t, b + t, if ! Multithreading.trace > 0 then fold (insert (op =) o name_of_task) deps ds else ds)) task; (** queue of jobs and groups **) (* known group members *) type groups = unit Tasks.table Inttab.table; fun get_tasks (groups: groups) gid = the_default Tasks.empty (Inttab.lookup groups gid); fun add_task (gid, task) groups = Inttab.update (gid, Tasks.update (task, ()) (get_tasks groups gid)) groups; fun del_task (gid, task) groups = let val tasks = Tasks.delete_safe task (get_tasks groups gid) in if Tasks.is_empty tasks then Inttab.delete_safe gid groups else Inttab.update (gid, tasks) groups end; (* job dependency graph *) datatype job = Job of (bool -> bool) list | Running of Thread.thread | Passive of unit -> bool; type jobs = job Task_Graph.T; fun get_job (jobs: jobs) task = Task_Graph.get_node jobs task; fun set_job task job (jobs: jobs) = Task_Graph.map_node task (K job) jobs; fun add_job task dep (jobs: jobs) = Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs; (* queue *) datatype queue = Queue of {groups: groups, jobs: jobs, urgent: int, total: int}; fun make_queue groups jobs urgent total = Queue {groups = groups, jobs = jobs, urgent = urgent, total = total}; val empty = make_queue Inttab.empty Task_Graph.empty 0 0; fun group_tasks (Queue {groups, ...}) gs = fold (fn g => fn tasks => Tasks.merge (op =) (tasks, get_tasks groups (group_id g))) gs Tasks.empty |> Tasks.keys; fun known_task (Queue {jobs, ...}) task = can (Task_Graph.get_entry jobs) task; (* job status *) fun ready_job (task, (Job list, (deps, _))) = if Task_Graph.Keys.is_empty deps then SOME (task, rev list) else NONE | ready_job (task, (Passive abort, (deps, _))) = if Task_Graph.Keys.is_empty deps andalso is_canceled (group_of_task task) then SOME (task, [fn _ => abort ()]) else NONE | ready_job _ = NONE; fun ready_job_urgent false = ready_job | ready_job_urgent true = (fn entry as (task, _) => if pri_of_task task >= urgent_pri then ready_job entry else NONE); fun active_job (task, (Running _, _)) = SOME (task, []) | active_job arg = ready_job arg; fun all_passive (Queue {jobs, ...}) = is_none (Task_Graph.get_first active_job jobs); fun total_jobs (Queue {total, ...}) = total; (* queue status *) -fun status (Queue {jobs, urgent, total, ...}) = +fun status (Queue {jobs, urgent, ...}) = let val (x, y, z, w) = Task_Graph.fold (fn (_, (job, (deps, _))) => fn (x, y, z, w) => (case job of Job _ => if Task_Graph.Keys.is_empty deps then (x + 1, y, z, w) else (x, y + 1, z, w) | Running _ => (x, y, z + 1, w) | Passive _ => (x, y, z, w + 1))) jobs (0, 0, 0, 0); - in {ready = x, pending = y, running = z, passive = w, urgent = urgent, total = total} end; + in {ready = x, pending = y, running = z, passive = w, urgent = urgent} end; (** task queue operations **) (* cancel -- peers and sub-groups *) fun cancel (Queue {groups, jobs, ...}) group = let val _ = cancel_group group Exn.Interrupt; val running = Tasks.fold (fn (task, _) => (case get_job jobs task of Running thread => insert Thread.equal thread | _ => I)) (get_tasks groups (group_id group)) []; in running end; fun cancel_all (Queue {jobs, ...}) = let fun cancel_job (task, (job, _)) (groups, running) = let val group = group_of_task task; val _ = cancel_group group Exn.Interrupt; in (case job of Running t => (insert eq_group group groups, insert Thread.equal t running) | _ => (groups, running)) end; val running = Task_Graph.fold cancel_job jobs ([], []); in running end; (* finish *) fun finish task (Queue {groups, jobs, urgent, total}) = let val group = group_of_task task; val groups' = fold_groups (fn g => del_task (group_id g, task)) group groups; val jobs' = Task_Graph.del_node task jobs; val total' = total - 1; val maximal = Task_Graph.is_maximal jobs task; in (maximal, make_queue groups' jobs' urgent total') end; (* enroll *) fun enroll thread name group (Queue {groups, jobs, urgent, total}) = let val task = new_task group name NONE; val groups' = fold_groups (fn g => add_task (group_id g, task)) group groups; val jobs' = jobs |> Task_Graph.new_node (task, Running thread); val total' = total + 1; in (task, make_queue groups' jobs' urgent total') end; (* enqueue *) fun enqueue_passive group name abort (Queue {groups, jobs, urgent, total}) = let val task = new_task group name NONE; val groups' = fold_groups (fn g => add_task (group_id g, task)) group groups; val jobs' = jobs |> Task_Graph.new_node (task, Passive abort); val total' = total + 1; in (task, make_queue groups' jobs' urgent total') end; fun enqueue name group deps pri job (Queue {groups, jobs, urgent, total}) = let val task = new_task group name (SOME pri); val groups' = fold_groups (fn g => add_task (group_id g, task)) group groups; val jobs' = jobs |> Task_Graph.new_node (task, Job [job]) |> fold (add_job task) deps; val urgent' = if pri >= urgent_pri then urgent + 1 else urgent; val total' = total + 1; in (task, make_queue groups' jobs' urgent' total') end; fun extend task job (Queue {groups, jobs, urgent, total}) = (case try (get_job jobs) task of SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs) urgent total) | _ => NONE); (* dequeue *) fun dequeue_passive thread task (queue as Queue {groups, jobs, urgent, total}) = (case try (get_job jobs) task of SOME (Passive _) => let val jobs' = set_job task (Running thread) jobs in (SOME true, make_queue groups jobs' urgent total) end | SOME _ => (SOME false, queue) | NONE => (NONE, queue)); fun dequeue thread urgent_only (queue as Queue {groups, jobs, urgent, total}) = if not urgent_only orelse urgent > 0 then (case Task_Graph.get_first (ready_job_urgent urgent_only) jobs of SOME (result as (task, _)) => let val jobs' = set_job task (Running thread) jobs; val urgent' = if pri_of_task task >= urgent_pri then urgent - 1 else urgent; in (SOME result, make_queue groups jobs' urgent' total) end | NONE => (NONE, queue)) else (NONE, queue); (* dequeue wrt. dynamic dependencies *) fun dequeue_deps thread deps (queue as Queue {groups, jobs, urgent, total}) = let fun ready [] rest = (NONE, rev rest) | ready (task :: tasks) rest = (case try (Task_Graph.get_entry jobs) task of NONE => ready tasks rest | SOME (_, entry) => (case ready_job (task, entry) of NONE => ready tasks (task :: rest) | some => (some, fold cons rest tasks))); fun ready_dep _ [] = NONE | ready_dep seen (task :: tasks) = if Tasks.defined seen task then ready_dep seen tasks else let val entry as (_, (ds, _)) = #2 (Task_Graph.get_entry jobs task) in (case ready_job (task, entry) of NONE => ready_dep (Tasks.update (task, ()) seen) (Task_Graph.Keys.dest ds @ tasks) | some => some) end; fun result (res as (task, _)) deps' = let val jobs' = set_job task (Running thread) jobs; val urgent' = if pri_of_task task >= urgent_pri then urgent - 1 else urgent; in ((SOME res, deps'), make_queue groups jobs' urgent' total) end; in (case ready deps [] of (SOME res, deps') => result res deps' | (NONE, deps') => (case ready_dep Tasks.empty deps' of SOME res => result res deps' | NONE => ((NONE, deps'), queue))) end; (* toplevel pretty printing *) val _ = ML_system_pp (fn _ => fn _ => Pretty.to_polyml o Pretty.str o str_of_task); val _ = ML_system_pp (fn _ => fn _ => Pretty.to_polyml o Pretty.str o str_of_group); end; diff --git a/src/Pure/ML/ml_statistics.ML b/src/Pure/ML/ml_statistics.ML --- a/src/Pure/ML/ml_statistics.ML +++ b/src/Pure/ML/ml_statistics.ML @@ -1,111 +1,138 @@ (* Title: Pure/ML/ml_statistics.ML Author: Makarius ML runtime statistics. *) signature ML_STATISTICS = sig + val set: {tasks_ready: int, tasks_pending: int, tasks_running: int, tasks_passive: int, + tasks_urgent: int, workers_total: int, workers_active: int, workers_waiting: int} -> unit val get: unit -> (string * string) list val get_external: int -> (string * string) list val monitor: int -> real -> unit end; structure ML_Statistics: ML_STATISTICS = struct (* print *) fun print_int x = if x < 0 then "-" ^ Int.toString (~ x) else Int.toString x; fun print_real0 x = let val s = Real.fmt (StringCvt.GEN NONE) x in (case String.fields (fn c => c = #".") s of [a, b] => if List.all (fn c => c = #"0") (String.explode b) then a else s | _ => s) end; fun print_real x = if x < 0.0 then "-" ^ print_real0 (~ x) else print_real0 x; val print_properties = String.concatWith "," o map (fn (a, b) => a ^ "=" ^ b); -(* make properties *) +(* set user properties *) + +fun set {tasks_ready, tasks_pending, tasks_running, tasks_passive, tasks_urgent, + workers_total, workers_active, workers_waiting} = + (PolyML.Statistics.setUserCounter (0, tasks_ready); + PolyML.Statistics.setUserCounter (1, tasks_pending); + PolyML.Statistics.setUserCounter (2, tasks_running); + PolyML.Statistics.setUserCounter (3, tasks_passive); + PolyML.Statistics.setUserCounter (4, tasks_urgent); + PolyML.Statistics.setUserCounter (5, workers_total); + PolyML.Statistics.setUserCounter (6, workers_active); + PolyML.Statistics.setUserCounter (7, workers_waiting)); + + +(* get properties *) fun make_properties {gcFullGCs, gcPartialGCs, gcSharePasses, sizeAllocation, sizeAllocationFree, sizeCode, sizeHeap, sizeHeapFreeLastFullGC, sizeHeapFreeLastGC, sizeStacks, threadsInML, threadsTotal, threadsWaitCondVar, threadsWaitIO, threadsWaitMutex, threadsWaitSignal, timeGCReal, timeGCSystem, timeGCUser, timeNonGCReal, timeNonGCSystem, timeNonGCUser, userCounters} = let - val user_counters = - Vector.foldri - (fn (i, j, res) => ("user_counter" ^ print_int i, print_int j) :: res) - [] userCounters; + val tasks_ready = Vector.sub (userCounters, 0); + val tasks_pending = Vector.sub (userCounters, 1); + val tasks_running = Vector.sub (userCounters, 2); + val tasks_passive = Vector.sub (userCounters, 3); + val tasks_urgent = Vector.sub (userCounters, 4); + val tasks_total = tasks_ready + tasks_pending + tasks_running + tasks_passive + tasks_urgent; + val workers_total = Vector.sub (userCounters, 5); + val workers_active = Vector.sub (userCounters, 6); + val workers_waiting = Vector.sub (userCounters, 7); in - [("full_GCs", print_int gcFullGCs), + [("now", print_real (Time.toReal (Time.now ()))), + ("tasks_ready", print_int tasks_ready), + ("tasks_pending", print_int tasks_pending), + ("tasks_running", print_int tasks_running), + ("tasks_passive", print_int tasks_passive), + ("tasks_urgent", print_int tasks_urgent), + ("tasks_total", print_int tasks_total), + ("workers_total", print_int workers_total), + ("workers_active", print_int workers_active), + ("workers_waiting", print_int workers_waiting), + ("full_GCs", print_int gcFullGCs), ("partial_GCs", print_int gcPartialGCs), ("share_passes", print_int gcSharePasses), ("size_allocation", print_int sizeAllocation), ("size_allocation_free", print_int sizeAllocationFree), ("size_code", print_int sizeCode), ("size_heap", print_int sizeHeap), ("size_heap_free_last_full_GC", print_int sizeHeapFreeLastFullGC), ("size_heap_free_last_GC", print_int sizeHeapFreeLastGC), ("size_stacks", print_int sizeStacks), ("threads_in_ML", print_int threadsInML), ("threads_total", print_int threadsTotal), ("threads_wait_condvar", print_int threadsWaitCondVar), ("threads_wait_IO", print_int threadsWaitIO), ("threads_wait_mutex", print_int threadsWaitMutex), ("threads_wait_signal", print_int threadsWaitSignal), ("time_elapsed", print_real (Time.toReal timeNonGCReal)), ("time_elapsed_GC", print_real (Time.toReal timeGCReal)), ("time_CPU", print_real (Time.toReal timeNonGCSystem + Time.toReal timeNonGCUser)), - ("time_GC", print_real (Time.toReal timeGCSystem + Time.toReal timeGCUser))] @ - user_counters + ("time_GC", print_real (Time.toReal timeGCSystem + Time.toReal timeGCUser))] end; - -(* get properties *) - fun get () = make_properties (PolyML.Statistics.getLocalStats ()); fun get_external pid = make_properties (PolyML.Statistics.getRemoteStats pid); (* monitor process *) fun monitor pid delay = let fun loop () = (TextIO.output (TextIO.stdOut, print_properties (get_external pid) ^ "\n"); TextIO.flushOut TextIO.stdOut; OS.Process.sleep (Time.fromReal delay); loop ()); in loop () handle Interrupt => OS.Process.exit OS.Process.success end; end;