diff --git a/src/Pure/Concurrent/future.scala b/src/Pure/Concurrent/future.scala --- a/src/Pure/Concurrent/future.scala +++ b/src/Pure/Concurrent/future.scala @@ -1,143 +1,161 @@ /* Title: Pure/Concurrent/future.scala Author: Makarius Value-oriented parallel execution via futures and promises. */ package isabelle import java.util.concurrent.Callable /* futures and promises */ object Future { def value[A](x: A): Future[A] = new Value_Future(x) def fork[A](body: => A): Future[A] = new Task_Future[A](body) def promise[A]: Promise[A] = new Promise_Future[A] - def thread[A](name: String = "", daemon: Boolean = false)(body: => A): Future[A] = - new Thread_Future[A](name, daemon, body) + + def thread[A]( + name: String = "", + group: ThreadGroup = Standard_Thread.current_thread_group, + pri: Int = Thread.NORM_PRIORITY, + daemon: Boolean = false, + inherit_locals: Boolean = false, + uninterruptible: Boolean = false)(body: => A): Future[A] = + { + new Thread_Future[A](name, group, pri, daemon, inherit_locals, uninterruptible, body) + } } trait Future[A] { def peek: Option[Exn.Result[A]] def is_finished: Boolean = peek.isDefined def get_finished: A = { require(is_finished); Exn.release(peek.get) } def join_result: Exn.Result[A] def join: A = Exn.release(join_result) def map[B](f: A => B): Future[B] = Future.fork { f(join) } def cancel: Unit override def toString: String = peek match { case None => "" case Some(Exn.Exn(_)) => "" case Some(Exn.Res(x)) => x.toString } } trait Promise[A] extends Future[A] { def fulfill_result(res: Exn.Result[A]): Unit def fulfill(x: A): Unit } /* value future */ private class Value_Future[A](x: A) extends Future[A] { val peek: Option[Exn.Result[A]] = Some(Exn.Res(x)) def join_result: Exn.Result[A] = peek.get def cancel {} } /* task future via thread pool */ private class Task_Future[A](body: => A) extends Future[A] { private sealed abstract class Status private case object Ready extends Status private case class Running(thread: Thread) extends Status private case object Terminated extends Status private case class Finished(result: Exn.Result[A]) extends Status private val status = Synchronized[Status](Ready) def peek: Option[Exn.Result[A]] = status.value match { case Finished(result) => Some(result) case _ => None } private def try_run() { val do_run = status.change_result { case Ready => (true, Running(Thread.currentThread)) case st => (false, st) } if (do_run) { val result = Exn.capture(body) status.change(_ => Terminated) status.change(_ => Finished(if (Thread.interrupted) Exn.Exn(Exn.Interrupt()) else result)) } } private val task = Standard_Thread.pool.submit(new Callable[Unit] { def call = try_run() }) def join_result: Exn.Result[A] = { try_run() status.guarded_access { case st @ Finished(result) => Some((result, st)) case _ => None } } def cancel = { status.change { case Ready => task.cancel(false); Finished(Exn.Exn(Exn.Interrupt())) case st @ Running(thread) => thread.interrupt; st case st => st } } } /* promise future */ private class Promise_Future[A] extends Promise[A] { private val state = Synchronized[Option[Exn.Result[A]]](None) def peek: Option[Exn.Result[A]] = state.value def join_result: Exn.Result[A] = state.guarded_access(st => if (st.isEmpty) None else Some((st.get, st))) def fulfill_result(result: Exn.Result[A]): Unit = state.change(st => if (st.isEmpty) Some(result) else throw new IllegalStateException) def fulfill(x: A): Unit = fulfill_result(Exn.Res(x)) def cancel: Unit = state.change(st => if (st.isEmpty) Some(Exn.Exn(Exn.Interrupt())) else st) } /* thread future */ -private class Thread_Future[A](name: String, daemon: Boolean, body: => A) extends Future[A] +private class Thread_Future[A]( + name: String, + group: ThreadGroup, + pri: Int, + daemon: Boolean, + inherit_locals: Boolean, + uninterruptible: Boolean, + body: => A) extends Future[A] { private val result = Future.promise[A] private val thread = - Standard_Thread.fork(name = name, daemon = daemon) { result.fulfill_result(Exn.capture(body)) } + Standard_Thread.fork(name = name, group = group, pri = pri, daemon = daemon, + inherit_locals = inherit_locals, uninterruptible = uninterruptible) + { result.fulfill_result(Exn.capture(body)) } def peek: Option[Exn.Result[A]] = result.peek def join_result: Exn.Result[A] = result.join_result def cancel: Unit = thread.interrupt } diff --git a/src/Pure/Concurrent/standard_thread.scala b/src/Pure/Concurrent/standard_thread.scala --- a/src/Pure/Concurrent/standard_thread.scala +++ b/src/Pure/Concurrent/standard_thread.scala @@ -1,166 +1,169 @@ /* Title: Pure/Concurrent/standard_thread.scala Author: Makarius Standard thread operations. */ package isabelle import java.util.concurrent.{ThreadPoolExecutor, TimeUnit, LinkedBlockingQueue, ThreadFactory} object Standard_Thread { /* fork */ private val counter = Counter.make() def make_name(name: String = "", base: String = "thread"): String = proper_string(name).getOrElse(base + counter()) def current_thread_group: ThreadGroup = Thread.currentThread.getThreadGroup def fork( name: String = "", group: ThreadGroup = current_thread_group, pri: Int = Thread.NORM_PRIORITY, daemon: Boolean = false, - inherit_locals: Boolean = false)(body: => Unit): Standard_Thread = + inherit_locals: Boolean = false, + uninterruptible: Boolean = false)(body: => Unit): Standard_Thread = { - val main = new Runnable { override def run { body } } + val main = + if (uninterruptible) new Runnable { override def run { body } } + else new Runnable { override def run { Standard_Thread.uninterruptible { body } } } val thread = new Standard_Thread(main, name = make_name(name = name), group = group, pri = pri, daemon = daemon, inherit_locals = inherit_locals) thread.start thread } /* self */ def self: Standard_Thread = Thread.currentThread match { case thread: Standard_Thread => thread case _ => error("Expected to run on Isabelle/Scala standard thread") } def uninterruptible[A](body: => A): A = self.uninterruptible(body) /* pool */ lazy val pool: ThreadPoolExecutor = { val m = Value.Int.unapply(System.getProperty("isabelle.threads", "0")) getOrElse 0 val n = if (m > 0) m else (Runtime.getRuntime.availableProcessors max 1) min 8 val executor = new ThreadPoolExecutor(n, n, 2500L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable]) executor.setThreadFactory( new Standard_Thread(_, name = make_name(base = "worker"), group = current_thread_group)) executor } /* delayed events */ final class Delay private[Standard_Thread]( first: Boolean, delay: => Time, log: Logger, event: => Unit) { private var running: Option[Event_Timer.Request] = None private def run: Unit = { val do_run = synchronized { if (running.isDefined) { running = None; true } else false } if (do_run) { try { event } catch { case exn: Throwable if !Exn.is_interrupt(exn) => log(Exn.message(exn)); throw exn } } } def invoke(): Unit = synchronized { val new_run = running match { case Some(request) => if (first) false else { request.cancel; true } case None => true } if (new_run) running = Some(Event_Timer.request(Time.now() + delay)(run)) } def revoke(): Unit = synchronized { running match { case Some(request) => request.cancel; running = None case None => } } def postpone(alt_delay: Time): Unit = synchronized { running match { case Some(request) => val alt_time = Time.now() + alt_delay if (request.time < alt_time && request.cancel) { running = Some(Event_Timer.request(alt_time)(run)) } case None => } } } // delayed event after first invocation def delay_first(delay: => Time, log: Logger = No_Logger)(event: => Unit): Delay = new Delay(true, delay, log, event) // delayed event after last invocation def delay_last(delay: => Time, log: Logger = No_Logger)(event: => Unit): Delay = new Delay(false, delay, log, event) } class Standard_Thread private( main: Runnable, name: String = "", group: ThreadGroup = null, pri: Int = Thread.NORM_PRIORITY, daemon: Boolean = false, inherit_locals: Boolean = false) extends Thread(group, null, name, 0L, inherit_locals) { thread => thread.setPriority(pri) thread.setDaemon(daemon) override def run { main.run() } private var interruptible: Boolean = true private var interrupt_pending: Boolean = false override def interrupt: Unit = synchronized { if (interruptible) super.interrupt() else { interrupt_pending = true } } def uninterruptible[A](body: => A): A = { require(Thread.currentThread == thread) val interruptible0 = synchronized { val b = interruptible; interruptible = false; b } try { body } finally { synchronized { interruptible = interruptible0 if (interruptible && interrupt_pending) { interrupt_pending = false super.interrupt() } } Exn.Interrupt.expose() } } }