diff --git a/src/Pure/Concurrent/consumer_thread.scala b/src/Pure/Concurrent/consumer_thread.scala --- a/src/Pure/Concurrent/consumer_thread.scala +++ b/src/Pure/Concurrent/consumer_thread.scala @@ -1,85 +1,102 @@ /* Title: Pure/Concurrent/consumer_thread.scala Author: Makarius Consumer thread with unbounded queueing of requests, and optional acknowledgment. */ package isabelle import scala.annotation.tailrec object Consumer_Thread { def fork[A](name: String = "", daemon: Boolean = false)( consume: A => Boolean, finish: () => Unit = () => ()): Consumer_Thread[A] = new Consumer_Thread[A](name, daemon, consume, finish) - - - /* internal messages */ - - private type Ack = Synchronized[Option[Exn.Result[Boolean]]] - private type Request[A] = (A, Option[Ack]) } final class Consumer_Thread[A] private( name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit) { + /* thread */ + private var active = true - private val mailbox = Mailbox[Option[Consumer_Thread.Request[A]]] + private val mailbox = Mailbox[Option[Request]] private val thread = Standard_Thread.fork(name, daemon) { main_loop(Nil) } def is_active: Boolean = active && thread.isAlive def check_thread: Boolean = Thread.currentThread == thread private def failure(exn: Throwable): Unit = Output.error_message( "Consumer thread failure: " + quote(thread.getName) + "\n" + Exn.message(exn)) private def robust_finish(): Unit = try { finish() } catch { case exn: Throwable => failure(exn) } - @tailrec private def main_loop(msgs: List[Option[Consumer_Thread.Request[A]]]): Unit = + + /* requests */ + + private class Request(arg: A, acknowledge: Boolean = false) + { + private val ack: Option[Synchronized[Option[Exn.Result[Unit]]]] = + if (acknowledge) Some(Synchronized(None)) else None + + def apply: Boolean = + { + val result = Exn.capture { consume(arg) } + (ack, result) match { + case ((Some(a), _)) => a.change(_ => Some(result.map(_ => ()))) + case ((None, Exn.Res(_))) => + case ((None, Exn.Exn(exn))) => failure(exn) + } + result match { + case Exn.Res(continue) => continue + case Exn.Exn(_) => true + } + } + + def await + { + for (a <- ack) { + Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) })) + } + } + } + + private def request(req: Request) + { + synchronized { + if (is_active) mailbox.send(Some(req)) + else error("Consumer thread not active: " + quote(thread.getName)) + } + req.await + } + + @tailrec private def main_loop(msgs: List[Option[Request]]): Unit = msgs match { case Nil => main_loop(mailbox.receive(None)) - case Some((arg, ack)) :: rest => - val result = Exn.capture { consume(arg) } - val continue = - result match { - case Exn.Res(cont) => cont - case Exn.Exn(exn) => - if (!ack.isDefined) failure(exn) - true - } - ack.foreach(a => a.change(_ => Some(result))) + case Some(req) :: rest => + val continue = req.apply if (continue) main_loop(rest) else robust_finish() case None :: _ => robust_finish() } - assert(is_active) - /* main methods */ - private def request(x: A, ack: Option[Consumer_Thread.Ack]) - { - synchronized { - if (is_active) mailbox.send(Some((x, ack))) - else error("Consumer thread not active: " + quote(thread.getName)) - } - ack.foreach(a => - Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) }))) - } + assert(is_active) - def send(arg: A) { request(arg, None) } - def send_wait(arg: A) { request(arg, Some(Synchronized(None))) } + def send(arg: A) { request(new Request(arg)) } + def send_wait(arg: A) { request(new Request(arg, acknowledge = true)) } - def shutdown(): Unit = + def shutdown() { synchronized { if (is_active) { active = false; mailbox.send(None) } } thread.join } } diff --git a/src/Pure/General/exn.scala b/src/Pure/General/exn.scala --- a/src/Pure/General/exn.scala +++ b/src/Pure/General/exn.scala @@ -1,147 +1,153 @@ /* Title: Pure/General/exn.scala Author: Makarius Support for exceptions (arbitrary throwables). */ package isabelle object Exn { /* user errors */ class User_Error(message: String) extends RuntimeException(message) { override def equals(that: Any): Boolean = that match { case other: User_Error => message == other.getMessage case _ => false } override def hashCode: Int = message.hashCode override def toString: String = "\n" + Output.error_message_text(message) } object ERROR { def apply(message: String): User_Error = new User_Error(message) def unapply(exn: Throwable): Option[String] = user_message(exn) } def error(message: String): Nothing = throw ERROR(message) def cat_message(msgs: String*): String = cat_lines(msgs.iterator.filterNot(_ == "")) def cat_error(msgs: String*): Nothing = error(cat_message(msgs:_*)) /* exceptions as values */ sealed abstract class Result[A] { def user_error: Result[A] = this match { case Exn(ERROR(msg)) => Exn(ERROR(msg)) case _ => this } + + def map[B](f: A => B): Result[B] = + this match { + case Res(res) => Res(f(res)) + case Exn(exn) => Exn(exn) + } } case class Res[A](res: A) extends Result[A] case class Exn[A](exn: Throwable) extends Result[A] def capture[A](e: => A): Result[A] = try { Res(e) } catch { case exn: Throwable => Exn[A](exn) } def release[A](result: Result[A]): A = result match { case Res(x) => x case Exn(exn) => throw exn } def release_first[A](results: List[Result[A]]): List[A] = results.find({ case Exn(exn) => !is_interrupt(exn) case _ => false }) match { case Some(Exn(exn)) => throw exn case _ => results.map(release(_)) } /* interrupts */ def is_interrupt(exn: Throwable): Boolean = { var found_interrupt = false var e = exn while (!found_interrupt && e != null) { found_interrupt |= e.isInstanceOf[InterruptedException] e = e.getCause } found_interrupt } def interruptible_capture[A](e: => A): Result[A] = try { Res(e) } catch { case exn: Throwable if !is_interrupt(exn) => Exn[A](exn) } object Interrupt { def apply(): Throwable = new InterruptedException def unapply(exn: Throwable): Boolean = is_interrupt(exn) def expose() { if (Thread.interrupted) throw apply() } def impose() { Thread.currentThread.interrupt } def postpone[A](body: => A): Option[A] = { val interrupted = Thread.interrupted val result = capture { body } if (interrupted) impose() result match { case Res(x) => Some(x) case Exn(e) => if (is_interrupt(e)) { impose(); None } else throw e } } val return_code = 130 } /* POSIX return code */ def return_code(exn: Throwable, rc: Int): Int = if (is_interrupt(exn)) Interrupt.return_code else rc /* message */ def user_message(exn: Throwable): Option[String] = if (exn.getClass == classOf[RuntimeException] || exn.getClass == classOf[User_Error]) { Some(proper_string(exn.getMessage) getOrElse "Error") } else if (exn.isInstanceOf[java.sql.SQLException]) { Some(proper_string(exn.getMessage) getOrElse "SQL error") } else if (exn.isInstanceOf[java.io.IOException]) { val msg = exn.getMessage Some(if (msg == null || msg == "") "I/O error" else "I/O error: " + msg) } else if (exn.isInstanceOf[RuntimeException]) Some(exn.toString) else None def message(exn: Throwable): String = user_message(exn) getOrElse (if (is_interrupt(exn)) "Interrupt" else exn.toString) /* trace */ def trace(exn: Throwable): String = exn.getStackTrace.mkString("\n") }