diff --git a/basis/BasicStreamIO.sml b/basis/BasicStreamIO.sml index 8b786692..36a351aa 100644 --- a/basis/BasicStreamIO.sml +++ b/basis/BasicStreamIO.sml @@ -1,796 +1,796 @@ (* Title: Standard Basis Library: StreamIO functor Copyright David C.J. Matthews 2000, 2005, 2019 This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License version 2.1 as published by the Free Software Foundation. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *) functor BasicStreamIO( structure PrimIO : PRIM_IO structure Vector : MONO_VECTOR structure Array : MONO_ARRAY structure VectorSlice: MONO_VECTOR_SLICE structure ArraySlice: MONO_ARRAY_SLICE sharing type PrimIO.elem = Vector.elem = Array.elem = VectorSlice.elem = ArraySlice.elem sharing type PrimIO.vector = Vector.vector = Array.vector = VectorSlice.vector = ArraySlice.vector sharing type PrimIO.array = Array.array = ArraySlice.array sharing type PrimIO.vector_slice = VectorSlice.slice = ArraySlice.vector_slice sharing type PrimIO.array_slice = ArraySlice.slice val someElem : PrimIO.elem ): sig include STREAM_IO (* Note: This is non-standard but enables us to define the derived BinIO and TextIO structures more efficiently. 
*) val outputVec: outstream * PrimIO.vector_slice -> unit end = struct open IO type vector = Vector.vector type elem = PrimIO.elem datatype reader = datatype PrimIO.reader datatype writer = datatype PrimIO.writer type array = Array.array type pos = PrimIO.pos exception Interrupt = RunCall.Interrupt (* Called after any exception in the lower level reader or writer to map any exception other than Io into Io. *) fun mapToIo (io as Io _, _, _) = io | mapToIo (Interrupt, _, _) = Interrupt | mapToIo (nonIo, name, caller) = Io { name = name, function = caller, cause = nonIo } val emptyVec = Vector.fromList [] (* Represents end-of-stream. *) datatype instream = (* The usual state of a stream: We may have to read from the reader before we have any real data or we may have already read. *) Uncommitted of { state: streamState ref, locker: Thread.Mutex.mutex } (* If we know we have unread input we can return this as the stream. That allows part of the stream to be read without locking. This is an optimisation. *) | Committed of { vec: vector, offset: int, rest: instream, startPos: pos option } and streamState = Truncated (* The stream has been closed or truncated. *) | HaveRead of (* A vector has been read from the stream. If the vector has size zero this is treated as EOF. startPos is the position when the vector was read. *) {vec: vector, rest: streamState ref, startPos: pos option } | ToRead of reader (* We have not yet closed or truncated the stream. *) (* Outstream. *) and outstream = OutStream of { wrtr: writer, buffType : IO.buffer_mode ref, buf: array, bufp: int ref, streamState: outstreamState ref, locker: Thread.Mutex.mutex } (* Stream state. OutStreamOpen means that attempts to write should proceed. OutStreamTerminated means the stream has been "terminated" i.e. buffers have been flushed and the writer has been extracted by getWriter. Any attempt to write at this level should fail. OutStreamClosed means that the writer's "close" function has been called. 
In addition the stream has been terminated.*) and outstreamState = OutStreamOpen | OutStreamTerminated | OutStreamClosed datatype out_pos = OutPos of outstream * pos (* Create a new stream from the vector and the reader. *) fun mkInstream (r, v: vector): instream = let val readRest = Uncommitted { state = ref (ToRead r), locker = Thread.Mutex.mutex() } (* If the vector is non-empty the first entry is as though the vector had been read otherwise it's just the reader. *) in if Vector.length v = 0 then readRest else Committed { vec = v, offset = 0, rest = readRest, startPos = NONE } end local fun input' (ref (HaveRead {vec, rest, ...}), locker) = let (* TODO: If we have already read further on we could convert these entries to Committed. *) in (vec, Uncommitted{ state = rest, locker = locker }) end | input' (s as ref Truncated, locker) = (* Truncated: return end-of-stream *) (emptyVec, Uncommitted{ state = s, locker = locker }) | input' (state as ref(readMore as ToRead (RD {chunkSize, readVec = SOME readVec, getPos, ...})), locker) = let (* We've not yet read this. Try reading from the reader. *) val startPos = case getPos of SOME g => SOME(g()) | NONE => NONE val data = readVec chunkSize (* Create a reference to the reader which will be updated by the next read. The ref is shared between the existing stream and the new one so reading on either adds to the same chain. *) val nextLink = ref readMore val nextChunk = HaveRead {vec = data, rest = nextLink, startPos = startPos} in (* Extend the stream by adding this vector to the list of chunks read so far. *) state := nextChunk; (* Return a new stream which continues reading. *) (data, Uncommitted { state = nextLink, locker = locker }) end | input' (ref(ToRead(RD{name, ...})), _) = (* readVec missing in reader. 
*) raise Io { name = name, function = "input", cause = BlockingNotSupported } fun inputNList' (ref (HaveRead {vec, rest, startPos}), locker, n) = let val vecLength = Vector.length vec in if vecLength = 0 (* End-of-stream: Return next in list. *) then ([vec], Uncommitted{ state = rest, locker = locker }) else if n < vecLength then (* We can use what's already been read. The stream we return allows us to read the rest without blocking. *) ([VectorSlice.vector(VectorSlice.slice(vec, 0, SOME n))], Committed{ vec = vec, offset = n, startPos = startPos, rest = Uncommitted{ state = rest, locker = locker} }) else if n = vecLength then (* Exactly uses up the buffer. New stream state is the next entry. *) ([vec], Uncommitted{ state = rest, locker = locker}) else (* Have to get the next item *) let val (nextVecs, nextStream) = inputNList' (rest, locker, n - vecLength) in (vec :: nextVecs, nextStream) end end | inputNList' (s as ref Truncated, locker, _) = (* Truncated: return end-of-stream *) ([emptyVec], Uncommitted{ state = s, locker = locker }) | inputNList' (f, locker, n) = (* ToRead *) let val (vec, f') = input' (f, locker) in if Vector.length vec = 0 then ([vec], f') (* Truncated or end-of-file. *) else inputNList' (f, locker, n) (* Reread *) end in fun input (Uncommitted { state, locker }) = LibraryIOSupport.protect locker input' (state, locker) | input (Committed { vec, offset, rest, ... }) = (* This stream was produced from re-reading a stream that already had data. We can return the result without the overhead of locking. *) (VectorSlice.vector(VectorSlice.slice(vec, offset, NONE)), rest) fun inputNList (Uncommitted { state, locker }, n) = LibraryIOSupport.protect locker inputNList' (state, locker, n) | inputNList (Committed { vec, offset, rest, startPos }, n) = let val vecLength = Vector.length vec in if vecLength = 0 (* End-of-stream: Return next in list. *) then ([vec], rest) else if n < vecLength - offset then (* We can use what's already been read. 
Next entry is a committed stream that returns the part we haven't yet used. *) ([VectorSlice.vector(VectorSlice.slice(vec, offset, SOME n))], Committed{ vec = vec, offset = offset+n, rest = rest, startPos = startPos }) else if n = vecLength - offset then (* Exactly uses up the buffer. New stream state is the next entry. *) ([VectorSlice.vector(VectorSlice.slice(vec, offset, NONE))], rest) else (* Have to get the next item *) let val (nextVecs, nextStream) = inputNList (rest, n - (vecLength - offset)) in (VectorSlice.vector(VectorSlice.slice(vec, offset, NONE)) :: nextVecs, nextStream) end end fun inputN (f, n) = if n < 0 then raise Size else if n = 0 (* Defined to return the empty vector and f *) then (emptyVec, f) else let val (vecs, f') = inputNList (f, n) in (Vector.concat vecs, f') end (* Read the whole of the remaining input until we get an EOF. *) fun inputAll f = let (* Find out the size of the file. *) fun getSize(n, f) = let val (v, f') = input f val vSize = Vector.length v in if vSize = 0 then n (* Reached EOF. *) else getSize (n + vSize, f') end in (* Read the whole file. *) inputN(f, getSize(0,f)) end (* Note a crucial difference between inputN and input1. Because input1 does not return a stream if it detects EOF it cannot advance beyond a temporary EOF in a stream. 
*) fun input1 (Committed { vec, offset, rest, startPos }) = let val vecSize = Vector.length vec in if vecSize = 0 then NONE else if vecSize = offset+1 then SOME(Vector.sub(vec, offset), rest) else SOME(Vector.sub(vec, offset), Committed{ vec = vec, offset = offset+1, rest = rest, startPos = startPos }) end | input1 f = let val (s, f') = inputN (f, 1) in if Vector.length s = 0 then NONE else SOME(Vector.sub(s, 0), f') end end local fun doClose (ref (HaveRead {rest, ...})) = doClose rest | doClose (ref Truncated) = () | doClose (state as ref (ToRead (RD{close, name, ...}))) = (state := Truncated; close() handle exn => raise mapToIo(exn, name, "closeIn")) in fun closeIn (Uncommitted { state, locker }) = LibraryIOSupport.protect locker doClose state | closeIn (Committed { rest, ...}) = closeIn rest end local (* Return the reader. *) fun getReader' (ref (HaveRead {rest, ...})) = getReader' rest | getReader' (ref Truncated) = raise Io { name = "", function = "getReader", cause = ClosedStream } | getReader' (state as ref (ToRead reader)) = (state := Truncated; reader) in fun getReader'' (Uncommitted { state, locker }) = LibraryIOSupport.protect locker getReader' state | getReader'' (Committed { rest, ... }) = getReader'' rest fun getReader f = let val reader = getReader'' f val (allInput, _) = inputAll f in (* Return the reader along with buffered input. It's not clear what to do if there are EOFs in the stream. The book says the result is the result of inputAll which takes everything up to the first EOF. *) (reader, allInput) end end local (* Check that the stream is not terminated and then convert a file position plus a vector offset into a file position. In particular, if the reader has converted CRNL into NL we don't have a simple relationship between elements and file offsets. 
*) fun findPosition'(startPos, offset, HaveRead {rest=ref rest, ...}) = (* Skip over chunks that have already been read until we reach the end of the chain. *) findPosition'(startPos, offset, rest) | findPosition'(_, _, Truncated) = raise Io { name = "", function = "filePosIn", cause = ClosedStream } | findPosition'(startPos, offset, ToRead (RD { getPos = SOME getPos, setPos = SOME setPos, readVec = SOME readVec, ...})) = if offset = 0 then startPos (* Easy *) else (* When we read this vector we recorded the file position of the beginning only. To find the file position of the particular element we actually need to read the portion of the input up to that element and find out the file position at that point. *) let val savep = getPos() (* Save current position. *) (* Move to the point where we read the vector. *) val () = setPos startPos; (* Call readVec until we have read the required number of elements. N.B. Gansner & Reppy has a bug here. There is no guarantee that readVec n will actually return n elements so it's unsafe to assume that it will move the file pointer by n elements. *) (* A zero-length read also stops the loop; otherwise doRead would be called again with the same n. *) fun doRead n = let val read = Vector.length(readVec n) in if read = n orelse read = 0 (* Error? *) then () else doRead (n - read) end (* Read the offset number of elements. *) val () = doRead offset; (* Record the position after actually reading the elements. *) val position = getPos(); in setPos savep; (* Restore. *) position end (* The clause above needs getPos, setPos and readVec; reaching here means the reader lacks at least one of them. *) | findPosition'(_, _, ToRead _) = raise Io { name = "", function = "filePosIn", cause = RandomAccessNotSupported } fun findPosition(startPos, offset, Committed { rest, ... 
}) = findPosition(startPos, offset, rest) | findPosition(startPos, offset, Uncommitted { state = ref state, locker }) = LibraryIOSupport.protect locker findPosition' (startPos, offset, state) fun filePosIn' (HaveRead {rest=ref rest, startPos = SOME startPos, ...}) = findPosition'(startPos, 0, rest) | filePosIn' (HaveRead {startPos = NONE, ...}) = raise Io { name = "", function = "filePosIn", cause = RandomAccessNotSupported } | filePosIn' Truncated = raise Io { name = "", function = "filePosIn", cause = ClosedStream } | filePosIn' (ToRead(RD { getPos = SOME getPos, ...})) = getPos() | filePosIn' (ToRead _) = raise Io { name = "", function = "filePosIn", cause = RandomAccessNotSupported } in (* Find the first entry to get the position. *) fun filePosIn (Uncommitted { state = ref state, locker }) = LibraryIOSupport.protect locker filePosIn' state | filePosIn (Committed { offset, rest, startPos = SOME startPos, ... }) = findPosition(startPos, offset, rest) | filePosIn (Committed { startPos = NONE, ... }) = (* This can occur either because the reader doesn't support getPos or because the position is within the initial vector passed to mkInstream. *) raise Io { name = "", function = "filePosIn", cause = RandomAccessNotSupported } end local fun doCanInput' (ref (HaveRead {vec, rest, ...}), locker, n, k) = let val vecLength = Vector.length vec in if vecLength = 0 then SOME k else if vecLength >= n then SOME (k+n) else doCanInput'(rest, locker, n-vecLength, k+vecLength) end | doCanInput' (ref Truncated, _, _, k) = SOME k | doCanInput' (state as ref(readMore as ToRead (RD {chunkSize, readVecNB = SOME readVecNB, getPos, ...})), locker, n, k) = let val startPos = case getPos of SOME g => SOME(g()) | NONE => NONE in (* Read a block full. This will avoid us creating lots of small items in the list if there is actually plenty of input available. *) case readVecNB chunkSize of NONE => (* Reading these would block but we may already have some input. 
*) if k = 0 then NONE else SOME k | SOME data => let (* We have to record this in the stream. *) val nextLink = ref readMore val nextChunk = HaveRead {vec = data, rest = nextLink, startPos = startPos} in state := nextChunk; (* Check whether this has satisfied the request. *) doCanInput'(state, locker, n, k) end end | doCanInput' (ref(ToRead(RD {name, ...})), _, _, _) = (* readVecNB missing in reader. *) raise Io { name = name, function = "canInput", cause = NonblockingNotSupported } (* Work out how many of the n requested elements can be returned without blocking. k is the number already found in earlier chunks. *) fun doCanInput (Uncommitted { state, locker }, n, k) = LibraryIOSupport.protect locker doCanInput' (state, locker, n, k) | doCanInput (Committed { vec, offset, rest, ... }, n, k) = let val vecLength = Vector.length vec (* Elements before offset have already been consumed from this stream value so only the tail of the vector counts as available. Using the full length here would over-report the input. *) val available = vecLength - offset in if vecLength = 0 then SOME k (* Reached EOF. *) else if available >= n then SOME (k + n) (* Have already read enough. *) else doCanInput(rest, n-available, k+available) end in fun canInput(f, n) = if n < 0 then raise Size else doCanInput(f, n, 0) end (* Look for end-of-stream. Could be defined more directly but it probably isn't worth it. *) fun endOfStream f = let val (v, _) = input f in Vector.length v = 0 end (* OUTPUT *) (* In order to be able to flush and close the streams when we exit we need to keep a list of the output streams. *) val ostreamLock = Thread.Mutex.mutex() - (* Use a no-overwrite ref for the list of streams. This ensures that - the ref will not be overwritten if we load a saved state. *) - val outputStreamList: outstream list ref = LibrarySupport.noOverwriteRef nil; + (* We use a volatile ref so that the list is always reset + at the start of a program. 
*) + val outputStreamList: outstream list ref = LibrarySupport.volatileListRef() fun protectOut f (outs as OutStream{locker, ...}) = LibraryIOSupport.protect locker f outs fun mkOutstream'(wrtr as WR{chunkSize, ...}, buffMode) = let open Thread.Mutex val strm = OutStream{wrtr=wrtr, buffType=ref buffMode, buf=Array.array(chunkSize, someElem), streamState=ref OutStreamOpen, bufp=ref 0, locker=Thread.Mutex.mutex()} in (* Add it to the list. *) outputStreamList := strm :: ! outputStreamList; strm end val mkOutstream = LibraryIOSupport.protect ostreamLock mkOutstream' fun getBufferMode(OutStream{buffType=ref b, ...}) = b local (* Flush anything from the buffer. *) fun flushOut'(OutStream{buf, bufp=bufp as ref endBuf, wrtr=wrtr as WR{name, ...}, ...}) = if endBuf = 0 then () (* Nothing buffered *) else case wrtr of WR{writeArr=SOME wa, ...} => let fun flushBuff n = let val written = wa(ArraySlice.slice(buf, n, SOME(endBuf-n))) handle exn => raise mapToIo(exn, name, "flushOut") in if written+n = endBuf then () else flushBuff(written+n) end in (* Set the buffer to empty BEFORE writing anything. If we get an asynchronous interrupt (ctrl-C) we want to lose data in preference to duplicating it. *) bufp := 0; flushBuff 0 end | _ => raise Io { name = name, function = "flushOut", cause = BlockingNotSupported } (* Terminate a stream either because it has been closed or because we have extracted the underlying writer. *) fun terminateStream'(f as OutStream{streamState as ref OutStreamOpen, ...}) = let (* outstream is not an equality type but we can get the desired effect by comparing the streamState references for equality (N.B. NOT their contents). *) fun removeThis(OutStream{streamState=streamState', ...}) = streamState' <> streamState open Thread.Mutex in streamState := OutStreamTerminated; lock ostreamLock; outputStreamList := List.filter removeThis (!outputStreamList); unlock ostreamLock; flushOut' f end | terminateStream' _ = () (* Nothing to do. *) (* Close the stream. 
We must call the writer's close function only once unless the flushing fails. In that case the stream is left open. *) fun closeOut'(OutStream{streamState=ref OutStreamClosed, ...}) = () | closeOut'(f as OutStream{wrtr=WR{close, name, ...}, streamState, ...}) = ( terminateStream' f; streamState := OutStreamClosed; close() handle exn => raise mapToIo(exn, name, "closeOut") (* Close the underlying writer. *) ) (* Flush the stream, terminate it and return the underlying writer. According to the documentation this raises an exception if the stream is "closed" rather than "terminated" implying that it is possible to extract the writer more than once. That's in contrast to getReader which is defined to raise an exception if the stream is closed or truncated. *) fun getWriter'(OutStream{wrtr=WR{name, ...}, streamState=ref OutStreamClosed, ...}) = (* Already closed. *) raise Io { name = name, function = "getWriter", cause = ClosedStream } | getWriter'(f as OutStream{buffType, wrtr, ...}) = ( terminateStream' f; (wrtr, !buffType) ) (* Set the buffer mode, possibly flushing the buffer as it does. *) fun setBufferMode' newBuff (f as OutStream{buffType, bufp, ...}) = (* Question: What if the stream is terminated? *) ( if newBuff = NO_BUF andalso !bufp <> 0 then (* Flush pending output. *) (* Switching from block to line buffering does not flush. *) flushOut' f else (); buffType := newBuff ) (* Internal function: Write a vector directly to the writer. It only returns when the vector has been completely written. "output" should work if the writer only provides writeArr so we may have to use that if writeVec isn't there. *) (* FOR TESTING: Put writeArr first. 
*) fun writeVec(OutStream{wrtr=WR{writeVec=SOME wv, name, ...}, ...}, v, i, len) = let fun forceOut p = let val written = wv(VectorSlice.slice(v, p+i, SOME(len-p))) handle exn => raise mapToIo(exn, name, "output") in if written+p = len then () else forceOut(written+p) end in forceOut 0 end | writeVec(OutStream{wrtr=WR{writeArr=SOME wa, name, ...}, ...}, v, i, len) = let val buffSize = 10 val buff = Array.array(buffSize, someElem); fun forceOut p = let val toCopy = Int.min(len-p, buffSize) val () = ArraySlice.copyVec{src=VectorSlice.slice(v, p+i, SOME toCopy), dst=buff, di=0} val written = wa(ArraySlice.slice(buff, 0, SOME toCopy)) handle exn => raise mapToIo(exn, name, "output") in if written+p = len then () else forceOut(written+p) end in forceOut 0 end | writeVec(OutStream{wrtr=WR{name, ...}, ...}, _, _, _) = raise Io { name = name, function = "output", cause = BlockingNotSupported } (* Internal function. Write a vector to the stream using the start and length provided. *) fun outputVector (v, start, vecLen) (f as OutStream{streamState=ref OutStreamOpen, buffType, buf, bufp, ...}) = let val buffLen = Array.length buf fun arrayCopyVec{src: Vector.vector, si: int, len: int, dst: Array.array, di: int} = ArraySlice.copyVec{src=VectorSlice.slice(src, si, SOME len), dst=dst, di=di}; fun addVecToBuff () = if vecLen < buffLen - !bufp then (* Room in the buffer. *) ( arrayCopyVec{src=v, si=start, len=vecLen, dst=buf, di= !bufp}; bufp := !bufp + vecLen ) else let val buffSpace = buffLen - !bufp in (* Copy as much of the vector as will fit *) arrayCopyVec{src=v, si=start, len=buffSpace, dst=buf, di= !bufp}; bufp := !bufp+buffSpace; (* TODO: Flushing the buffer ensures that all the buffer contents have been written. We don't actually need that, what we need is for enough to have been written that we have space in the buffer for the rest of the vector. *) flushOut' f; (* Write it out. *) (* Copy the rest of the vector. 
*) arrayCopyVec{src=v, si=start+buffSpace, len=vecLen-buffSpace, dst=buf, di=0}; bufp := vecLen-buffSpace end (* addVecToBuff *) in if vecLen > buffLen then (* If the vector is too large to put in the buffer we're going to have to write something out. To reduce copying we simply flush the buffer and write the vector directly. *) (flushOut' f; writeVec(f, v, start, vecLen)) else (* Try copying to the buffer. *) if !buffType = IO.NO_BUF then (* Write it directly *) writeVec(f, v, start, vecLen) else (* Block or line buffering - add it to the buffer. Line buffering is treated as block buffering on binary streams and handled at the higher level for text streams. *) addVecToBuff() end (* State was not open *) | outputVector _ (OutStream{wrtr=WR{name, ...}, ...}) = raise Io { name = name, function = "output", cause = ClosedStream } (* This could be defined in terms of outputVector but this is likely to be much more efficient if we are buffering. *) fun output1' c (f as OutStream{streamState=ref OutStreamOpen, buffType, buf, bufp, ...}) = if !buffType = IO.NO_BUF then writeVec(f, Vector.fromList[c], 0, 1) else (* Line or block buffering. 
*) ( Array.update(buf, !bufp, c); bufp := !bufp + 1; if !bufp = Array.length buf then flushOut' f else () ) (* State was not open *) | output1' _ (OutStream{wrtr=WR{name, ...}, ...}) = raise Io { name = name, function = "output1", cause = ClosedStream } fun getPosOut'(f as OutStream{wrtr=WR{name, getPos=SOME getPos, ...}, ...}) = ( flushOut' f; OutPos(f, getPos()) handle exn => raise mapToIo(exn, name, "getPosOut") ) | getPosOut'(OutStream{wrtr=WR{name, ...}, ...}) = raise Io { name = name, function = "getPosOut", cause = RandomAccessNotSupported } fun setPosOut' p (f as OutStream{wrtr=WR{setPos=SOME setPos, ...}, ...}) = ( flushOut' f; setPos p; f ) | setPosOut' _ (OutStream{wrtr=WR{name, ...}, ...}) = raise Io { name = name, function = "setPosOut", cause = RandomAccessNotSupported } in fun output1(f, c) = protectOut (output1' c) f fun output(f, v) = protectOut (outputVector(v, 0, Vector.length v)) f val flushOut = protectOut flushOut' val closeOut = protectOut closeOut' val getWriter = protectOut getWriter' fun setBufferMode(f, n) = protectOut (setBufferMode' n) f (* Exported function to output part of a vector. Non-standard. *) fun outputVec(f, slice) = let val (v, i, len) = VectorSlice.base slice in protectOut (outputVector(v, i, len)) f end val getPosOut = protectOut getPosOut' fun setPosOut(OutPos(f, p)) = protectOut (setPosOut' p) f end fun filePosOut(OutPos(_, p)) = p (* We need to set up a function to flush the streams when we exit. This has to be set up for every session so we set up an entry function, which is persistent, to do it. *) local fun closeAll () = (* Close all the streams. closeOut removes the streams from the list so we should end up with outputStreamList being nil. *) List.foldl (fn (s, ()) => closeOut s handle _ => ()) () (! outputStreamList) fun doOnEntry () = OS.Process.atExit closeAll in val () = PolyML.onEntry doOnEntry; val () = doOnEntry() (* Set it up for this session as well. 
*) end local open PolyML fun printWithName(s, name) = PolyML.PrettyString(String.concat[s, "-\"", String.toString name, "\""]) fun prettyIn depth a (Committed { rest, ...}) = prettyIn depth a rest | prettyIn _ _ (Uncommitted { state = ref streamState, ...}) = let fun prettyState Truncated = PolyML.PrettyString("Instream-truncated") | prettyState (HaveRead{ rest = ref rest, ...}) = prettyState rest | prettyState (ToRead(RD{name, ...})) = printWithName("Instream", name) in prettyState streamState end fun prettyOut _ _ (OutStream { wrtr = WR { name, ...}, ...}) = printWithName("Outstream", name) in val () = addPrettyPrinter prettyIn val () = addPrettyPrinter prettyOut end end; (* Define the StreamIO functor in terms of BasicStreamIO to filter out outputVec. *) (* This is non-standard. According to G&R 2004 StreamIO does not take the slice structures as args. *) functor StreamIO( structure PrimIO : PRIM_IO structure Vector : MONO_VECTOR structure Array : MONO_ARRAY structure VectorSlice: MONO_VECTOR_SLICE structure ArraySlice: MONO_ARRAY_SLICE sharing type PrimIO.elem = Vector.elem = Array.elem = VectorSlice.elem = ArraySlice.elem sharing type PrimIO.vector = Vector.vector = Array.vector = VectorSlice.vector = ArraySlice.vector sharing type PrimIO.array = Array.array = ArraySlice.array sharing type PrimIO.vector_slice = VectorSlice.slice = ArraySlice.vector_slice sharing type PrimIO.array_slice = ArraySlice.slice val someElem : PrimIO.elem ): STREAM_IO = struct structure StreamIO = BasicStreamIO(structure PrimIO = PrimIO and Vector = Vector and Array = Array and VectorSlice = VectorSlice and ArraySlice = ArraySlice val someElem = someElem) open StreamIO end; diff --git a/basis/ForeignMemory.sml b/basis/ForeignMemory.sml index 528aa103..08d4df06 100644 --- a/basis/ForeignMemory.sml +++ b/basis/ForeignMemory.sml @@ -1,237 +1,235 @@ (* Title: Foreign Function Interface: memory operations Author: David Matthews Copyright David Matthews 2015, 2017, 2020 This library is free 
software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License version 2.1 as published by the Free Software Foundation. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *) structure ForeignMemory :> sig eqtype volatileRef val volatileRef: SysWord.word -> volatileRef val setVolatileRef: volatileRef * SysWord.word -> unit val getVolatileRef: volatileRef -> SysWord.word eqtype voidStar val voidStar2Sysword: voidStar -> SysWord.word val sysWord2VoidStar: SysWord.word -> voidStar val null: voidStar (* Helper functions to add a word value to an address. From 5.8.2 the word value is treated as signed. *) val ++ : voidStar * word -> voidStar val -- : voidStar * word -> voidStar (* Remember an address except across loads. *) val memoise: ('a -> voidStar) ->'a -> unit -> voidStar exception Memory (* malloc - allocate memory. N.B. argument is the number of bytes. Raises Memory exception if it cannot allocate. *) val malloc: word -> voidStar (* free - free allocated memory. *) val free: voidStar -> unit (* Load and store a value. From 5.8.2 the offset is treated as signed. 
*) val get8: voidStar * Word.word -> Word8.word val get16: voidStar * Word.word -> Word.word val get32: voidStar * Word.word -> Word32.word val get64: voidStar * Word.word -> SysWord.word val set8: voidStar * Word.word * Word8.word -> unit val set16: voidStar * Word.word * Word.word -> unit val set32: voidStar * Word.word * Word32.word -> unit val set64: voidStar * Word.word * SysWord.word -> unit val getFloat: voidStar * Word.word -> real val getDouble: voidStar * Word.word -> real val setFloat: voidStar * Word.word * real -> unit val setDouble: voidStar * Word.word * real -> unit val getAddress: voidStar * Word.word -> voidStar val setAddress: voidStar * Word.word * voidStar -> unit end = struct open ForeignConstants open ForeignMemory exception Foreign = RunCall.Foreign fun id x = x (* Internal utility function. *) fun alignUp(s, align) = Word.andb(s + align-0w1, ~ align) (* Both volatileRef and SysWord.word are the ADDRESSes of the actual value. *) type volatileRef = word ref val memMove: SysWord.word * SysWord.word * word * word* word -> unit = RunCall.moveBytes fun volatileRef init = let (* Allocate a single word marked as mutable, weak, no-overwrite, byte. *) (* A weak byte cell is cleared to zero when it is read in either from the executable or from a saved state. Using the no-overwrite bit ensures that if it is contained in the executable it won't be changed by loading a saved state but there's a problem if it is contained in a parent state. Then loading a child state will clear it because we reload all the parents when we load a child. *) val v = RunCall.allocateWordMemory(sysWordSize div wordSize, 0wx69, 0w0) (* Copy the SysWord into it. *) val () = memMove(init, RunCall.unsafeCast v, 0w0, 0w0, sysWordSize) in v end fun setVolatileRef(v, i) = memMove(i, RunCall.unsafeCast v, 0w0, 0w0, sysWordSize) fun getVolatileRef var = let (* Allocate a single word marked as mutable, byte. 
*) val v = RunCall.allocateByteMemory(sysWordSize div wordSize, 0wx41) val () = memMove(RunCall.unsafeCast var, v, 0w0, 0w0, sysWordSize) val () = RunCall.clearMutableBit v in v end type voidStar = SysWord.word val voidStar2Sysword = id and sysWord2VoidStar = id (* Exported conversions *) val null: voidStar = 0w0 infix 6 ++ -- (* Helper operations to add a constant to an address. These now treat the offset as signed so that adding ~1 is the same as subtracting 1. *) fun s ++ w = s + SysWord.fromLarge(Word.toLargeX w) and s -- w = s - SysWord.fromLarge(Word.toLargeX w) fun 'a memoise(f: 'a -> voidStar) (a: 'a) : unit -> voidStar = let (* Initialise to zero. That means the function won't be executed until we actually want the result. *) val v = volatileRef 0w0 in (* If we've reloaded the volatile ref it will have been reset to zero. We need to execute the function and set it. *) fn () => (case getVolatileRef v of 0w0 => let val r = f a in setVolatileRef(v, r); r end | r => r) end exception Memory (* Get and set addresses. This is a bit messy because it has to compile on 64-bits as well as 32-bits. *) val getAddress: voidStar * Word.word -> voidStar = if sysWordSize = 0w4 then Word32.toLargeWord o get32 else get64 val setAddress: voidStar * Word.word * voidStar -> unit = if sysWordSize = 0w4 then fn (s, i, v) => set32(s, i, Word32.fromLargeWord v) else set64 local local val ffiGeneralCall = RunCall.rtsCallFull2 "PolyFFIGeneral" in fun ffiGeneral(code: int, arg: 'a): 'b = RunCall.unsafeCast(ffiGeneralCall(RunCall.unsafeCast(code, arg))) end fun systemMalloc (s: word): voidStar = ffiGeneral (0, s) (*fun systemFree (s: voidStar): unit = ffiGeneral (1, s)*) (* Simple malloc/free implementation to reduce the number of RTS calls needed. *) val lock = Thread.Mutex.mutex() (* It would be possible to chain the free list in the C memory itself. For the moment we don't do that. The free list is the list of chunks ordered by increasing address. 
That allows us to merge adjacent free blocks. *) - val freeList: {address: SysWord.word, size: word} list ref = LibrarySupport.noOverwriteRef nil - (* Clear it once on entry. *) - val () = PolyML.onEntry (fn _ => freeList := nil) + val freeList: {address: SysWord.word, size: word} list ref = LibrarySupport.volatileListRef() (* Assume that if we align to the maximum of these we're all right. *) val maxAlign = Word.max(#align saDouble, Word.max(#align saPointer, #align saSint64)) (* We need a length word in each object we allocate but we need enough padding to align the result. *) val overhead = alignUp(sysWordSize, maxAlign) val chunkSize = 0w4096 (* Configure this. *) fun addFree(entry, []) = [entry] | addFree(entry, this :: rest) = if #address entry < #address this then ( if #address entry ++ #size entry = #address this then (* New entry is immediately before old one - merge. *) {address= #address entry, size = #size entry + #size this } :: rest else entry :: this :: rest ) else if #address this ++ #size this = #address entry then (* New entry is immediately after this - merge. Continue because it could also merge with an entry after this as well. *) addFree({address= #address this, size= #size entry + #size this}, rest) else this :: addFree(entry, rest) (* Search on. *) (* Find free space. 
*) fun findFree (_, []) = (NONE, []) | findFree (space, (this as {size, address}) :: tl) = if space = size then (SOME address, tl) else if space < size then (SOME address, {size=size-space, address=address ++ space} :: tl) else let val (res, rest) = findFree(space, tl) in (res, this :: rest) end fun freeMem s = let val addr = s -- overhead val size = Word.fromLarge(SysWord.toLarge(getAddress(addr, 0w0))) in freeList := addFree({address=addr, size=size}, !freeList) end fun allocMem s = let val space = alignUp(s + overhead, maxAlign) val (found, newList) = findFree(space, !freeList) in case found of NONE => let (* Need more memory *) val requestSpace = Word.max(chunkSize, space) val newSpace = systemMalloc requestSpace val _ = newSpace <> null orelse raise Memory in (* Add the space to the free list in the appropriate place. *) freeList := addFree({address=newSpace, size=requestSpace}, !freeList); allocMem s (* Repeat - should succeed now. *) end | SOME address => let val () = freeList := newList (* Update the free list *) (* Store the length in the first word. *) val () = setAddress(address, 0w0, SysWord.fromLarge(Word.toLarge space)) in address ++ overhead end end in val malloc: word -> voidStar = ThreadLib.protect lock allocMem fun free v = if v = null then () else ThreadLib.protect lock freeMem v end end; diff --git a/basis/ImperativeIO.sml b/basis/ImperativeIO.sml index 2834806a..43a6368f 100644 --- a/basis/ImperativeIO.sml +++ b/basis/ImperativeIO.sml @@ -1,171 +1,200 @@ (* Title: Standard Basis Library: ImperativeIO functor - Copyright David C.J. Matthews 2000, 2015 + Copyright David C.J. Matthews 2000, 2015, 2020 This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License version 2.1 as published by the Free Software Foundation. 
This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *) (* This is also used in TextIO. We need "protect". *) functor BasicImperativeIO ( structure StreamIO : STREAM_IO structure Vector : MONO_VECTOR structure Array : MONO_ARRAY sharing type StreamIO.elem = Vector.elem = Array.elem sharing type StreamIO.vector = Vector.vector = Array.vector ) (* No signature on the result *) = struct + open IO structure StreamIO = StreamIO type vector = Vector.vector and elem = StreamIO.elem datatype instream = InStream of { - (* An imperative input stream is a reference to a lazy functional stream. *) - fStream: StreamIO.instream ref, + (* An imperative input stream is a reference to a lazy functional stream. + It is an option ref because we use a volatile ref that is set to NONE + if this is exported and re-imported. *) + fStream: StreamIO.instream option ref, lock: Thread.Mutex.mutex } and outstream = OutStream of { (* An imperative output stream is a reference to the underlying stream. Unlike instream the underlying stream is also imperative but we need - a reference here to allow us to redirect. *) - fStream: StreamIO.outstream ref + a reference here to allow us to redirect. As with instream + this is a volatile ref. *) + fStream: StreamIO.outstream option ref } (* We don't need a mutex for outstream assuming := and ! are atomic i.e. '!' returns either the previous value or the current one and not some intermediate value. *) (* Imperative streams keep their underlying stream in a volatile option ref: mkInstream allocates the ref with LibrarySupport.volatileOptionRef and then stores SOME s in it.
NOTE(review): this comment used to say that no-overwrite refs were used, really only for stdIn, to make sure that when we call PolyML.SaveState.loadState we don't overwrite any unread input by the contents of the buffer when saveState was called. Confirm how stdIn is expected to behave across loadState now that the ref is volatile. *) fun mkInstream (s : StreamIO.instream) : instream = - InStream{fStream = LibrarySupport.noOverwriteRef s, lock = Thread.Mutex.mutex()} + let + val r = LibrarySupport.volatileOptionRef () + val () = r := SOME s + in + InStream{fStream = r, lock = Thread.Mutex.mutex()} + end fun protect (InStream{fStream, lock}) f = LibraryIOSupport.protect lock f fStream (* Get and set the underlying stream. We have to interlock setInstream at least. *) - fun getInstream s = protect s (fn fStream => !fStream) + fun getInstream s = + protect s ( + fn ref (SOME stream) => stream | _ => raise Io { name = "", function = "getInstream", cause = ClosedStream } + ) and setInstream(InStream{fStream, lock}, s) = - LibraryIOSupport.protect lock (fn fStream => fStream := s) fStream + LibraryIOSupport.protect lock (fn fStream => fStream := SOME s) fStream (* These are just wrappers for the underlying functional calls. *) fun input s = protect s - (fn fStream => + (fn fStream as ref(SOME stream) => let - val (v, f') = StreamIO.input(!fStream) + val (v, f') = StreamIO.input stream in - fStream := f'; + fStream := SOME f'; v - end) + end + | _ => Vector.fromList[]) (* We don't use StreamIO.input1 here because that never advances over a temporary EOF.
*) fun input1 s = protect s - (fn fStream => + (fn fStream as ref(SOME stream) => let - val (s, f') = StreamIO.inputN(!fStream, 1) + val (s, f') = StreamIO.inputN(stream, 1) in - fStream := f'; + fStream := SOME f'; if Vector.length s = 0 then NONE else SOME(Vector.sub(s, 0)) - end) + end + | _ => NONE) fun inputN(InStream{fStream, lock}, n) = LibraryIOSupport.protect lock - (fn fStream => + (fn fStream as ref(SOME stream) => let - val (v, f') = StreamIO.inputN(!fStream, n) + val (v, f') = StreamIO.inputN(stream, n) in - fStream := f'; + fStream := SOME f'; v - end) fStream + end + | _ => Vector.fromList[]) fStream fun inputAll s = protect s - (fn fStream => + (fn fStream as ref(SOME stream) => let - val (v, f') = StreamIO.inputAll(!fStream) + val (v, f') = StreamIO.inputAll stream in - fStream := f'; + fStream := SOME f'; v - end) + end + | _ => Vector.fromList[]) (* These next functions only query the stream and don't affect the fStream ref so don't really need interlocking. If two threads call these functions simultaneously the result is non-deterministic anyway. *) fun canInput(InStream{fStream, lock}, n) = LibraryIOSupport.protect lock - (fn fStream => StreamIO.canInput(! fStream, n)) fStream + (fn ref(SOME stream) => StreamIO.canInput(stream, n) | _ => SOME 0) fStream - and closeIn s = protect s (fn fStream => StreamIO.closeIn(! fStream)) - and endOfStream s = protect s (fn fStream => StreamIO.endOfStream(! fStream)) + and closeIn s = + protect s (fn ref(SOME stream) => StreamIO.closeIn stream | _ => ()) + and endOfStream s = + protect s (fn ref(SOME stream) => StreamIO.endOfStream stream | _ => true) fun lookahead s = protect s - (fn fStream => - case StreamIO.input1 (! fStream) of - NONE => NONE - | SOME(s, _) => SOME s + (fn ref(SOME stream) => + ( + case StreamIO.input1 stream of + NONE => NONE + | SOME(s, _) => SOME s + ) + | _ => NONE ) (* These are simply wrappers. 
*) fun mkOutstream (s : StreamIO.outstream) : outstream = - OutStream{fStream = LibrarySupport.noOverwriteRef s} + let + val r = LibrarySupport.volatileOptionRef() + val () = r := SOME s + in + OutStream{fStream = r} + end - fun getOutstream(OutStream{fStream = ref s}) = s - and setOutstream(OutStream{fStream}, s) = fStream := s + fun getOutstream(OutStream{fStream = ref(SOME s)}) = s + | getOutstream _ = raise Io { name = "", function = "getOutstream", cause = ClosedStream } + and setOutstream(OutStream{fStream}, s) = fStream := SOME s - fun output(OutStream{fStream=ref f, ...}, v) = StreamIO.output(f, v) - and output1(OutStream{fStream=ref f, ...}, c) = StreamIO.output1(f, c) - and flushOut(OutStream{fStream=ref f, ...}) = StreamIO.flushOut f - and closeOut(OutStream{fStream=ref f, ...}) = StreamIO.closeOut f - and getPosOut(OutStream{fStream=ref f, ...}) = StreamIO.getPosOut f + fun output(out, v) = StreamIO.output(getOutstream out, v) + and output1(out, c) = StreamIO.output1(getOutstream out, c) + and flushOut out = StreamIO.flushOut(getOutstream out) + and closeOut out = StreamIO.closeOut(getOutstream out) + and getPosOut out = StreamIO.getPosOut(getOutstream out) - fun setPosOut(OutStream{fStream, ...}, p) = fStream := StreamIO.setPosOut p + fun setPosOut(OutStream{fStream}, p) = fStream := SOME(StreamIO.setPosOut p) (* Add pretty printers to hide the internals. These just use the implementation streams. 
*) local open PolyML - fun prettyIn depth _ (InStream{ fStream = ref s, ...}) = + fun prettyIn depth _ (InStream{ fStream = ref(SOME s), ...}) = PolyML.prettyRepresentation(s, depth) - fun prettyOut depth _ (OutStream { fStream = ref s, ...}) = + | prettyIn _ _ _ = PolyML.PrettyString("Instream-closed") + fun prettyOut depth _ (OutStream { fStream = ref(SOME s), ...}) = PolyML.prettyRepresentation(s, depth) + | prettyOut _ _ _ = PolyML.PrettyString("Outstream-closed") in val () = addPrettyPrinter prettyIn val () = addPrettyPrinter prettyOut end end; (* General exported version with final signature. *) functor ImperativeIO ( structure StreamIO : STREAM_IO structure Vector : MONO_VECTOR structure Array : MONO_ARRAY sharing type StreamIO.elem = Vector.elem = Array.elem sharing type StreamIO.vector = Vector.vector = Array.vector ) : IMPERATIVE_IO where type StreamIO.elem = StreamIO.elem where type StreamIO.vector = StreamIO.vector where type StreamIO.instream = StreamIO.instream where type StreamIO.outstream = StreamIO.outstream where type StreamIO.out_pos = StreamIO.out_pos where type StreamIO.reader = StreamIO.reader where type StreamIO.writer = StreamIO.writer where type StreamIO.pos = StreamIO.pos = BasicImperativeIO(structure StreamIO = StreamIO and Vector = Vector and Array = Array); diff --git a/basis/LibrarySupport.sml b/basis/LibrarySupport.sml index 915d56b1..f9456c44 100644 --- a/basis/LibrarySupport.sml +++ b/basis/LibrarySupport.sml @@ -1,200 +1,207 @@ (* Title: Standard Basis Library: Support functions - Copyright David C.J. Matthews 2000, 2015-18 + Copyright David C.J. Matthews 2000, 2015-18, 2020 This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License version 2.1 as published by the Free Software Foundation. 
This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *) (* We need to execute these calls BEFORE compiling LibrarySupport if we want them to be compiled in as constants. *) structure MachineConstants = struct local val isBigEndian: unit -> bool = RunCall.rtsCallFast1 "PolyIsBigEndian" in val bigEndian : bool = isBigEndian () end val wordSize : word = RunCall.bytesPerWord (* This is the same as wordSize in native 32-bit and 64-bit but different in 32-in-64. *) val sysWordSize: word = RunCall.memoryCellLength(Word.toLargeWord 0w0) * wordSize end; structure LibrarySupport :> sig eqtype address (* eqtype so we can compare vectors. *) structure CharArray: sig datatype array = Array of word*address end structure Word8Array: sig datatype array = Array of word*address eqtype vector val wVecLength: vector -> word end val w8vectorToString: Word8Array.vector -> string and w8vectorFromString: string -> Word8Array.vector val wordSize: word and sysWordSize: word val bigEndian: bool val allocString: word -> string (* Create a mutable string. 
*) val allocBytes: word -> address val isShortInt : int -> bool val largeIntIsSmall: LargeInt.int -> bool val unsignedShortOrRaiseSubscript: int -> word val unsignedShortOrRaiseSize: int -> word val sizeAsWord : string -> word val stringAsAddress : string -> address val w8vectorAsAddress : Word8Array.vector -> address val maxAllocation: word and maxString: word - val noOverwriteRef: 'a -> 'a ref val emptyVector: word val quotRem: LargeInt.int*LargeInt.int -> LargeInt.int*LargeInt.int val getOSType: unit -> int + val volatileListRef: unit -> 'a list ref + val volatileWordRef: unit -> word ref + val volatileOptionRef: unit -> 'a option ref end = struct (* An address is the address of a vector in memory. *) type address = Bootstrap.byteArray (* This forces pointer equality. *) local (* Add a pretty printer to avoid crashes during debugging. *) open PolyML fun prettyAddress _ _ (_: address) = PolyML.PrettyString "byteArray" in val () = addPrettyPrinter prettyAddress end (* This is always a short non-negative integer so can be cast as word or int. *) fun sizeAsWord(s: string): word = RunCall.loadUntagged(s, 0w0) (* Provide the implementation of CharArray.array, Word8Array.array and Word8Array.vector (= Word8Vector.vector) here so that they are available to the IO routines. *) structure CharArray = struct datatype array = Array of word*address end structure Word8Array = struct (* Using the Array constructor here does not add any overhead since it is compiled as an identity function. *) datatype array = Array of word*address (* The representation of Word8Vector.vector is the same as that of string. We define it as "string" here so that it inherits the same equality function. The representation is assumed by the RTS. *) type vector = string val wVecLength: vector -> word = sizeAsWord end (* Identity functions to provide convertions. 
*) fun w8vectorToString s = s and w8vectorFromString s = s (* There are circumstances when we want to pass the address of a string where we expect an address. *) val stringAsAddress : string -> address = RunCall.unsafeCast val w8vectorAsAddress = stringAsAddress o w8vectorToString open MachineConstants; local val F_mutable_bytes : word = 0wx41 (* This is put in by Initialise and filtered out later. *) val setLengthWord: string * word -> unit = fn (s, n) => RunCall.storeUntagged(s, 0w0, n) val callGetAllocationSize = RunCall.rtsCallFast0 "PolyGetMaxAllocationSize" val callGetMaxStringSize = RunCall.rtsCallFast0 "PolyGetMaxStringSize" in (* Get the maximum allocation size. This is the maximum value that can fit in the length field of a segment. *) val maxAllocation = callGetAllocationSize() and maxString = callGetMaxStringSize() (* Check that we have a short int. This is only necessary if int is arbitrary precision. If int is fixed precision it will always be true. *) fun isShortInt(i: int): bool = not Bootstrap.intIsArbitraryPrecision orelse RunCall.isShort i (* Test whether a large int will fit in the short format. *) val largeIntIsSmall: LargeInt.int -> bool = RunCall.isShort fun unsignedShortOrRaiseSize (i: int): word = if isShortInt i andalso i >= 0 then RunCall.unsafeCast i else raise Size fun unsignedShortOrRaiseSubscript (i: int): word = if isShortInt i andalso i >= 0 then RunCall.unsafeCast i else raise Subscript fun allocBytes bytes : address = let val words : word = if bytes > maxString then raise Size (* The maximum string size is slightly smaller than the maximum array size because strings have a length word. It seems best to use the same maximum size for CharArray/Word8Array. *) else (bytes + wordSize - 0w1) div wordSize val mem = RunCall.allocateByteMemory(words, F_mutable_bytes) (* Zero the last word. 
*) val () = if words = 0w0 then () else RunCall.storeUntagged(RunCall.unsafeCast mem, words-0w1, 0w0) in mem end (* Allocate store for the string and set the first word to contain the length and the rest zero. *) fun allocString charsW = let (* The space is the number of characters plus space for the length word plus rounding. *) val words : word = (charsW + 0w2 * wordSize - 0w1) div wordSize val _ = words <= maxAllocation orelse raise Size val vec = RunCall.allocateByteMemory(words, F_mutable_bytes) (* Zero any extra bytes we've needed for rounding to a number of words. This isn't essential but ensures that RTS sharing passes will merge strings that are otherwise the same. *) val () = RunCall.storeUntagged(vec, words-0w1, 0w0) in (* Set the length word. Since this is untagged we can't simply use assign_word.*) setLengthWord(vec, charsW); vec end - (* Create non-overwritable mutables for mutexes and condition variables. - A non-overwritable mutable in the executable or a saved state is not - overwritten when a saved state further down the hierarchy is loaded. - This is also used for imperative streams, really only so that stdIn - works properly across SaveState.loadState calls. *) - fun noOverwriteRef (a: 'a) : 'a ref = RunCall.allocateWordMemory(0w1, 0wx48, a) + (* Volatile refs. They are cleared to 0/nil/NONE in an exported or saved state + and their current value is not written to a child state, unlike normal refs. + They are used for things like mutexes, condition variables and the list of + currently open streams which should always be reset. *) + local + fun volatileRef() : 'a ref = RunCall.allocateWordMemory(0w1, 0wx48, 0w0) + in + val volatileListRef = volatileRef + and volatileWordRef = volatileRef + and volatileOptionRef = volatileRef + end end (* Create an empty vector. This is used wherever we want an empty vector. It can't be 'a vector which is what we want because of the value restriction. 
*) val emptyVector: word = RunCall.allocateWordMemory(0w0, 0w0, 0w0) val quotRem = LargeInt.quotRem val getOSType: unit -> int = RunCall.rtsCallFast0 "PolyGetOSType" end; diff --git a/basis/OS.sml b/basis/OS.sml index 16ea0db1..63812529 100644 --- a/basis/OS.sml +++ b/basis/OS.sml @@ -1,1212 +1,1230 @@ (* Title: Standard Basis Library: OS Structures and Signatures Author: David Matthews Copyright David Matthews 2000, 2005, 2015-16 This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License version 2.1 as published by the Free Software Foundation. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *) signature OS_FILE_SYS = sig type dirstream val openDir : string -> dirstream val readDir : dirstream -> string option val rewindDir : dirstream -> unit val closeDir : dirstream -> unit val chDir : string -> unit val getDir : unit -> string val mkDir : string -> unit val rmDir : string -> unit val isDir : string -> bool val isLink : string -> bool val readLink : string -> string val fullPath : string -> string val realPath : string -> string val modTime : string -> Time.time val fileSize : string -> Position.int val setTime : (string * Time.time Option.option) -> unit val remove : string -> unit val rename : {old : string, new : string} -> unit datatype access_mode = A_READ | A_WRITE | A_EXEC val access : (string * access_mode list) -> bool val tmpName : unit -> string eqtype file_id val fileId : string -> file_id val hash : file_id -> word val compare : (file_id * file_id) -> General.order end (* OS_FILE_SYS *); 
signature OS_PATH = sig exception Path exception InvalidArc val parentArc : string val currentArc : string val fromString : string -> { isAbs : bool, vol : string, arcs : string list } val toString : { isAbs : bool, vol : string, arcs : string list } -> string val validVolume : {isAbs : bool, vol : string} -> bool val getVolume : string -> string val getParent : string -> string val splitDirFile : string -> {dir : string, file : string} val joinDirFile : {dir : string, file : string} -> string val dir : string -> string val file : string -> string val splitBaseExt : string -> {base : string, ext : string option } val joinBaseExt : {base : string, ext : string option} -> string val base : string -> string val ext : string -> string option val mkCanonical : string -> string val isCanonical : string -> bool val mkAbsolute : {path : string, relativeTo : string} -> string val mkRelative : {path : string, relativeTo : string} -> string val isAbsolute : string -> bool val isRelative : string -> bool val isRoot : string -> bool val concat : string * string -> string val toUnixPath : string -> string val fromUnixPath : string -> string end (* OS_PATH *); signature OS_PROCESS = sig type status val success : status val failure : status val isSuccess : status -> bool val system : string -> status val atExit : (unit -> unit) -> unit val exit : status -> 'a val terminate : status -> 'a val getEnv : string -> string Option.option val sleep: Time.time -> unit end (* OS_PROCESS *); signature OS_IO = sig eqtype iodesc val hash : iodesc -> word val compare : (iodesc * iodesc) -> General.order eqtype iodesc_kind val kind : iodesc -> iodesc_kind structure Kind: sig val file : iodesc_kind val dir : iodesc_kind val symlink : iodesc_kind val tty : iodesc_kind val pipe : iodesc_kind val socket : iodesc_kind val device : iodesc_kind end eqtype poll_desc type poll_info val pollDesc : iodesc -> poll_desc Option.option val pollToIODesc : poll_desc -> iodesc exception Poll val pollIn : 
poll_desc -> poll_desc val pollOut : poll_desc -> poll_desc val pollPri : poll_desc -> poll_desc val poll : (poll_desc list * Time.time Option.option) -> poll_info list val isIn : poll_info -> bool val isOut : poll_info -> bool val isPri : poll_info -> bool val infoToPollDesc : poll_info -> poll_desc end (* OS_IO *); signature OS = sig eqtype syserror exception SysErr of (string * syserror Option.option) val errorMsg : syserror -> string val errorName : syserror -> string val syserror : string -> syserror Option.option structure FileSys : OS_FILE_SYS structure Path : OS_PATH structure Process : OS_PROCESS structure IO : OS_IO end (* OS *); structure OS:> OS = struct type syserror = SysWord.word (* Implemented as a SysWord.word value. *) (* The calls themselves raise the SysCall exception. That has to be turned into a SysError exception. *) exception SysErr = RunCall.SysErr (* Convert a numeric system error to a string. Note: unlike Posix.Error.errorName and Posix.Error.sysError the results are not defined other than that SOME e = syserror(errorName e) nor is this defined to be the same as the Posix.Error functions. Those are defined to return e.g. "etoobig". Here we return "E2BIG". *) val errorName: syserror -> string = RunCall.rtsCallFull1 "PolyProcessEnvErrorName" and errorMsg: syserror -> string = RunCall.rtsCallFull1 "PolyProcessEnvErrorMessage" local val doCall: string -> syserror = RunCall.rtsCallFull1 "PolyProcessEnvErrorFromString" in (* Convert a string to an error message if possible. *) fun syserror (s: string) : syserror option = let val n = doCall s in if n = 0w0 then NONE else SOME n end end structure Path:> OS_PATH = struct (* Note: The definition of relative and absolute paths are somewhat unclear and some of the examples seem contradictory. The definition I would prefer to use is that an absolute path is one which identifies a given file independent of any setting of the current directory. 
Hence the examples of "\" and "\A\B" as being absolute paths in DOS is in my opinion wrong. These are relative since they depend on the setting of the current volume. However this is a mess when it comes to fromString since if we don't treat "\A" as an absolute path it looks just like an absolute path with an empty arc. *) exception Path exception InvalidArc local val getOSCall: unit -> int = RunCall.rtsCallFast0 "PolyGetOSType" val getOS: int = getOSCall() in val isWindows = case getOS of 0 => false (* Posix *) | 1 => true | _ => raise Fail "Unknown operating system" end val isCaseSensitive = not isWindows val isSeparator = if isWindows then fn #"/" => true | #"\\" => true | _ => false else fn #"/" => true | _ => false val separator = if isWindows then "\\" else "/" val parentArc = ".." and currentArc = "." val isValidArc = if isWindows then let fun invalidChars #"\000" = true | invalidChars #"<" = true | invalidChars #">" = true | invalidChars #":" = true | invalidChars #"\"" = true | invalidChars #"\\" = true | invalidChars #"/" = true | invalidChars #"|" = true | invalidChars #"?" = true | invalidChars #"*" = true | invalidChars _ = false in not o (CharVector.exists invalidChars) end else let (* Posix - only null and / are invalid. *) fun invalidChars #"\000" = true | invalidChars #"/" = true | invalidChars _ = false in not o (CharVector.exists invalidChars) end local (* Given a string it examines the prefix and extracts the volume name if there is one. It returns the volume and also whether the name is absolute. It also returns the number of characters which matched so that this can be removed before treating the rest as a relative path. 
*) fun matchVolumePrefixPosix s = if String.size s > 0 andalso String.sub(s, 0) = #"/" then {volLen = 1, vol = "", abs = true, root = true } else {volLen = 0, vol = "", abs = false, root = false } (* Windows version. Recognises a drive letter with or without a following separator, a UNC name of the form \\server\share, and a single leading separator meaning the root of the current drive. volLen must never exceed the length of s because callers use it as an offset into s. *) fun matchVolumePrefixWindows s = let val slen = String.size s in if slen = 0 then { volLen = 0, vol = "", abs = false, root = false } else if slen >= 2 andalso String.sub(s, 1) = #":" andalso Char.isAlpha(String.sub(s, 0)) then if slen > 2 andalso isSeparator(String.sub(s, 2)) then { volLen = 3, vol = String.substring(s, 0, 2), abs = true, root = true } (* e.g. C:\ or C:\fred *) else { volLen = 2, vol = String.substring(s, 0, 2), abs = false, root = false } (* e.g. C: or C:fred *) else if slen > 2 andalso isSeparator(String.sub(s, 0)) andalso isSeparator(String.sub(s, 1)) then (* Looks like a UNC server name. See how big it is. *) let val (server, rest) = Substring.splitl(fn c => not (isSeparator c)) (Substring.extract(s, 2, NONE)) (* TODO: Is the server name actually valid? Assume yes. *) in if Substring.size rest = 0 then { volLen = 0, vol = "", abs = false, root = false } else (* Must be room for a share name as well. *) let val shareName = Substring.takel(fn c => not (isSeparator c)) (Substring.triml 1 rest) (* The 4 counts the two leading separators, the separator before the share name and the separator after it. If the string ends with the share name there is no separator after it, so the sum is one more than the length of s. Clamp it so that String.extract(s, volLen, NONE) in fromString does not raise Subscript and the volLen = len root test in getParent can succeed. *) val prefixLen = Int.min(slen, Substring.size server + Substring.size shareName + 4) in { volLen = prefixLen, vol = separator ^ separator ^ Substring.string server ^ separator ^ Substring.string shareName, abs = true, root = true } end end (* Leading \ in Windows means the "root" directory on the current drive. *) else if isSeparator(String.sub(s, 0)) then { volLen = 1, vol = "", abs = false, root = true } else { volLen = 0, vol = "", abs = false, root = false } end in val matchVolumePrefix = if isWindows then matchVolumePrefixWindows else matchVolumePrefixPosix end (* Internal - map the strings to the canonical case if they are not case sensitive. *) val toCanonicalCase = if isCaseSensitive then fn s => s else String.map Char.toLower (* Internal - are the arcs equivalent?
*) fun equivalent (s, t) = toCanonicalCase s = toCanonicalCase t (* See if the volume name is valid for either an absolute or relative path. Windows relative paths may or may not have a volume but if they have the volume must look right. On Unix relative paths may not specify a volume and the only volume for absolute paths is the empty string. *) val validVolume = if isWindows then fn {isAbs, vol = ""} => not isAbs (* Empty volume is only valid for relative paths. *) | {vol, ...} => if size vol = 2 andalso String.sub(vol, 1) = #":" andalso Char.isAlpha(String.sub(vol, 0)) then true (* Drive letter e.g. C: *) else if size vol > 2 andalso isSeparator(String.sub(vol, 0)) then (* UNC name? \\server\share *) case String.fields isSeparator vol of ["", "", server, share] => server <> "" andalso share <> "" | _ => false else false else (* Posix. The volume must always be empty. *) fn {vol = "", ...} => true | _ => false (* We only return an empty arcs list if the argument is the empty string. *) fun fromString "" = {isAbs = false, vol = "", arcs=[]} | fromString (s: string) = let (* Do we have a volume name? *) val {volLen, vol, abs, root, ...} = matchVolumePrefix s (* The remainder forms a set of arcs. *) val rest = String.extract(s, volLen, NONE) val arcs = String.fields isSeparator rest (* If it begins with the Windows \ without a drive we need to add an extra empty arc. Otherwise we can't distinguish \a from a. *) val allArcs = if root andalso not abs then "" :: arcs else arcs in {isAbs = abs, vol = vol, arcs=allArcs} end (* Note: This is a mess as well. For example it says that it should raise Path if there is a relative path which begins with an empty arc. That's only true in Unix. What it should say is that it if isAbs is false then it should raise Path if the resulting path has the form of an absolute path. In Windows we should raise path if given (e.g.) {isAbs=false, vol="", arcs=["", "", "a", "b"]} because that looks like a UNC name. 
*) fun toString {isAbs : bool, vol : string, arcs : string list} = (* Check we have a valid volume. *) if not (validVolume{isAbs=isAbs, vol=vol}) then raise Path (* Check that each arc is valid. *) else if List.exists (fn s => not (isValidArc s)) arcs then raise InvalidArc else let (* Place separators between each arc. *) fun arcsToLinks [] = [] | arcsToLinks [a] = [a] | arcsToLinks (a::b) = a :: separator :: arcsToLinks b fun makePrefix(vol, false) = vol | makePrefix(vol, true) = vol ^ separator val r = String.concat(makePrefix(vol, isAbs) :: arcsToLinks arcs) (* Check to see whether we have turned a relative path into an absolute one by including empty arcs in the wrong places. *) val {abs = nowAbs, ...} = matchVolumePrefix r in if nowAbs <> isAbs then raise Path else r end (* Note: this is just defined to "return the volume portion" but doesn't say what to do if there isn't a volume. Seems simplest to define it as below. *) fun getVolume s = #vol(fromString s) (* Note: Once again this has very much a Unix view of the world, most of which almost works in Windows. I think the idea is that if possible it replaces the path with the path to the containing directory. If we're in the root directory we get the root directory. If we're in a path that ends with a component *) fun getParent "" = parentArc | getParent s = let val len = String.size s val {volLen, ...} = matchVolumePrefix s (* Split it at the last separator. *) val (prefix, suffix) = Substring.splitr (fn c => not (isSeparator c)) (Substring.full s) in if volLen = len then s (* We have a root. *) else if Substring.size suffix = 0 then (* If the last character is a separator just add on the parent arc (..) to refer to the parent directory. I don't know why we can't just remove the last component in this case but the examples don't do that. The only special case is where we have reached the root when we just return the root. 
*) s ^ parentArc else if Substring.size prefix = 0 then (* No separator at all *) ( if s = parentArc (* .. => ../.. *) then parentArc ^ (separator) ^ parentArc else if s = currentArc then parentArc (* . => .. *) else currentArc (* abc => . *) ) else if Substring.size prefix = volLen (* ??? If the prefix matches the volume then return the whole of prefix including the separator. *) then Substring.string prefix else (* Return the prefix with the separator removed. *) Substring.string(Substring.trimr 1 prefix) end (* Another mess defined in terms of examples for Unix from which one is expected to infer a general rule. It seems to split the string at the last separator and return the two halves without the separator except in the case where the directory is a root directory when a full volume name and separator are given. *) fun splitDirFile s = let (* Split it at the last separator. *) val (prefix, suffix) = Substring.splitr (fn c => not (isSeparator c)) (Substring.full s) val {volLen, vol, ...} = matchVolumePrefix s val dirName = case Substring.size prefix of 0 => "" | 1 => Substring.string prefix (* Special case of Windows \a. *) | _ => Substring.string(Substring.trimr 1 prefix) and fileName = Substring.string suffix in if volLen <> 0 andalso vol = dirName then {dir = vol ^ separator, file = fileName} else {dir = dirName, file = fileName} end fun dir s = #dir(splitDirFile s) and file s = #file(splitDirFile s) (* Question: It seems from the definition of toString that the arcs list can include separators. Is that true here? Assume yes. *) (* If the last character is already a separator we don't add one, e.g. if the directory is "/". 
*) fun joinDirFile{dir, file} = if not (isValidArc file) then raise InvalidArc else if dir = "" then file (* Return the file name unchanged *) else if isSeparator(String.sub(dir, size dir - 1)) then dir ^ file else dir ^ separator ^ file fun splitBaseExt s = let val slen = String.size s fun getExt n = if n <= 0 then NONE (* If it's at the start ignore it. *) else if isSeparator(String.sub(s, n)) then NONE else if String.sub(s, n) = #"." then (* Found a dot. *) ( if n = slen-1 then NONE (* Dot in last position. *) else if isSeparator(String.sub(s, n-1)) then NONE (* Dot immediately after separator. *) else SOME n ) else getExt (n-1) val extPos = getExt(slen - 1) in case extPos of NONE => {base=s, ext=NONE} | SOME n => {base=String.substring(s, 0, n), ext=SOME(String.substring(s, n+1, slen-n-1))} end fun joinBaseExt {base : string, ext = NONE} = base | joinBaseExt {base : string, ext = SOME ""} = base | joinBaseExt {base : string, ext = SOME ext} = base ^ "." ^ ext fun base s = #base(splitBaseExt s) and ext s = #ext(splitBaseExt s) val emptyArcIsRedundant = true fun mkCanonical s = let val {isAbs, vol, arcs} = fromString s fun collapse [] = [] | collapse (a :: b) = (* Work down the list removing currentArc entries and null entries (if the OS treats them as redundant).. *) if a = currentArc orelse (emptyArcIsRedundant andalso a = "") then collapse b (* Then work back up it removing parentArc entries. *) else case collapse b of [] => [a] | b' as (x :: y) => if x = parentArc andalso not (a = parentArc) then (* Remove "a" and "x". *) y else a :: b' val collapsed = collapse arcs (* If this is the root we can remove leading occurrences of the parent arc since the parent of the root is the root. 
*) fun removeLeadingParent [] = [] | removeLeadingParent (a::b) = if a = parentArc then removeLeadingParent b else a::b val newArcs = if isAbs then removeLeadingParent collapsed else collapsed val res = toString{isAbs=isAbs, vol=vol, arcs=newArcs} in (* Finally replace the empty string with "." and map to lower case if it's not case sensitive. *) if res = "" then currentArc else toCanonicalCase res end fun isCanonical s = mkCanonical s = s handle Path => false fun isAbsolute s = #isAbs(fromString s) and isRelative s = not(#isAbs(fromString s)) (* Concatenate two paths. The second must be relative and, if it contains a volume name, refer to the same volume as the first. *) fun concat(s, t) = let val {isAbs=absS, vol=volS, arcs=ArcsS} = fromString s val {isAbs=absT, vol=volT, arcs=ArcsT} = fromString t (* Concatenate the two lists of arcs except that a trailing empty arc on the first path is removed (i.e. concat("a/", "b") is the same as concat("a", "b") *) fun concatArcs [] p = p | concatArcs [a] p = if a = "" then p else a :: p | concatArcs (a::b) p = a :: concatArcs b p in if absT then raise Path else if volT <> "" andalso not(equivalent(volS, volT)) then raise Path else if #root(matchVolumePrefix t) (* Special case for Windows. concat("c:\\abc\\def", "\\xyz") is "c:\\xyz". *) then let (* Because this a relative path we have an extra empty arc here. *) val ArcsT' = case ArcsT of "" :: a => a | a => a in toString{isAbs=absS, vol=volS, arcs=ArcsT'} end else toString{isAbs=absS, vol=volS, arcs=concatArcs ArcsS ArcsT} end (* Make an absolute path by treating a relative path as relative to a given path. *) fun mkAbsolute {path, relativeTo} = let val {isAbs=absP, vol=volP, ...} = fromString path val {isAbs=absRT, vol=volRT, ...} = fromString relativeTo in if absP then path else if not absRT then raise Path (* If the path contained a volume it must be the same as the absolute path. 
*) else if volP <> "" andalso not(equivalent(volP, volRT)) then raise Path else mkCanonical(concat(relativeTo, path)) end (* Make a relative path by treating an absolute path as derived from a given other absolute path. *) fun mkRelative {path, relativeTo} = case fromString path of {isAbs=false, ...} => path (* Already relative *) | {vol=volP, arcs=arcsP, ...} => let val {isAbs=absRT, vol=volRT, arcs=arcsRT} = fromString (mkCanonical relativeTo) (* Add as many parent arcs as there are arcs in the path. *) fun addParents [] p = p | addParents (_::b) p = parentArc :: addParents b p fun matchPaths [] [] = [currentArc] (* Both equal *) | matchPaths p [] = (* Absolute path is finished - return p *) p | matchPaths [] r = (* Relative paths finished - add parent arcs *) addParents r [] | matchPaths (p :: p') (r :: r') = (* Are they the same arc? Note: When arcs are case insensitive I'm doing a case insensitive match here. *) if equivalent(p, r) then matchPaths p' r' else addParents (r :: r') (p :: p') (* We have a special case with the root directory (/ on Unix or c:\\ on Windows). In that case fromString returns a single empty arc and we want to remove it here otherwise we can end up with an empty arc in addParents. *) val arcsP' = case arcsP of [""] => [] | _ => arcsP val arcsRT' = case arcsRT of [""] => [] | _ => arcsRT in if not absRT then raise Path (* If the path contained a volume it must be the same as the absolute path. *) else if volP <> "" andalso not(equivalent(volP, volRT)) then raise Path else toString{isAbs=false, vol="", arcs=matchPaths arcsP' arcsRT'} end (* Another badly defined function. What is a root? Does it have to specify a volume or is \ a root in Windows? Assume that it must be absolute. *) fun isRoot s = let val {volLen, abs, ...} = matchVolumePrefix s in abs andalso volLen = String.size s andalso isCanonical s end (* Question: there's no definition of what these functions mean. 
The crucial questions are how to deal with volume names and also how to deal with symbols in the paths which may be invalid (e.g. path separators) in one or other system. For instance "a\b" is a valid file name in Unix and 31/3/2000 is valid in MacOS. Are they supposed to represent the original file system in some way? *) fun toUnixPath s = let (* We may have occurrences of "/" in the arcs if that is not a separator on this OS. Replace them by this machine's separator. *) fun mapArc a = if a = currentArc then "." else if a = parentArc then ".." else a fun mapArcs [] = [] | mapArcs [a] = [mapArc a] | mapArcs (a::b) = mapArc a :: "/" :: mapArcs b val {isAbs, vol, arcs} = fromString s val volArc = if vol <> "" then vol :: arcs else arcs val sl = String.concat(mapArcs volArc) in if String.size sl = 0 then "" else if isAbs then if String.sub(sl, 0) <> #"/" then "/" ^ sl else sl else (* not abs *) if String.sub(sl, 0) = #"/" then "." ^ sl else sl end fun fromUnixPath s = let val arcs = String.fields (fn ch => ch = #"/") s (* Turn any occurrences of this OS's separator into / since that can't occur within an arc. *) val convArc = String.translate ( fn ch => if isSeparator ch then "/" else String.str ch) val convArcs = List.map convArc arcs in case convArcs of [] => "" | ("" :: a :: rest) => let (* We had a leading / : is the first arc a volume name? *) val {volLen = n, vol, ...} = matchVolumePrefix a in if n = String.size a then (* We have a volume name. *) toString{isAbs=true, vol=vol, arcs=rest} else toString{isAbs=true, vol="", arcs=convArcs} end | (a :: rest) => let (* May be a relative volume name. *) val {volLen = n, vol, ...} = matchVolumePrefix a in if n = String.size a then toString{isAbs=false, vol=vol, arcs=rest} else toString{isAbs=false, vol="", arcs=convArcs} end end end (* Path *) structure FileSys:> OS_FILE_SYS = struct type dirFd = int (* The directory stream consists of the stream identifier returned by openDir together with the original directory name. 
We need that for rewind in Windows. *) datatype dirstream = DIR of dirFd * string local val doIo: int*unit*string -> dirFd = RunCall.rtsCallFull3 "PolyBasicIOGeneral" in fun openDir (s : string): dirstream = DIR(doIo(50, (), s), s) end local val doIo: int*dirFd*unit -> string = RunCall.rtsCallFull3 "PolyBasicIOGeneral" in fun readDir (DIR(d, _)): string option = let (* This returns the empty string at end-of-stream. *) val s = doIo(51, d, ()) in if s = "" then NONE else SOME s end end local val doIo: int*dirFd*unit -> unit = RunCall.rtsCallFull3 "PolyBasicIOGeneral" in fun closeDir(DIR(d, _)) = doIo(52, d, ()) end local val doIo: int*dirFd*string -> unit = RunCall.rtsCallFull3 "PolyBasicIOGeneral" in (* We need to pass in the string because Windows has to reopen the stream. *) fun rewindDir(DIR(d, s)) = doIo(53, d, s) end val chDir: string -> unit = RunCall.rtsCallFull1 "PolyChDir" local val doIo: int*unit*unit -> string = RunCall.rtsCallFull3 "PolyBasicIOGeneral" in (* Return current directory. *) fun getDir() = doIo(54, (), ()) (* Get a temporary file name. *) fun tmpName() = doIo(67, (), ()) end local val doIo: int*unit*string -> unit = RunCall.rtsCallFull3 "PolyBasicIOGeneral" in (* Create and delete directories and remove a file. *) fun mkDir s = doIo(55, (), s) and rmDir s = doIo(56, (), s) and remove s = doIo(64, (), s) end local val doIo: int*unit*string -> bool = RunCall.rtsCallFull3 "PolyBasicIOGeneral" in (* Test for directory and symbolic link. *) fun isDir s = doIo(57, (), s) and isLink s = doIo(58, (), s) end local val doIo: int*unit*string -> string = RunCall.rtsCallFull3 "PolyBasicIOGeneral" in (* Read a symbolic link. *) fun readLink s = doIo(59, (), s) (* Get a full canonical path name. *) and fullPath s = doIo(60, (), s) end local val doIo: int*unit*string -> Time.time = RunCall.rtsCallFull3 "PolyBasicIOGeneral" in (* Get file modification time. 
*) fun modTime s = doIo(61, (), s) end local val doIo: int*unit*string -> Position.int (* This can be larger than 32-bits. *) = RunCall.rtsCallFull3 "PolyBasicIOGeneral" in (* Get file size. *) fun fileSize s = doIo(62, (), s) end local val doIo: int*string*Time.time -> unit = RunCall.rtsCallFull3 "PolyBasicIOGeneral" in (* Set the time on a file. NONE means use the current time. *) fun setTime(s, NONE) = doIo(63, s, Time.now()) | setTime(s, SOME t) = doIo(63, s, t) end local val doIo: int*string*string -> unit = RunCall.rtsCallFull3 "PolyBasicIOGeneral" in (* Rename a file. *) fun rename {old, new} = doIo(65, old, new) end datatype access_mode = A_READ | A_WRITE | A_EXEC local val doIo: int*string*word -> bool = RunCall.rtsCallFull3 "PolyBasicIOGeneral" fun mapAccess (A_READ, m) = Word.orb(m, 0w1) | mapAccess (A_WRITE, m) = Word.orb(m, 0w2) | mapAccess (A_EXEC, m) = Word.orb(m, 0w4) in (* Get access rights. *) fun access (s, m) = doIo(66, s, List.foldl mapAccess 0w0 m) end (* file_id seems to be intended to reflect the semantics of a Unix inode. That concept doesn't exist in Windows so we use a canonical file name instead. *) datatype file_id = INODE of LargeInt.int | FILENAME of string fun compare(INODE i, INODE j) = LargeInt.compare(i, j) | compare(FILENAME s, FILENAME t) = String.compare(s, t) | (* These cases shouldn't happen but we'll define them anyway. *) compare(INODE _, FILENAME _) = General.GREATER | compare(FILENAME _, INODE _) = General.LESS (* TODO: The hash function is supposed to distribute the values well when taken modulo 2^n for any n. I'm sure we can come up with something better than this. *) fun hash(INODE i) = let open Word infix xorb << val w = Word.fromLargeInt i in w xorb (w << 0w8) xorb (w << 0w16) xorb (w << 0w24) end | hash(FILENAME s) = (* Simple hash function which multiplies the accumulator by 7 and adds in the next character. 
*) CharVector.foldl (fn(c, a) => a * 0w7 + Word.fromInt(Char.ord c)) 0w0 s local val doIo: int*unit*string -> LargeInt.int = RunCall.rtsCallFull3 "PolyBasicIOGeneral" in (* Get file id (inode). Returns negative value if inodes aren't supported. *) fun fileId s = let val i = doIo(68, (), s) in if i < 0 then FILENAME(fullPath s) else INODE i end end fun realPath p = if Path.isAbsolute p then fullPath p else Path.mkRelative{path=fullPath p, relativeTo=fullPath(getDir())} end (* FileSys *) structure IO :> OS_IO = struct datatype iodesc = IODESC of int (* Actually abstract. This isn't the file descriptor itself, rather a pointer into the io table. *) local val doIo: int*iodesc*unit -> int = RunCall.rtsCallFull3 "PolyBasicIOGeneral" in (* Get underlying index. *) fun getIndex f = doIo(69, f, ()) end (* TODO: The hash function is supposed to distribute the values well when taken modulo 2^n for any n. I'm sure we can come up with something better than this. *) fun hash (i: iodesc) : word = let open Word infix xorb << val w = Word.fromInt(getIndex i) in w xorb (w << 0w8) xorb (w << 0w16) xorb (w << 0w24) end fun compare(i, j) = Int.compare(getIndex i, getIndex j) (* eq *)type iodesc_kind = int structure Kind = struct val file : iodesc_kind = 0 val dir : iodesc_kind = 1 val symlink : iodesc_kind = 2 val tty : iodesc_kind = 3 val pipe : iodesc_kind = 4 val socket : iodesc_kind = 5 val device : iodesc_kind = 6 end local val doIo: int*iodesc*int -> int = RunCall.rtsCallFull3 "PolyBasicIOGeneral" in fun kind (i: iodesc): iodesc_kind = let val k = doIo(21, i, 0) in (* Returns a negative number if the call fails, otherwise one of the above numbers. Returns 7 on unknown or something else. *) if k < 0 orelse k > 6 then raise SysErr("Invalid result", NONE) else k end end (* The poll descriptor and the result of polling is a bit map together with the io descriptor. *) val inBit = 0w1 and outBit = 0w2 and priBit = 0w4 (* N.B. 
The implementation of poll_desc is hard-wired into Socket.pollDesc. *) type poll_desc = word*iodesc datatype poll_info = PI of word*poll_desc local val doIo: int*iodesc*int -> word = RunCall.rtsCallFull3 "PolyBasicIOGeneral" in fun sys_poll_test(i: iodesc) = doIo(22, i, 0) end local val doIo: int*int* (iodesc Vector.vector * word Vector.vector * Time.time) -> word Vector.vector = RunCall.rtsCallFull3 "PolyBasicIOGeneral" in fun sys_poll_block(iov, wv) = doIo(23, 0, (iov, wv, Time.zeroTime)) fun sys_poll_poll(iov, wv) = doIo(25, 0, (iov, wv, Time.zeroTime)) and sys_poll_wait (iov, wv, t) = doIo(24, 0, (iov, wv, t)) end fun pollDesc (i: iodesc): poll_desc option = (* If the poll test returns zero then polling is not allowed for any mode. *) if sys_poll_test i = 0w0 then NONE else SOME(0w0, i) fun pollToIODesc(_, i): iodesc = i exception Poll (* Add the appropriate bit to the set if it is allowed. *) local fun addBit b ((bm, i)) = if Word.andb(sys_poll_test i, b) = 0w0 then raise Poll else (Word.orb(bm, b), i) in val pollIn = addBit inBit and pollOut = addBit outBit and pollPri = addBit priBit end fun poll (l : poll_desc list, t: Time.time Option.option) : poll_info list = let (* The original poll descriptor list may contain multiple occurrences of the same IO descriptor with the same or different flags. On Cygwin, at least, passing this directly produces funny results so we transform the request so that we make at most one request for each descriptor. 
*) local fun quickSort _ ([]:'a list) = [] | quickSort _ ([h]:'a list) = [h] | quickSort (leq:'a -> 'a -> bool) ((h::t) :'a list) = let val (after, befor) = List.partition (leq h) t in quickSort leq befor @ (h :: quickSort leq after) end; fun leqPoll((p1, f1): poll_desc) ((p2, f2): poll_desc) = case compare(f1, f2) of EQUAL => p1 <= p2 | LESS => true | GREATER => false fun merge ((p1, f1) :: (p2, f2) :: rest) = if compare(f1, f2) = EQUAL then merge((Word.orb(p1, p2), f1) :: rest) else (p1, f1) :: merge((p2, f2) :: rest) | merge c = c val sortedDescs = quickSort leqPoll l in val mergedDescs = merge sortedDescs end (* Turn the list into vectors of io descriptors and request bits - easier for the RTS to manage. N.B. This assumes that Vector.vector creates a simple memory vector and does not wrap it in any way. *) local val (bits, ioDescs) = ListPair.unzip mergedDescs in val bitVector: word Vector.vector = Vector.fromList bits and ioVector: iodesc Vector.vector = Vector.fromList ioDescs end (* Do the actual polling. Returns a vector with bits set for the results. *) val resV: word Vector.vector = case t of NONE => sys_poll_block(ioVector, bitVector) | SOME tt => let open Time in if tt = Time.zeroTime then sys_poll_poll(ioVector, bitVector) else if tt < Time.zeroTime (* Must check for negative times since these can be interpreted as infinity. *) then raise SysErr("Invalid time", NONE) (* For non-zero times we convert this to a number of milliseconds since the current time. We have to pass in an absolute time rather than a relative time because the RTS may retry this call if the polled events haven't happened. *) else sys_poll_wait(ioVector, bitVector, tt + Time.now()) end (* Process the original list to see which items are present, retaining the original order. *) fun testResults(request as (bits, iod), tl) = let val (index, _) = (* Find the IO descriptor. It must be there somewhere. 
*) valOf(Vector.findi (fn (_, iod1) => compare(iod, iod1) = EQUAL) ioVector) (* The result is in the corresponding index position. We need to AND this with the request because we could have separate requests asking for different bits for the same file descriptor. *) val result = Word.andb(bits, Vector.sub(resV, index)) in if result = 0w0 then tl else PI(result, request) :: tl end in (* testResults conses onto its accumulator so fold from the right to keep the results in the same order as l. *) List.foldr testResults [] l end fun isIn(PI(b, _)) = Word.andb(b, inBit) <> 0w0 and isOut(PI(b, _)) = Word.andb(b, outBit) <> 0w0 and isPri(PI(b, _)) = Word.andb(b, priBit) <> 0w0 fun infoToPollDesc (PI(_, pd)) = pd end (* IO *) structure Process:> OS_PROCESS = struct type status = int local val doCall: int*unit -> int = RunCall.rtsCallFull2 "PolyProcessEnvGeneral" in val success = doCall(15, ()) and failure = doCall(16, ()) end fun isSuccess i = i = success local val doCall: int*string -> status = RunCall.rtsCallFull2 "PolyProcessEnvGeneral" in (* Run a process and wait for the result. *) fun system s = doCall(17, s) end local - val doCall: int*(unit->unit) -> unit - = RunCall.rtsCallFull2 "PolyProcessEnvGeneral" - in - (* Register a function to be run at exit. *) - fun atExit f = doCall(18, f) - end - - local - (* exit - supply result code and close down all threads. *) - val doExit: int -> unit = RunCall.rtsCallFull1 "PolyFinish" - val doCall: int*unit -> (unit->unit) = - RunCall.rtsCallFull2 "PolyProcessEnvGeneral" + val atExitList = LibrarySupport.volatileListRef() + val atExitMutex = Thread.Mutex.mutex() + val exitResult = ref NONE (* Set to SOME thread id by the thread that is running exit. *) + + val reallyExit: int -> unit = RunCall.rtsCallFull1 "PolyFinish" in + (* Register a function to be run at exit. If we are already exiting + this has no effect. The list is updated while holding atExitMutex. *) + val atExit = ThreadLib.protect atExitMutex + (fn f => case exitResult of ref(SOME _) => () | ref NONE => atExitList := f :: !atExitList) + + (* Exit. Run the atExit functions and then exit with the result code. + There are a few complications. 
If a second thread calls exit after + the first one it mustn't start the exit process again. If one of the + atExit functions calls exit recursively it is defined to never return. + We just need to pick up the next atExit function and carry on. *) fun exit (n: int) = let - (* Get a function from the atExit list. If that list - is empty it will raise an exception and we've finished. *) - val exitFun = - doCall(19, ()) handle _ => (doExit n; fn () => ()) + open Thread + open Mutex Thread + (* Turn off further interrupts *) + val () = setAttributes[InterruptState InterruptDefer] + val () = lock atExitMutex + val () = + case !exitResult of + SOME threadId => + if threadId = self() + then () + else (unlock atExitMutex; Thread.exit()) + | NONE => exitResult := SOME(self()) + val () = unlock atExitMutex + (* This is now the only thread here. + Take an item off the list and update the list with the + tail in case we recursively call "exit". *) + fun runExit () = + case !atExitList of + [] => reallyExit n + | (hd::tl) => (atExitList := tl; hd() handle _ => (); runExit()) in - (* Run the function and then repeat. *) - exitFun() handle _ => (); (* Ignore exceptions in the function. *) - exit(n) + runExit(); + raise Match (* Never reached but gives the 'a result. *) end end (* Terminate without running the atExit list or flushing the buffers. We raise an exception to get the type right. *) local val doCall: int -> unit = RunCall.rtsCallFull1 "PolyTerminate" in fun terminate n = (doCall n; raise Fail "never") end local val doCall: int*string -> string = RunCall.rtsCallFull2 "PolyProcessEnvGeneral" in (* Get an environment string. The underlying call raises an exception if the string isn't there. *) fun getEnv s = SOME(doCall(14, s)) handle RunCall.SysErr _ => NONE end (* poll is implemented so that an empty list simply waits for the time. *) fun sleep t = (IO.poll([], SOME t); ()) end (* Process. *) end; local (* Install the pretty printer for OS.IO.Kind and OS.syserror. 
*) fun kind_string k = if k = OS.IO.Kind.file then "file" else if k = OS.IO.Kind.dir then "dir" else if k = OS.IO.Kind.symlink then "symlink" else if k = OS.IO.Kind.tty then "tty" else if k = OS.IO.Kind.pipe then "pipe" else if k = OS.IO.Kind.socket then "socket" else if k = OS.IO.Kind.device then "device" else "unknown" fun printKind _ _ x = PolyML.PrettyString(kind_string x) fun printSysError _ _ x = PolyML.PrettyString(OS.errorName x) (* For the moment just make these opaque. *) fun printPollDesc _ _ (_: OS.IO.poll_desc) = PolyML.PrettyString "?" and printPollInfo _ _ (_: OS.IO.poll_info) = PolyML.PrettyString "?" in val () = PolyML.addPrettyPrinter printKind val () = PolyML.addPrettyPrinter printSysError val () = PolyML.addPrettyPrinter printPollDesc val () = PolyML.addPrettyPrinter printPollInfo end diff --git a/basis/TextIO.sml b/basis/TextIO.sml index 7dad6661..29640d52 100644 --- a/basis/TextIO.sml +++ b/basis/TextIO.sml @@ -1,420 +1,419 @@ (* Title: Standard Basis Library: Text IO - Copyright David C.J. Matthews 2000, 2005, 2016, 2018 + Copyright David C.J. Matthews 2000, 2005, 2016, 2018, 2020 This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License version 2.1 as published by the Free Software Foundation. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. 
You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *) signature TEXT_STREAM_IO = sig include STREAM_IO where type vector = CharVector.vector where type elem = Char.char val inputLine : instream -> (string * instream) option val outputSubstr : outstream * Substring.substring -> unit end; signature TEXT_IO = sig (* include IMPERATIVE_IO *) structure StreamIO : TEXT_STREAM_IO where type reader = TextPrimIO.reader where type writer = TextPrimIO.writer where type pos = TextPrimIO.pos type vector = StreamIO.vector type elem = StreamIO.elem type instream type outstream val input : instream -> vector val input1 : instream -> elem option val inputN : instream * int -> vector val inputAll : instream -> vector val canInput : instream * int -> int option val lookahead : instream -> elem option val closeIn : instream -> unit val endOfStream : instream -> bool val output : outstream * vector -> unit val output1 : outstream * elem -> unit val flushOut : outstream -> unit val closeOut : outstream -> unit val mkInstream : StreamIO.instream -> instream val getInstream : instream -> StreamIO.instream val setInstream : instream * StreamIO.instream -> unit val mkOutstream : StreamIO.outstream -> outstream val getOutstream : outstream -> StreamIO.outstream val setOutstream : outstream * StreamIO.outstream -> unit val getPosOut : outstream -> StreamIO.out_pos val setPosOut : outstream * StreamIO.out_pos -> unit (* End of include IMPERATIVE_IO *) val inputLine : instream -> string option val outputSubstr : outstream * Substring.substring -> unit val openIn : string -> instream val openOut : string -> outstream val openAppend : string -> outstream val openString : string -> instream val stdIn : instream val stdOut : outstream val stdErr : outstream val print : string -> unit val scanStream : ((Char.char, StreamIO.instream) 
StringCvt.reader -> ('a, StreamIO.instream) StringCvt.reader) -> instream -> 'a option end; structure TextIO :> TEXT_IO = struct open IO type vector = String.string and elem = Char.char exception Interrupt = RunCall.Interrupt (* Called after any exception in the lower level reader or writer to map any exception other than Io into Io. *) fun mapToIo (io as Io _, _, _) = io | mapToIo (Interrupt, _, _) = Interrupt | mapToIo (nonIo, name, caller) = Io { name = name, function = caller, cause = nonIo } (* Functional IO Layer. *) structure TextStreamIO = struct structure BasicTextStreamIO = BasicStreamIO( structure PrimIO = TextPrimIO structure Vector = CharVector structure Array = CharArray structure VectorSlice = CharVectorSlice structure ArraySlice = CharArraySlice val someElem : PrimIO.elem = #" " ); open BasicTextStreamIO (* Input a line. Adds a newline if the file ends without one. *) fun inputLine f = let (* Read a sequence of blocks until we get a newline or EOF. *) fun inputBlocks read f = let (* Read the next block and see how big it is. *) val (blk, f') = input f val length = String.size blk (* See if it contains a newline and if so where. *) fun newlinePos i = if i = length then length+1 else if String.sub(blk, i) = #"\n" then i+1 (* Return characters including newline. *) else newlinePos (i+1) val nlPos = newlinePos 0 in if length = 0 (* EOF *) then ( (* If we have not read anything at all we return NONE otherwise return what we had with a newline added. *) case read of [] => NONE | _ => SOME(String.concat(List.rev("\n"::read)), f) ) else if nlPos > length then inputBlocks (blk::read) f' (* No newline - get another block.. *) else (* The string we read included a newline. *) let (* Reread all up to and including the newline and return the stream which gives us the rest. *) val (b, f') = inputN(f, nlPos) in SOME(String.concat(List.rev(b::read)), f') end end in (* If we are at end-of-stream we return NONE. 
Since this is a functional stream that means we will always return NONE for a given f (i.e. there's no temporary end-of-stream to be cleared). *) inputBlocks [] f end (* StreamIO treats line buffering on output as block buffering since it has no concept of a line separator. *) fun output(f, v) = case getBufferMode f of LINE_BUF => let val vecLen = CharVector.length v (* Find the last newline character in the string. *) fun lastNewline 0 = 0 | lastNewline i = if CharVector.sub(v, i-1) = #"\n" then i else lastNewline(i-1) val newLinePos = lastNewline vecLen in if newLinePos = 0 then (* No newlines in it. *) BasicTextStreamIO.output(f, v) else (* There's at least one newline. *) ( outputVec(f, CharVectorSlice.slice(v, 0, SOME newLinePos)); flushOut f; outputVec(f, CharVectorSlice.slice(v, newLinePos, NONE)) ) end | _ => BasicTextStreamIO.output(f, v) (* Not line buffering. *) (* This could be defined in terms of output but the underlying output1 function is likely to be more efficient. *) fun output1(f, c) = ( BasicTextStreamIO.output1(f, c); if c = #"\n" andalso getBufferMode f = LINE_BUF then flushOut f else () ) end (* StreamIO. *) (* The imperative IO streams *) structure ImpIO = BasicImperativeIO( structure StreamIO = TextStreamIO structure Vector = CharVector structure Array = CharArray) open ImpIO (* Now define StreamIO as our extended StreamIO *) (* Replace the StreamIO from ImpIO by our version. *) structure StreamIO = struct open TextStreamIO val outputSubstr = outputVec end open Thread.Thread open Thread.Mutex open LibrarySupport.CharArray type fileDescr = OS.IO.iodesc; type address = LibrarySupport.address (* We have to declare doIo separately depending on the types of the arguments. It's possible to get round this but that would result in an extra call to run_call3 for each io call. 
*) local val doIo: int*int*string -> fileDescr = RunCall.rtsCallFull3 "PolyBasicIOGeneral" in fun sys_open_in_text name = doIo(3, 0, name) and sys_open_out_text name = doIo(5, 0, name) and sys_open_append_text name = doIo(13, 0, name) end local val doIo = RunCall.rtsCallFull3 "PolyBasicIOGeneral" in fun sys_get_buffsize (strm: fileDescr): int = doIo(15, strm, 0) end (* Create the primitive IO functions and add the higher layers. *) fun wrapInFileDescr(n, name) = let val textPrimRd = LibraryIOSupport.wrapInFileDescr{fd=n, name=name, initBlkMode=true} in StreamIO.mkInstream(textPrimRd, "") end fun wrapOutFileDescr(n, name, buffering, isAppend) = let val buffSize = sys_get_buffsize n val textPrimWr = LibraryIOSupport.wrapOutFileDescr{fd=n, name=name, appendMode=isAppend, initBlkMode=true, chunkSize=buffSize} in StreamIO.mkOutstream(textPrimWr, buffering) end (* Open a file for output. *) fun openOut s = let val f = sys_open_out_text s handle exn => raise mapToIo(exn, s, "TextIO.openOut") (* Look at the stream to see what kind of buffering to use. *) val k = OS.IO.kind f in mkOutstream(wrapOutFileDescr (f, s, if k = OS.IO.Kind.tty then IO.LINE_BUF else IO.BLOCK_BUF, false (* Not append *))) end fun openAppend s = let val f = sys_open_append_text s handle exn => raise mapToIo(exn, s, "TextIO.openAppend") val k = OS.IO.kind f in mkOutstream(wrapOutFileDescr (f, s, if k = OS.IO.Kind.tty then IO.LINE_BUF else IO.BLOCK_BUF, true (* setPos will not work. *))) end (* Open a file for input. *) fun openIn s = let val f = sys_open_in_text s handle exn => raise mapToIo(exn, s, "TextIO.openIn") in ImpIO.mkInstream(wrapInFileDescr(f, s)) end local val doIo: int*int*int -> fileDescr = RunCall.rtsCallFull3 "PolyBasicIOGeneral" fun getStdDescriptors() = {stdInDesc=doIo(0, 0, 0), stdOutDesc=doIo(1, 0, 0), stdErrDesc=doIo(2, 0, 0) } (* Get the current descriptors for the rest of the bootstrap and use them to initialise stdIn, stdOut and stdErr. 
*) val {stdInDesc, stdOutDesc, stdErrDesc} = getStdDescriptors() in (* Get the entries for standard input, standard output and standard error. *) val stdIn = ImpIO.mkInstream(wrapInFileDescr(stdInDesc, "stdIn")) (* Set buffering on standard output to block buffering during bootstrap. *) val stdOut = mkOutstream(wrapOutFileDescr(stdOutDesc, "stdOut", IO.BLOCK_BUF, false)) and stdErr = mkOutstream(wrapOutFileDescr(stdErrDesc, "stdErr", IO.NO_BUF (* Defined to be unbuffered. *), false)) local (* On startup set the streams. *) fun onStartUp () = let val {stdInDesc, stdOutDesc, stdErrDesc} = getStdDescriptors() (* If we're READING from a tty set the OUTPUT stream to line buffering. This ensures that prompts are written out as soon as they're needed. *) val stdOutBuff = if OS.IO.kind stdInDesc = OS.IO.Kind.tty then IO.LINE_BUF else IO.BLOCK_BUF val stdInStream = wrapInFileDescr(stdInDesc, "stdIn") and stdOutStream = wrapOutFileDescr(stdOutDesc, "stdOut", stdOutBuff, false) and stdErrStream = wrapOutFileDescr(stdErrDesc, "stdErr", IO.NO_BUF (* Defined to be unbuffered. *), false) in ImpIO.setInstream(stdIn, stdInStream); ImpIO.setOutstream(stdOut, stdOutStream); ImpIO.setOutstream(stdErr, stdErrStream) end in (* Set up an onEntry handler so that this is always installed. *) val () = PolyML.onEntry onStartUp end end local (* This requires access to the underlying representation in order to be able to lock the stream while reading the line. This ensures that if multiple threads are reading lines from a stream each thread will get a complete line. *) - fun inputLine' fStream = - let - val f = ! fStream - in + fun inputLine' (fStream as ref(SOME f)) = + ( case StreamIO.inputLine f of NONE => let (* It's not clear what should happen here. Assume that this clears any temporary EOF. 
*) val (_, f') = StreamIO.input f in - fStream := f'; + fStream := SOME f'; NONE end - | SOME (s, f') => ( fStream := f'; SOME s ) - end + | SOME (s, f') => ( fStream := SOME f'; SOME s ) + ) + | inputLine' _ = NONE in fun inputLine s = ImpIO.protect s inputLine' end fun outputSubstr(f, s) = StreamIO.outputSubstr(getOutstream f, s) fun print s = (output(stdOut, s); flushOut stdOut) (* Open a string as an input stream. It would be possible to define this using the string as the argument to mkInstream and a null reader. This way gives more flexibility since it allows for random access to the string. *) fun openString (s: string) : instream = let val stringLength = String.size s val posN: int ref = ref 0 (* We can read from the string until it is exhausted. *) fun readVec (len: int): vector = let val l = Int.min(len, stringLength - !posN) val v = String.substring(s, !posN, l) in posN := !posN + l; v end (* Closing it simply exhausts the input. *) fun close () : unit = (posN := stringLength) and avail () : int option = SOME(stringLength - ! posN) and readVecNB l = SOME(readVec l) and block () = () and canInput () = true val textPrimRd = TextPrimIO.RD { name = "StringPrimIO", chunkSize = stringLength, (* Most efficient to read the whole string. *) readVec = SOME readVec, readArr = NONE, (* Can be synthesised. *) readVecNB = SOME readVecNB, readArrNB = NONE, (* Can be synthesised. *) block = SOME block, canInput = SOME canInput, avail = avail, getPos = NONE, (* Difficult because the position is abstract. *) setPos = NONE, endPos = NONE, verifyPos = NONE, close = close, ioDesc = NONE } val streamIo = StreamIO.mkInstream(textPrimRd, "") in ImpIO.mkInstream streamIo end fun scanStream scanFn strm = let val f = getInstream strm in case (scanFn StreamIO.input1 f) of NONE => NONE | SOME(v, f') => ( setInstream(strm, f'); SOME v ) end end; (* Available unqualified at top-level. 
*) val print = TextIO.print; diff --git a/basis/Thread.sml b/basis/Thread.sml index a0e9578d..68ad3586 100644 --- a/basis/Thread.sml +++ b/basis/Thread.sml @@ -1,771 +1,766 @@ (* Title: Thread package for ML. Author: David C. J. Matthews - Copyright (c) 2007-2014, 2018 + Copyright (c) 2007-2014, 2018, 2020 This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License version 2.1 as published by the Free Software Foundation. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *) (* This signature and structure are not part of the standard basis library but are included here because they depend on the Time structure and are in turn dependencies of the BasicIO structure. *) (*!Earlier versions of Poly/ML have provided a form of concurrent execution through the Process structure. Version 5.1 introduces new thread primitives in the Thread structure. This structure is modelled on the Posix thread (pthread) package but simplified and modified for ML. The aim is to provide an efficient implementation of parallelism particularly to enable ML programs to make use of multi-core processors while minimising the changes needed to existing code. The Process structure will continue to be available as a library written on top of these primitives but new programs should use the Thread structure directly. The thread package differs from pthreads in a number of ways. There is no join function to wait for the completion of a thread. This can be written using mutexes and condition variables. 
Cancellation and signal handling are combined into the interrupt functions. (The Poly/ML Signal structure handles signals for all the threads together). The effect of explicit cancellation is achieved using the interrupt function. This causes an interrupt to be generated in a specific thread. Alternatively an interrupt can be broadcast to all threads. This is most likely to be used interactively to kill threads that appear to have gone out of control. The normal top-level handler for a console interrupt will generate this. Threads can choose how or whether they respond to these interrupts. A thread that is doing processor-intensive work probably needs to be able to be interrupted asynchronously whereas if it is communicating with other threads the presence of asynchronous interrupts makes correct programming difficult. *) signature THREAD = sig (*!The Thread exception can be raised by various of the functions in the structure if they detect an error.*) exception Thread of string (* Raised if an operation fails. *) structure Thread: sig (*!The type of a thread identifier.*) eqtype thread (* Thread attributes - This may be extended. *) (*!The type of a thread attribute. Thread attributes are properties of the thread that are set initially when the thread is created but can subsequently be modified by the thread itself. The thread attribute type may be extended in the future to include things like scheduling priority. The current thread attributes control the way interrupt exceptions are delivered to the thread. `EnableBroadcastInterrupt` controls whether the thread will receive an interrupt sent using `broadcastInterrupt` or as a result of pressing the console interrupt key. If this is false the thread will not receive them. The default for a new thread if this is not specified is false. `InterruptState` controls when and whether interrupts are delivered to the thread. 
This includes broadcast interrupts and also interrupts directed at a specific thread with the interrupt call. `InterruptDefer` means the thread will not receive any interrupts. However, if the thread has previously been interrupted the interrupt may be delivered when the thread calls setAttributes to change its interrupt state. `InterruptSynch` means interrupts are delivered synchronously. An interrupt will be delayed until an interruption point. An interruption point is one of: `testInterrupt`, `ConditionVar.wait`, `ConditionVar.waitUntil` and various library calls that may block, such as IO calls, pause etc. N.B. `Mutex.lock` is not an interruption point even though it can result in a thread blocking for an indefinite period. `InterruptAsynch` means interrupts are delivered asynchronously i.e. at a suitable point soon after they are triggered. `InterruptAsynchOnce` means that only a single interrupt is delivered asynchronously after which the interrupt state is changed to `InterruptSynch`. It allows a thread to tidy up and if necessary indicate that it has been interrupted without the risk of a second asynchronous interrupt occurring in the handler for the first interrupt. If this attribute is not specified when a thread is created the default is `InterruptSynch`. `MaximumMLStack` was added in version 5.5.3. It controls the maximum size the ML stack may grow to. It is an option type where NONE allows the stack to grow to the limit of the available memory whereas SOME n limits the stack to n words. This is approximate since there is some rounding involved. When the limit is reached the thread is sent an Interrupt exception.*) datatype threadAttribute = (* Does this thread accept a broadcast interrupt? The default is not to accept broadcast interrupts. *) EnableBroadcastInterrupt of bool (* How to handle interrupts. The default is to handle interrupts synchronously. *) | InterruptState of interruptState (* Maximum size of the ML stack in words. 
NONE means unlimited *) | MaximumMLStack of int option and interruptState = InterruptDefer (* Defer any interrupts. *) | InterruptSynch (* Interrupts are delivered synchronously. An interrupt will be delayed until an interruption point. An interruption point is one of: testInterrupt, ConditionVar.wait, ConditionVar.waitUntil and various library calls that may block, such as IO calls, pause etc. N.B. Mutex.lock is not an interruption point even though it can result in a thread blocking for an indefinite period. *) | InterruptAsynch (* Interrupts are delivered asynchronously i.e. at a suitable point soon after they are triggered. *) | InterruptAsynchOnce (* As InterruptAsynch except that only a single interrupt is delivered asynchronously after which the interrupt state is changed to InterruptSynch. It allows a thread to tidy up and if necessary indicate that it has been interrupted without the risk of a second asynchronous interrupt occurring in the handler for the first interrupt. *) (*!Fork a thread. Starts a new thread running the function argument. The attribute list gives initial values for thread attributes which can be modified by the thread itself. Any unspecified attributes take default values. The thread is terminated when the thread function returns, if it raises an uncaught exception or if it calls `exit`;*) val fork: (unit->unit) * threadAttribute list -> thread (*!Terminate this thread. *) val exit: unit -> unit (*!Test if a thread is still running or has terminated. This function should be used with care. The thread may be on the point of terminating and still appear to be active.*) val isActive: thread -> bool (*!Test whether thread ids are the same. This is provided for backwards compatibility since `thread` is an eqtype. *) val equal: thread * thread -> bool (*!Return the thread identifier for the current thread. *) val self: unit -> thread exception Interrupt (* = SML90.Interrupt *) (*!Send an Interrupt exception to a specific thread. 
When and indeed whether the exception is actually delivered will depend on the interrupt state of the target thread. Raises Thread if the thread is no longer running, so an exception handler should be used unless the thread is known to be blocked. *) val interrupt: thread -> unit (*!Send an interrupt exception to every thread which is set to accept it. *) val broadcastInterrupt: unit -> unit (*!If this thread is handling interrupts synchronously, test to see if it has been interrupted. If so it raises the `Interrupt` exception. *) val testInterrupt: unit -> unit (*!Terminate a thread. This should be used as a last resort. Normally a thread should be allowed to clean up and terminate by using the interrupt call. Raises Thread if the thread is no longer running, so an exception handler should be used unless the thread is known to be blocked. *) val kill: thread -> unit (*!Get and set thread-local store for the calling thread. The store is a tagged associative memory which is initially empty for a new thread. A thread can call setLocal to add or replace items in its store and call getLocal to return values if they exist. The Universal structure contains functions to make new tags as well as injection, projection and test functions. *) val getLocal: 'a Universal.tag -> 'a option and setLocal: 'a Universal.tag * 'a -> unit (*!Change the specified attribute(s) for the calling thread. Unspecified attributes remain unchanged. *) val setAttributes: threadAttribute list -> unit (*!Get the values of attributes. *) val getAttributes: unit -> threadAttribute list (*!Return the number of processors that will be used to run threads and the number of physical processors if that is available. *) val numProcessors: unit -> int and numPhysicalProcessors: unit -> int option end structure Mutex: sig (*!A mutex provides simple mutual exclusion. A thread can lock a mutex and until it unlocks it no other thread will be able to lock it. 
Locking and unlocking are intended to be fast in the situation when there is no other process attempting to lock the mutex. These functions may not work correctly if an asynchronous interrupt is delivered during the calls. A thread should use synchronous interrupt when using these calls. *) type mutex (*!Make a new mutex. The mutex is initially unlocked. *) val mutex: unit -> mutex (*!Lock a mutex. If the mutex is currently locked the thread is blocked until it is unlocked. If a thread tries to lock a mutex that it has previously locked the thread will deadlock. N.B. `lock` is not an interruption point (a point where synchronous interrupts are delivered) even though a thread can be blocked indefinitely. *) val lock: mutex -> unit (*!Unlock a mutex and allow any waiting threads to run. The behaviour if the mutex was not previously locked by the calling thread is undefined. *) val unlock: mutex -> unit (*!Attempt to lock the mutex. Returns true if the mutex was not previously locked and has now been locked by the calling thread. Returns false if the mutex was previously locked, including by the calling thread. *) val trylock: mutex -> bool end structure ConditionVar: sig (*!Condition variables are used to provide communication between threads. A condition variable is used in conjunction with a mutex and usually a reference to establish and test changes in state. The normal use is for one thread to lock a mutex, test the reference and then wait on the condition variable, releasing the lock on the mutex while it does so. Another thread may then lock the mutex, update the reference, unlock the mutex, and signal the condition variable. This wakes up the first thread and reacquires the lock allowing the thread to test the updated reference with the lock held. More complex communication mechanisms, such as blocking channels, can be written in terms of condition variables. *) type conditionVar (*!Make a new condition variable.
*) val conditionVar: unit -> conditionVar (*!Release the mutex and block until the condition variable is signalled. When wait returns the mutex will have been re-acquired. If the thread is handling interrupts synchronously this function can be interrupted using the `Thread.interrupt` function or, if the thread is set to accept broadcast interrupts, `Thread.broadcastInterrupt`. The thread will re-acquire the mutex before the exception is delivered. An exception will only be delivered in this case if the interrupt is sent before the condition variable is signalled. If the interrupt is sent after the condition variable is signalled the function will return normally even if it has not yet re-acquired the mutex. The pending interrupt will then be delivered on the next call to `wait`, `Thread.testInterrupt` or other blocking call. A thread should never call this function if it may receive an asynchronous interrupt. It should always set its interrupt state to either `InterruptSynch` or `InterruptDefer` beforehand. An asynchronous interrupt may leave the condition variable and the mutex in an indeterminate state and could lead to deadlock. A condition variable should only be associated with one mutex at a time. All the threads waiting on a condition variable should pass the same mutex as argument.*) val wait: conditionVar * Mutex.mutex -> unit (*!As wait except that it blocks until either the condition variable is signalled or the time (absolute) is reached. Returns true if the thread was woken by a signal or broadcast and false if the time was reached first. Either way the mutex is reacquired so there may be a further delay if it is held by another thread. *) val waitUntil: conditionVar * Mutex.mutex * Time.time -> bool (*!Wake up one thread if any are waiting on the condition variable. If there are several threads waiting for the condition variable one will be selected to run and will run as soon as it has re-acquired the lock.*) val signal: conditionVar -> unit (*!Wake up all threads waiting on the condition variable.
*) val broadcast: conditionVar -> unit end end; structure Thread :> THREAD = struct exception Thread = RunCall.Thread - - (* Create non-overwritable mutables for mutexes and condition variables. - A non-overwritable mutable in the executable or a saved state is not - overwritten when a saved state further down the hierarchy is loaded. *) - val nvref = LibrarySupport.noOverwriteRef - + structure Thread = struct open Thread (* Created in INITIALISE with thread type and self function. *) (* Equality is pointer equality. *) val equal : thread*thread->bool = op = datatype threadAttribute = EnableBroadcastInterrupt of bool | InterruptState of interruptState | MaximumMLStack of int option and interruptState = InterruptDefer | InterruptSynch | InterruptAsynch | InterruptAsynchOnce (* Convert attributes to bits and a mask. The result is (bits, mask): bit 0w1 is the broadcast-enable flag and the bits in 0w6 hold the interrupt state. The mask has the corresponding bits set for each of these two attributes that appears in the list so that a caller can leave the unspecified fields unchanged. Raises Thread if either attribute is given more than once. *) fun attrsToWord (at: threadAttribute list): Word.word * Word.word = let (* Check that a particular attribute appears only once. As well as accumulating the actual bits in the result we also accumulate the mask of bits. If any of these reappear we raise an exception.
*) fun checkRepeat(r, acc, set, mask) = if Word.andb(set, mask) <> 0w0 then raise Thread "The same attribute appears more than once in the list" else convert(r, acc, Word.orb(set, mask)) and convert([], acc, set) = (acc, set) | convert(EnableBroadcastInterrupt true :: r, acc, set) = checkRepeat(r, Word.orb(acc, 0w1), set, 0w1) | convert(EnableBroadcastInterrupt false :: r, acc, set) = checkRepeat(r, acc (* No bit *), set, 0w1) | convert(InterruptState s :: r, acc, set) = checkRepeat(r, Word.orb(setIstateBits s, acc), set, 0w6) | convert(MaximumMLStack _ :: r, acc, set) = (* The stack size has no bits in the flag word. It is skipped here and is not checked for repeats. *) convert(r, acc, set) in convert(at, 0w0, 0w0) end (* Encode an interrupt state in the two bits covered by the mask 0w6. *) and setIstateBits InterruptDefer = 0w0 | setIstateBits InterruptSynch = 0w2 | setIstateBits InterruptAsynch = 0w4 | setIstateBits InterruptAsynchOnce = 0w6 (* Decode the interrupt state from the 0w6 bits of a flag word. This is the inverse of setIstateBits. *) fun getIstateBits(w: Word.word): interruptState = let val ibits = Word.andb(w, 0w6) in if ibits = 0w0 then InterruptDefer else if ibits = 0w2 then InterruptSynch else if ibits = 0w4 then InterruptAsynch else InterruptAsynchOnce end (* Convert a flag word back to a list of attributes. The list contains the broadcast setting and the interrupt state but never MaximumMLStack. *) fun wordToAttrs w = let (* Enable broadcast - true if bottom bit is set. *) val bcast = EnableBroadcastInterrupt(Word.andb(w, 0w1) = 0w1) in [bcast, InterruptState(getIstateBits w)] end exception Interrupt = RunCall.Interrupt (* The thread id is opaque outside this structure but is actually a six word mutable object. Word 0: Index into thread table (used inside the RTS only) Word 1: Flags: initialised by the RTS and set by this code Word 2: Thread local store: read and set by this code. Word 3: IntRequest: Set by the RTS if there is an interrupt pending Word 4: Maximum ML stack size.
Unlimited is stored here as zero *) val threadIdFlags = 0w1 and threadIdThreadLocal = 0w2 and threadIdIntRequest = 0w3 and threadIdStackSize = 0w4 (* Look up tag t in the calling thread's local store, which is a list of references to universal values loaded from the thread object. Returns the value of the first entry that carries the tag or NONE if there is none. *) fun getLocal (t: 'a Universal.tag) : 'a option = let val root: Universal.universal ref list = RunCall.loadWord(self(), threadIdThreadLocal) fun doFind [] = NONE | doFind ((ref v)::r) = if Universal.tagIs t v then SOME(Universal.tagProject t v) else doFind r in doFind root end (* Store newVal under tag t in the calling thread's local store. An existing entry for the tag is updated in place; otherwise a new entry is added at the front of the list. *) fun setLocal (t: 'a Universal.tag, newVal: 'a) : unit = let (* See if we already have this in the list. *) val root: Universal.universal ref list = RunCall.loadWord(self(), threadIdThreadLocal) fun doFind [] = (* Not in the list - Add it. *) RunCall.storeWord (self(), threadIdThreadLocal, ref (Universal.tagInject t newVal) :: root) | doFind (v::r) = if Universal.tagIs t (!v) (* If it's in the list update it. *) then v := Universal.tagInject t newVal else doFind r in doFind root end local val threadTestInterrupt: unit -> unit = RunCall.rtsCallFull0 "PolyThreadTestInterrupt" in fun testInterrupt() = (* If there is a pending request the word in the thread object will be non-zero. *) if RunCall.loadWord(self(), threadIdIntRequest) <> 0 then threadTestInterrupt() else () end local fun getAttrWord (me: thread) : Word.word = RunCall.loadWord(me, threadIdFlags) fun getStackSizeAsInt (me: thread) : int = RunCall.loadWord(me, threadIdStackSize) (* A stored size of zero is reported as NONE. *) and getStackSize me : int option = case getStackSizeAsInt me of 0 => NONE | s => SOME s (* Find the stack size requested by an attribute list. The first MaximumMLStack entry in the list decides the result, with NONE encoded as 0. If there is no such entry the default is returned. Raises Thread if the requested size is not greater than zero. *) fun newStackSize ([], default) = default | newStackSize (MaximumMLStack NONE :: _, _) = 0 | newStackSize (MaximumMLStack (SOME n) :: _, _) = if n <= 0 then raise Thread "The stack size must be greater than zero" else n | newStackSize (_ :: l, default) = newStackSize (l, default) val threadMaxStackSize: int -> unit = RunCall.rtsCallFull1 "PolyThreadMaxStackSize" in (* Set attributes. Only changes the values that are specified. The others remain the same.
*) fun setAttributes (attrs: threadAttribute list) : unit = let val me = self() val oldValues: Word.word = getAttrWord me val (newValue, mask) = attrsToWord attrs val stack = newStackSize(attrs, getStackSizeAsInt me) in (* Take the new value for the bits in the mask and keep the old value of every other bit. *) RunCall.storeWord (self(), threadIdFlags, Word.orb(newValue, Word.andb(Word.notb mask, oldValues))); (* Only call into the RTS if the stack limit is actually different. *) if stack = getStackSizeAsInt me then () else threadMaxStackSize stack; (* If we are now handling interrupts asynchronously check whether we have a pending interrupt now. This will only be effective if we were previously handling them synchronously or blocking them. Bit 0w4 is set in the encoding of both InterruptAsynch (0w4) and InterruptAsynchOnce (0w6). *) if Word.andb(newValue, 0w4) = 0w4 then testInterrupt() else () end (* Return the stack limit followed by the attributes decoded from the flag word of the calling thread. *) fun getAttributes() : threadAttribute list = let val me = self() in MaximumMLStack (getStackSize me) :: wordToAttrs(getAttrWord me) end (* These are used in the ConditionVar structure. They affect only the interrupt handling bits. *) fun getInterruptState(): interruptState = getIstateBits(getAttrWord(self())) and setInterruptState(s: interruptState): unit = RunCall.storeWord (self(), threadIdFlags, Word.orb(setIstateBits s, Word.andb(Word.notb 0w6, getAttrWord(self())))) local (* The default for a new thread is to ignore broadcasts and handle explicit interrupts synchronously. *) val (defaultAttrs, _) = attrsToWord[EnableBroadcastInterrupt false, InterruptState InterruptSynch] val threadForkFunction: (unit->unit) * word * int -> thread = RunCall.rtsCallFull3 "PolyThreadForkThread" in fun fork(f:unit->unit, attrs: threadAttribute list): thread = let (* Any attributes specified explicitly override the defaults.
*) val (attrWord, mask) = attrsToWord attrs val attrValue = Word.orb(attrWord, Word.andb(Word.notb mask, defaultAttrs)) val stack = newStackSize(attrs, 0 (* Default is unlimited *)) in threadForkFunction(f, attrValue, stack) end end end val exit: unit -> unit = RunCall.rtsCallFull0 "PolyThreadKillSelf" and isActive: thread -> bool = RunCall.rtsCallFast1 "PolyThreadIsActive" and broadcastInterrupt: unit -> unit = RunCall.rtsCallFull0 "PolyThreadBroadcastInterrupt" local (* Send an interrupt to a thread. If it returns false the thread did not exist and this should raise an exception. *) val threadSendInterrupt: thread -> bool = RunCall.rtsCallFast1 "PolyThreadInterruptThread" in fun interrupt(t: thread) = if threadSendInterrupt t then () else raise Thread "Thread does not exist" end local (* Kill a thread. As with interrupt a false result from the RTS call is turned into the Thread exception. *) val threadKillThread: thread -> bool = RunCall.rtsCallFast1 "PolyThreadKillThread" in fun kill(t: thread) = if threadKillThread t then () else raise Thread "Thread does not exist" end val numProcessors: unit -> int = RunCall.rtsCallFast0 "PolyThreadNumProcessors" local val numberOfPhysical: unit -> int = RunCall.rtsCallFast0 "PolyThreadNumPhysicalProcessors" in fun numPhysicalProcessors(): int option = (* It is not always possible to get this information. A result of zero from the RTS call is returned as NONE. *) case numberOfPhysical() of 0 => NONE | n => SOME n end end structure Mutex = struct type mutex = Word.word ref - fun mutex() = nvref 0w0 (* Initially unlocked. *) + val mutex = LibrarySupport.volatileWordRef (* Initially 0=unlocked. *) open Thread (* atomicIncr, atomicDecr and atomicReset are set up by Initialise. *) val threadMutexBlock: mutex -> unit = RunCall.rtsCallFull1 "PolyThreadMutexBlock" val threadMutexUnlock: mutex -> unit = RunCall.rtsCallFull1 "PolyThreadMutexUnlock" (* A mutex is implemented as a Word.word ref. It is initially set to 0 and locked by atomically incrementing it. If it was previously unlocked the result will be one but if it was already locked it will be some positive value greater than one.
When it is unlocked it is atomically decremented. If there was no contention the result will again be 0 but if some other thread tried to lock it the result will be one or positive. In that case the unlocking thread needs to call in to the RTS to wake up the blocked thread. The cost of contention on the lock is very high. To try to avoid this we first loop (spin) to see if we can get the lock without contention. *) (* The maximum number of times round the spin loop before we give up and try the atomic increment anyway. *) val spin_cycle = 20000 (* Poll the mutex with ordinary reads until it reads as zero or the count c reaches spin_cycle. Returns unit in both cases. *) fun spin (m: mutex, c: int) = if ! m = 0w0 then () else if c = spin_cycle then () else spin(m, c+1); fun lock (m: mutex): unit = let val () = spin(m, 0) val newValue = atomicIncr m in if newValue = 0w1 then () (* We've acquired the lock. *) else (* It's locked. We return when we have the lock. *) ( threadMutexBlock m; lock m (* Try again. *) ) end fun unlock (m: mutex): unit = let val newValue = atomicDecr m in if newValue = 0w0 then () (* No contention. *) else (* Another thread has blocked and we have to release it. We can safely set the value to 0 here to release the lock. If another thread acquires it before we have woken up the other threads that's fine. Equally, if another thread incremented the count and saw it was still locked it will enter the RTS and try to acquire the lock there. It's probably better to reset it here rather than within the RTS since it allows another thread to acquire the lock immediately rather than after the rather long process of entering the RTS. Resetting this needs to be atomic with respect to atomic increment and decrement. That's not a problem on X86 so a simple assignment is sufficient but in the interpreter at least it's necessary to acquire a lock. *) ( atomicReset m; threadMutexUnlock m ) end (* Try to lock the mutex. If it was previously unlocked then lock it and return true otherwise return false. Because we don't block here there is the possibility that the thread that has locked it could release the lock shortly afterwards.
The check for !m = 0w0 is an optimisation and nearly all the time it avoids the call to atomicIncr setting m to a value > 1. There is a small chance that another thread could lock the mutex between the test for !m = 0w0 and the atomicIncr. In that case the atomicIncr would return a value > 1 and the function that locked the mutex will have to call into the RTS to reset it when it is unlocked. *) fun trylock (m: mutex): bool = if !m = 0w0 andalso atomicIncr m = 0w1 then true (* We've acquired the lock. *) else false (* The lock was taken. *) end structure ConditionVar = struct open Thread (* A condition variable contains a lock and a list of suspended threads. *) type conditionVar = { lock: Mutex.mutex, threads: thread list ref } fun conditionVar(): conditionVar = - { lock = Mutex.mutex(), threads = nvref nil } + { lock = Mutex.mutex(), threads = LibrarySupport.volatileListRef() } local val threadCondVarWait: Mutex.mutex -> unit = RunCall.rtsCallFull1 "PolyThreadCondVarWait" and threadCondVarWaitUntil: Mutex.mutex * Time.time -> unit = RunCall.rtsCallFull2 "PolyThreadCondVarWaitUntil" in (* Wait on the condition variable. m is the external mutex, which is unlocked here and is not re-acquired by this function. t is an optional absolute time limit. Returns true if this thread was taken off the list of waiting threads by another thread and false if the time limit was reached. *) fun innerWait({lock, threads}: conditionVar, m: Mutex.mutex, t: Time.time option) : bool = let val me = self() (* My thread id. *) fun waitAgain() = let (* Is this thread in the list? *) fun doFind [] = false | doFind(h::t) = equal(h, me) orelse doFind t (* Remove the first occurrence of this thread from the list. *) fun removeThis [] = raise Fail "Thread missing in list" | removeThis (h::t) = if equal(h, me) then t else h :: removeThis t val () = case t of SOME time => threadCondVarWaitUntil(lock, time) | NONE => threadCondVarWait lock val () = Mutex.lock lock (* Get the lock again. *) (* Are we still on the list? If so we haven't been explicitly woken up. We've either timed out, been interrupted or simply returned because the RTS needed to process some asynchronous results. *) val stillThere = doFind(!threads) open Time (* For >= *) in if not stillThere then (* We're done.
*) ( Mutex.unlock lock; true ) else if (case t of NONE => false | SOME t => Time.now() >= t) then (* We've timed out. *) ( threads := removeThis(! threads); Mutex.unlock lock; false ) else ( (* See if we've been interrupted. If so remove ourselves and exit. *) testInterrupt() handle exn => (threads := removeThis(! threads); Mutex.unlock lock; raise exn); (* Otherwise just keep waiting. *) waitAgain() ) end in Mutex.lock lock; (* Lock the internal mutex. *) Mutex.unlock m; (* Unlock the external mutex *) threads := me :: !threads; (* Add ourselves to the list. *) waitAgain() (* Wait and return the result when we're done. *) end (* Common code for wait and waitUntil. Interrupts are handled synchronously while innerWait runs, unless they are being deferred, and the caller's interrupt state is put back afterwards. The result is the result of innerWait. *) fun doWait(c: conditionVar, m: Mutex.mutex, t: Time.time option) : bool = let val originalIntstate = getInterruptState() (* Set this to handle interrupts synchronously unless we're already ignoring them. *) val () = if originalIntstate = InterruptDefer then () else setInterruptState InterruptSynch; (* Wait for the condition. If it raises an exception we still need to reacquire the lock unless we were handling interrupts asynchronously. *) val result = innerWait(c, m, t) handle exn => ( (* We had an exception. If we were handling interrupts synchronously we reacquire the lock. If it was set to InterruptAsynchOnce this counts as a single asynchronous interrupt and we restore the state as InterruptSynch. *) case originalIntstate of InterruptDefer => (* Shouldn't happen? *) Mutex.lock m | InterruptSynch => Mutex.lock m | InterruptAsynch => setInterruptState InterruptAsynch | InterruptAsynchOnce => setInterruptState InterruptSynch; raise exn (* Reraise the exception*) ) in (* Restore the original interrupt state first. *) setInterruptState originalIntstate; (* Normal return. Reacquire the lock before returning.
*) Mutex.lock m; result end fun wait(c: conditionVar, m: Mutex.mutex) : unit = (doWait(c, m, NONE); ()) and waitUntil(c: conditionVar, m: Mutex.mutex, t: Time.time) : bool = doWait(c, m, SOME t) end local (* This call wakes up the specified thread. If the thread has already been interrupted and is not ignoring interrupts it returns false. Otherwise it wakes up the thread and returns true. We have to use this because we define that if a thread is interrupted before it is signalled then it raises Interrupt. *) val threadCondVarWake: thread -> bool = RunCall.rtsCallFast1 "PolyThreadCondVarWake" (* Wake a single thread if we can (signal). Returns the list without the thread that was woken. Threads for which the wake call returned false are left on the list and the next thread is tried. *) fun wakeOne [] = [] | wakeOne (thread::rest) = if threadCondVarWake thread then rest else thread :: wakeOne rest (* Wake all threads (broadcast). *) fun wakeAll [] = [] (* Always returns the empty list. *) | wakeAll (thread::rest) = (threadCondVarWake thread; wakeAll rest) (* Common code for signal and broadcast. With the internal lock held the list of waiting threads is replaced by the result of applying wakeThreads to it. *) fun signalOrBroadcast({lock, threads}: conditionVar, wakeThreads) : unit = let val originalState = getInterruptState() in (* Set this to handle interrupts synchronously unless we're already ignoring them. We need to do this to avoid an asynchronous interrupt which could leave the internal lock in an inconsistent state. *) if originalState = InterruptDefer then () else setInterruptState InterruptSynch; (* Get the condition var lock. *) Mutex.lock lock; threads := wakeThreads(! threads); Mutex.unlock lock; setInterruptState originalState; (* Restore original state. *) (* Test if we were interrupted while we were handling interrupts synchronously. *) if originalState = InterruptAsynch orelse originalState = InterruptAsynchOnce then testInterrupt() else () end in fun signal cv = signalOrBroadcast(cv, wakeOne) and broadcast cv = signalOrBroadcast(cv, wakeAll) end end end; (* Pretty printers for the abstract mutex and thread types. Each ignores its arguments and prints a question mark. *) local fun prettyMutex _ _ (_: Thread.Mutex.mutex) = PolyML.PrettyString "?" and prettyThread _ _ (_: Thread.Thread.thread) = PolyML.PrettyString "?"
(* Condition variables are printed as a question mark as well. *) and prettyCondVar _ _ (_: Thread.ConditionVar.conditionVar) = PolyML.PrettyString "?" in (* Install each printer with PolyML.addPrettyPrinter. *) val () = PolyML.addPrettyPrinter prettyMutex and () = PolyML.addPrettyPrinter prettyThread and () = PolyML.addPrettyPrinter prettyCondVar end;