1 (* Copyright (C) 2013 Matthew Fluet.
2 * Copyright (C) 2002-2006 Henry Cejtin, Matthew Fluet, Suresh
3 * Jagannathan, and Stephen Weeks.
5 * MLton is released under a BSD-style license.
6 * See the file MLton-LICENSE for details.
9 signature STREAM_IO_EXTRA_ARG =
11 structure Array: MONO_ARRAY
12 structure ArraySlice: MONO_ARRAY_SLICE
13 structure PrimIO: PRIM_IO
14 structure Vector: MONO_VECTOR
15 structure VectorSlice: MONO_VECTOR_SLICE
16 sharing type PrimIO.elem
17 = Vector.elem = VectorSlice.elem
18 = Array.elem = ArraySlice.elem
19 sharing type PrimIO.vector
20 = Vector.vector = VectorSlice.vector
21 = Array.vector = ArraySlice.vector
22 sharing type PrimIO.vector_slice
24 = ArraySlice.vector_slice
25 sharing type PrimIO.array
26 = Array.array = ArraySlice.array
27 sharing type PrimIO.array_slice
30 val line: {isLine: PrimIO.elem -> bool,
31 lineElem: PrimIO.elem} option
32 val someElem: PrimIO.elem
33 val xlatePos : {toInt : PrimIO.pos -> Position.int,
34 fromInt : Position.int -> PrimIO.pos} option
37 functor StreamIOExtra (S: STREAM_IO_EXTRA_ARG): STREAM_IO_EXTRA =
41 structure PIO = PrimIO
43 structure AS = ArraySlice
(* extract (v, i, len): build the slice described by the triple with
 * VectorSlice.slice and materialise it as a vector.
 *)
fun extract (args: vector * int * int option): vector =
   VectorSlice.vector (VectorSlice.slice args)
49 structure VS = VectorSlice
52 type vector = PIO.vector
53 type vector_slice = PIO.vector_slice
54 type reader = PIO.reader
55 type writer = PIO.writer
58 fun liftExn name function cause = raise IO.Io {name = name,
66 datatype buf = Buf of {array: A.array,
68 datatype bufferMode = NO_BUF
(* Create a LINE_BUF buffer mode: a Buf whose size cell starts at 0 and
 * whose array has bufSize slots, each initialised to someElem.
 *)
fun newLineBuf bufSize =
   let
      val storage = A.array (bufSize, someElem)
   in
      LINE_BUF (Buf {array = storage, size = ref 0})
   end
(* Create a BLOCK_BUF buffer mode: a Buf whose size cell starts at 0 and
 * whose array has bufSize slots, each initialised to someElem.
 *)
fun newBlockBuf bufSize =
   let
      val storage = A.array (bufSize, someElem)
   in
      BLOCK_BUF (Buf {array = storage, size = ref 0})
   end
(* Life-cycle of an outstream. *)
datatype state = Active | Terminated | Closed

(* True only for Active. *)
fun active Active = true
  | active _ = false

(* True for every state other than Active (i.e. Terminated and Closed). *)
fun terminated Active = false
  | terminated _ = true

(* True only for Closed. *)
fun closed Closed = true
  | closed _ = false
83 datatype outstream = Out of {writer: writer,
84 augmented_writer: writer,
86 bufferMode: bufferMode ref}
88 fun equalsOut (Out {state = state1, ...}, Out {state = state2, ...}) =
(* Apply a selector to the record carried by an outstream. *)
fun outstreamSel (Out fields, select) = select fields

(* The writer field of an outstream (the one stored as given, not the
 * augmented_writer).
 *)
fun outstreamWriter (Out {writer, ...}) = writer

(* Apply a selector to the record carried by a primitive writer. *)
fun writerSel (PIO.WR fields, select) = select fields

(* The name recorded in the outstream's writer. *)
fun outstreamName os =
   let
      val PIO.WR {name, ...} = outstreamWriter os
   in
      name
   end
97 fun flushGen (write: 'a -> int,
98 base: 'a -> ('b * int * int),
99 slice: ('b * int * int option) -> 'a,
102 val (b, i, sz) = base a
108 val j = write (slice (b, i, SOME (max - i)))
111 then raise (Fail "partial write")
(* Push the vector slice x out through the writer's writeVec operation,
 * delegating to flushGen.
 * Raises IO.BlockingNotSupported when the writer provides no writeVec.
 *)
fun flushVec (writer, x) =
   let
      val PIO.WR {writeVec, ...} = writer
   in
      case writeVec of
         SOME write => flushGen (write, VS.base, VS.slice, x)
       | NONE => raise IO.BlockingNotSupported
   end
(* Push the array slice x out through the writer's writeArr operation,
 * delegating to flushGen.
 * Raises IO.BlockingNotSupported when the writer provides no writeArr.
 *)
fun flushArr (writer, x) =
   let
      val PIO.WR {writeArr, ...} = writer
   in
      case writeArr of
         SOME write => flushGen (write, AS.base, AS.slice, x)
       | NONE => raise IO.BlockingNotSupported
   end
129 fun flushBuf' (writer, size, array) =
134 ; flushArr (writer, AS.slice (array, 0, SOME size'))
(* Flush a Buf by passing its size cell and backing array to flushBuf'. *)
fun flushBuf (writer, Buf {array = storage, size = fill}) =
   flushBuf' (writer, fill, storage)
139 fun output (os as Out {augmented_writer,
141 bufferMode, ...}, v) =
142 if terminated (!state)
143 then liftExn (outstreamName os) "output" IO.ClosedStream
145 fun put () = flushVec (augmented_writer, VS.full v)
146 fun doit (buf as Buf {size, array}, maybe) =
149 val newSize = curSize + V.length v
151 if newSize >= A.length array orelse maybe ()
152 then (flushBuf (augmented_writer, buf); put ())
153 else (A.copyVec {src = v, dst = array, di = curSize};
159 | LINE_BUF buf => doit (buf, fn () => (case line of
161 | SOME {isLine, lineElem} => V.exists isLine v))
162 | BLOCK_BUF buf => doit (buf, fn () => false)
164 handle exn => liftExn (outstreamName os) "output" exn
166 fun ensureActive (os as Out {state, ...}) =
169 else liftExn (outstreamName os) "output" IO.ClosedStream
172 val buf1 = A.array (1, someElem)
173 fun flush (os, size, array) =
175 val Out {augmented_writer, ...} = os
177 flushBuf' (augmented_writer, size, array)
178 handle exn => liftExn (outstreamName os) "output1" exn
181 (* output1 is implemented very carefully to make it fast. Think hard
182 * before modifying it, and test after you do, to make sure that it
183 * hasn't been slowed down.
185 fun output1 (os as Out {bufferMode, ...}, c): unit =
187 BLOCK_BUF (Buf {array, size}) =>
191 (* Use the bounds check for the update to make sure there
192 * is space to put the character in the array.
194 (A.update (array, n, c)
198 val _ = ensureActive os
199 val _ = flush (os, size, array)
200 val _ = A.update (array, 0, c)
206 | LINE_BUF (Buf {array, size}) =>
210 (* Use the bounds check for the update to make sure there
211 * is space to put the character in the array.
213 (A.update (array, n, c)
217 val _ = ensureActive os
218 val _ = flush (os, size, array)
219 val _ = A.update (array, 0, c)
227 | SOME {isLine, lineElem} =>
228 if isLine c then flush (os, size, array) else ()
232 val _ = ensureActive os
233 val _ = A.update (buf1, 0, c)
234 val Out {augmented_writer, ...} = os
236 flushArr (augmented_writer, AS.slice (buf1, 0, SOME 1))
240 fun outputSlice (os as Out {augmented_writer,
242 bufferMode, ...}, v) =
243 if terminated (!state)
244 then liftExn (outstreamName os) "output" IO.ClosedStream
246 fun put () = flushVec (augmented_writer, v)
247 fun doit (buf as Buf {size, array}, maybe) =
250 val newSize = curSize + VS.length v
252 if newSize >= A.length array orelse maybe ()
253 then (flushBuf (augmented_writer, buf); put ())
254 else (AS.copyVec {src = v, dst = array, di = curSize};
260 | LINE_BUF buf => doit (buf, fn () => (case line of
262 | SOME {isLine, lineElem} => VS.exists isLine v))
263 | BLOCK_BUF buf => doit (buf, fn () => false)
265 handle exn => liftExn (outstreamName os) "output" exn
267 fun flushOut (os as Out {augmented_writer,
270 if terminated (!state)
272 else case !bufferMode of
274 | LINE_BUF buf => flushBuf (augmented_writer, buf)
275 | BLOCK_BUF buf => flushBuf (augmented_writer, buf)
276 handle exn => liftExn (outstreamName os) "flushOut" exn
278 fun makeTerminated (Out {bufferMode, ...}) =
280 fun doit (Buf {array, size}) = size := A.length array
283 BLOCK_BUF b => doit b
284 | LINE_BUF b => doit b
288 fun closeOut (os as Out {state, ...}) =
292 if terminated (!state)
294 else (writerSel (outstreamWriter os, #close)) ();
297 handle exn => liftExn (outstreamName os) "closeOut" exn
299 fun getBufferMode (Out {bufferMode, ...}) =
302 | LINE_BUF _ => IO.LINE_BUF
303 | BLOCK_BUF _ => IO.BLOCK_BUF
305 fun setBufferMode (os as Out {bufferMode, ...}, mode) =
307 IO.NO_BUF => (flushOut os;
308 bufferMode := NO_BUF)
312 newLineBuf (writerSel (outstreamWriter os, #chunkSize))
317 | BLOCK_BUF _ => doit ()
319 | IO.BLOCK_BUF => let
322 newBlockBuf (writerSel (outstreamWriter os, #chunkSize))
326 | LINE_BUF _ => doit ()
330 fun mkOutstream' {writer, closed, bufferMode} =
332 val bufSize = writerSel (writer, #chunkSize)
334 Out {writer = writer,
335 augmented_writer = PIO.augmentWriter writer,
336 state = ref (if closed then Closed else Active),
337 bufferMode = ref (case bufferMode of
339 | IO.LINE_BUF => newLineBuf bufSize
340 | IO.BLOCK_BUF => newBlockBuf bufSize)}
(* Build an outstream over writer with the requested buffer mode; the
 * stream is created with closed = false.
 *)
fun mkOutstream (writer, bufferMode) =
   let
      val args = {bufferMode = bufferMode, closed = false, writer = writer}
   in
      mkOutstream' args
   end
345 fun getWriter (os as Out {writer, state, bufferMode, ...}) =
347 then liftExn (outstreamName os) "getWriter" IO.ClosedStream
349 ; state := Terminated
354 | LINE_BUF _ => IO.LINE_BUF
355 | BLOCK_BUF _ => IO.BLOCK_BUF))
(* An output position: a pos value paired with an outstream. *)
datatype out_pos =
   OutPos of {outstream: outstream,
              pos: pos}
360 fun getPosOut (os as Out {...}) =
362 case writerSel (outstreamSel (os, #writer), #getPos) of
363 NONE => liftExn (outstreamName os) "getPosOut" IO.RandomAccessNotSupported
364 | SOME getPos => OutPos {pos = getPos (),
367 fun setPosOut (OutPos {pos, outstream = os}) =
369 case writerSel (outstreamSel (os, #writer), #setPos) of
370 NONE => liftExn (outstreamName os) "setPosOut" IO.RandomAccessNotSupported
371 | SOME setPos => setPos pos;
(* Project the pos component out of an out_pos. *)
fun filePosOut (OutPos fields) = #pos fields
380 datatype state = Link of {buf: buf}
381 | Eos of {buf: buf} (* V.length inp = 0 *)
385 and buf = Buf of {inp: V.vector,
389 datatype instream = In of {common: {reader: reader,
390 augmented_reader: reader,
391 tail: state ref ref},
394 (* @ s = Eos, End, Truncated, Closed ==>
395 * pos = V.length inp, !next = s
398 fun equalsIn (In {common = {tail = tail1, ...}, ...},
399 In {common = {tail = tail2, ...}, ...}) =
402 fun update (In {common, ...}, pos, buf) =
(* Call update with a new position, keeping the instream's current buffer. *)
fun updatePos (is as In {buf = current, ...}, pos) = update (is, pos, current)

(* Call update with the given buffer and position 0 (its beginning). *)
fun updateBufBeg (is, buf) = update (is, 0, buf)

(* Call update with the given buffer and position V.length inp (its end). *)
fun updateBufEnd (is, buf as Buf {inp = contents, ...}) =
   update (is, V.length contents, buf)
(* Apply a selector to the record carried by an instream. *)
fun instreamSel (In fields, select) = select fields

(* The common record of an instream. *)
fun instreamCommon (In {common, ...}) = common

(* Apply a selector to the instream's common record. *)
fun instreamCommonSel (is, select) = select (instreamCommon is)

(* The reader field of the common record (not the augmented_reader). *)
fun instreamReader is = #reader (instreamCommon is)

(* The tail field of the common record. *)
fun instreamTail is = #tail (instreamCommon is)

(* Apply a selector to the record carried by a primitive reader. *)
fun readerSel (PIO.RD fields, select) = select fields

(* The name recorded in the instream's reader. *)
fun instreamName is =
   let
      val PIO.RD {name, ...} = instreamReader is
   in
      name
   end
(* The zero-length vector. *)
val empty = V.fromList []
421 (is as In {common = {augmented_reader, tail, ...}, ...})
426 fun link (base, inp) = let
428 val buf = Buf {inp = inp,
431 val this = if V.length inp = 0
433 else Link {buf = buf}
434 val _ = !tail := this
442 case readerSel (augmented_reader, #getPos) of
444 | SOME getPos => SOME (getPos ())
445 val inp = readVec (readerSel (augmented_reader, #chunkSize))
447 liftExn (instreamName is) function exn
451 | SOME inp => link (base, inp)
455 then case readerSel (augmented_reader, #readVec) of
456 NONE => liftExn (instreamName is)
458 IO.BlockingNotSupported
459 | SOME readVec => doit (SOME o readVec)
460 else case readerSel (augmented_reader, #readVecNB) of
461 NONE => liftExn (instreamName is)
463 IO.NonblockingNotSupported
464 | SOME readVecNB => doit readVecNB
466 | _ => liftExn (instreamName is) function Match
(* Run extend with its final flag set to true (presumably "blocking" --
 * confirm against extend) and unwrap the result with valOf, so a NONE
 * result raises Option.
 *)
fun extendB function is =
   let
      val result = extend function is true
   in
      valOf result
   end

(* Run extend with its final flag set to false and return the optional
 * result unchanged.
 *)
fun extendNB function is = extend function is false
471 fun input (is as In {pos, buf as Buf {inp, next, ...}, ...}) =
472 if pos < V.length inp
473 then (V.extract(inp, pos, NONE),
474 updateBufEnd (is, buf))
478 Link {buf as Buf {inp, ...}} => (inp, updateBufEnd (is, buf))
479 | Eos {buf} => (empty, updateBufBeg (is, buf))
480 | End => doit (extendB "input" is)
487 if n < 0 orelse n > V.maxLen
490 fun first (is as In {pos, buf as Buf {inp, ...}, ...}, n) =
491 if pos + n <= V.length inp
493 val inp' = V.extract(inp, pos, SOME n)
495 (inp', updatePos (is, pos + n))
498 val inp' = VS.slice(inp, pos, NONE)
500 loop (buf, [inp'], n - (V.length inp - pos))
502 and loop (buf' as Buf {next, ...}, inps, n) =
506 Link {buf as Buf {inp, ...}} =>
509 val inp' = VS.slice(inp, 0, SOME n)
510 val inps = inp'::inps
512 finish (inps, update (is, n, buf))
514 else loop (buf, (VS.full inp)::inps, n - V.length inp)
516 finish (inps, if n > 0
517 then updateBufBeg (is, buf)
518 else updateBufEnd (is, buf'))
519 | End => doit (extendB "inputN" is)
520 | _ => finish (inps, updateBufEnd (is, buf'))
524 and finish (inps, is) =
525 let val inp = VS.concat (List.rev inps)
532 (* input1' will move past a temporary end of stream *)
533 fun input1' (is as In {pos, buf = Buf {inp, next, ...}, ...}) =
534 case SOME (V.sub (inp, pos)) handle Subscript => NONE of
539 Link {buf} => input1' (updateBufBeg (is, buf))
540 | Eos {buf} => (NONE, updateBufBeg (is, buf))
541 | End => doit (extendB "input1" is)
548 val is' = updatePos (is, pos + 1)
553 (* input1 will never move past a temporary end of stream *)
556 (SOME c, is') => SOME (c, is')
562 let val (inp, is') = input is
565 then (V.concat (List.rev ac), is')
566 else loop (is', inp::ac)
574 NONE => (fn is => SOME (input is))
575 | SOME {isLine, lineElem} =>
577 val lineVecSl = VS.full (V.tabulate (1, fn _ => lineElem))
581 fun findLine (v, i) =
584 case SOME (V.sub (v, i)) handle Subscript => NONE of
593 fun first (is as In {pos, buf as Buf {inp, next, ...}, ...}) =
594 (case findLine (inp, pos) of
596 val inp' = V.extract(inp, pos, SOME (i - pos))
598 SOME (inp', updatePos (is, i))
600 | NONE => if pos < V.length inp
602 val inp' = VS.slice(inp, pos, NONE)
609 Link {buf} => first (updateBufBeg (is, buf))
611 | End => doit (extendB "inputLine" is)
616 and loop (buf' as Buf {next, ...}, inps) =
617 (* List.length inps > 0 *)
621 Link {buf as Buf {inp, ...}} =>
622 (case findLine (inp, 0) of
624 val inp' = VS.slice(inp, 0, SOME i)
625 val inps = inp'::inps
627 finish (inps, update (is, i, buf), false)
629 | NONE => loop (buf, (VS.full inp)::inps))
630 | End => doit (extendB "inputLine" is)
631 | _ => finish (inps, updateBufEnd (is, buf'), true)
635 and finish (inps, is, trail) =
640 val inp = VS.concat (List.rev inps)
649 fun canInput (is as In {pos, buf = Buf {inp, next, ...}, ...}, n) =
650 if n < 0 orelse n > V.maxLen
657 and add (inps, inp, k) =
663 then finish (inps, n)
664 else loop (inps, k + l)
667 case extendNB "canInput" is of
668 NONE => finish (inps, k)
669 | SOME (Link {buf = Buf {inp, ...}}) =>
671 | SOME (Eos _) => finish (inps, k)
672 | _ => raise Fail "extendNB bug"
673 and finish (inps, k) =
675 val inp = V.concat (List.rev inps)
680 if pos < V.length inp
681 then SOME (Int.min (V.length inp - pos, n))
684 (case extendNB "canInput" is of
686 | SOME (Link {buf = Buf {inp, base, ...}}) =>
688 val (inp, k) = start inp
689 val buf = Buf {inp = inp,
693 next := Link {buf = buf};
696 | SOME (Eos _) => SOME 0
697 | _ => raise Fail "extendNB bug")
703 datatype t = T of {close: unit -> unit,
707 fun close (T {close, name, tail}) =
711 ; close () handle exn => liftExn name "closeIn" exn)
(* A close handle matches an instream when the handle's tail equals the
 * instream's tail.
 *)
fun equalsInstream (T fields, is) = (#tail fields) = (instreamTail is)
716 fun make (In {common = {reader = PIO.RD {close, name, ...},
719 T {close = close, name = name, tail = tail}
(* Close an instream: build its close handle with Close.make, then run
 * Close.close on that handle.
 *)
fun closeIn is = Close.close (Close.make is)
725 let val (inp, _) = input is
729 fun mkInstream' {bufferContents, closed, reader} =
731 val next = ref (if closed then Closed else End)
733 case readerSel (reader, #getPos) of
735 | SOME getPos => SOME (getPos ())
737 case bufferContents of
738 NONE => Buf {inp = empty,
741 | SOME (lastRead, v) =>
743 then Buf {inp = empty,
745 next = ref (Eos {buf = Buf {inp = empty,
748 else case (lastRead, base, xlatePos) of
749 (true, SOME b, SOME {fromInt, toInt}) =>
752 fromInt (Position.- (toInt b, Position.fromInt (V.length v)))
762 In {common = {reader = reader,
763 augmented_reader = PIO.augmentReader reader,
769 fun mkInstream (reader, bufferContents) =
770 mkInstream' {bufferContents = if 0 = V.length bufferContents
772 else SOME (false, bufferContents),
776 fun getReader (is as In {common = {reader, tail, ...}, ...}) =
778 End => (!tail := Truncated;
779 let val (inp, _) = inputAll is
782 | _ => liftExn (instreamName is) "getReader" IO.ClosedStream
784 fun filePosIn (is as In {common = {augmented_reader, ...},
786 buf = Buf {base, ...}, ...}) =
788 SOME b => (case xlatePos of
789 SOME {fromInt, toInt} =>
790 (fromInt (Position.+ (Position.fromInt pos, toInt b)))
791 | NONE => (case (readerSel (augmented_reader, #readVec),
792 readerSel (augmented_reader, #getPos),
793 readerSel (augmented_reader, #setPos)) of
794 (SOME readVec, SOME getPos, SOME setPos) =>
796 val curPos = getPos ()
799 ; ignore (readVec pos)
800 ; getPos () before setPos curPos
803 liftExn (instreamName is) "filePosIn" IO.RandomAccessNotSupported))
804 | NONE => liftExn (instreamName is) "filePosIn" IO.RandomAccessNotSupported
807 signature STREAM_IO_ARG =
809 structure Array: MONO_ARRAY
810 structure ArraySlice: MONO_ARRAY_SLICE
811 structure PrimIO: PRIM_IO
812 structure Vector: MONO_VECTOR
813 structure VectorSlice: MONO_VECTOR_SLICE
814 sharing type PrimIO.elem = Vector.elem = VectorSlice.elem = Array.elem
816 sharing type PrimIO.vector = Vector.vector = VectorSlice.vector
817 = Array.vector = ArraySlice.vector
818 sharing type PrimIO.vector_slice = VectorSlice.slice
819 = ArraySlice.vector_slice
820 sharing type PrimIO.array = Array.array = ArraySlice.array
821 sharing type PrimIO.array_slice = ArraySlice.slice
823 val someElem: PrimIO.elem
826 functor StreamIO (S: STREAM_IO_ARG): STREAM_IO =
827 StreamIOExtra (open S
831 signature STREAM_IO_EXTRA_FILE_ARG = STREAM_IO_EXTRA_ARG
833 functor StreamIOExtraFile (S: STREAM_IO_EXTRA_FILE_ARG): STREAM_IO_EXTRA_FILE =
837 structure PIO = PrimIO
840 structure StreamIO = StreamIOExtra (S)
843 fun liftExn name function cause = raise IO.Io {name = name,
(* Apply a selector to the record carried by a primitive writer. *)
fun writerSel (PIO.WR fields, select) = select fields

(* The name recorded in the outstream's writer. *)
fun outstreamName os =
   let
      val PIO.WR {name, ...} = outstreamWriter os
   in
      name
   end
855 case writerSel (outstreamWriter os, #ioDesc) of
856 SOME ioDesc => valOf (Posix.FileSys.iodToFD ioDesc)
857 | NONE => liftExn (outstreamName os) "outFd" (Fail "<no ioDesc>")
(* Outstreams tracked for the at-exit cleaner, each paired with a close
 * flag; starts out empty.
 *)
val openOutstreams = ref ([] : (outstream * {close: bool}) list)
863 val _ = Cleaner.addNew
864 (Cleaner.atExit, fn () =>
865 List.app (fn (os, {close}) =>
868 else flushOut os) (!openOutstreams))
870 fn {bufferMode, closeAtExit, closed, writer} =>
872 val os = mkOutstream' {bufferMode = bufferMode,
878 else openOutstreams := ((os, {close = closeAtExit})
879 :: (!openOutstreams))
885 fun mkOutstream' {bufferMode, closed, writer} =
886 mkOutstream'' {bufferMode = bufferMode,
891 fun mkOutstream (writer, bufferMode) =
892 mkOutstream' {bufferMode = bufferMode,
896 val closeOut = fn os =>
898 val _ = openOutstreams := List.filter (fn (os', _) =>
899 not (equalsOut (os, os')))
(* Apply a selector to the record carried by a primitive reader. *)
fun readerSel (PIO.RD fields, select) = select fields
(* The name recorded in the instream's reader. *)
fun instreamName is =
   let
      val PIO.RD {name, ...} = instreamReader is
   in
      name
   end
914 case readerSel (instreamReader is, #ioDesc) of
915 SOME ioDesc => valOf (Posix.FileSys.iodToFD ioDesc)
916 | NONE => liftExn (instreamName is) "inFd" (Fail "<no ioDesc>")
(* Close handles of instreams to be closed by the at-exit cleaner;
 * starts out empty.
 *)
val closeAtExits = ref ([] : Close.t list)
(* Register an at-exit cleaner that runs Close.close on every handle
 * currently held in closeAtExits.
 *)
val _ =
   Cleaner.addNew
   (Cleaner.atExit,
    fn () => List.app (fn handle' => Close.close handle') (!closeAtExits))
925 fn {bufferContents, closeAtExit, closed, reader} =>
928 mkInstream' {bufferContents = bufferContents,
932 if closed orelse not closeAtExit
934 else closeAtExits := Close.make is :: (!closeAtExits)
940 fun mkInstream' {bufferContents, closed, reader} =
941 mkInstream'' {bufferContents = bufferContents,
947 fun mkInstream (reader, bufferContents) =
948 mkInstream' {bufferContents = (if V.length bufferContents = 0 then NONE
949 else SOME (false, bufferContents)),
953 val closeIn = fn is =>
(* Drop the handle of the stream being closed and keep every other
 * registered handle.  The predicate must be negated: List.filter keeps
 * the elements for which it returns true, so the un-negated form kept
 * only the closed stream and discarded all the others.  This mirrors
 * closeOut's `not (equalsOut (os, os'))`.
 *)
List.filter (fn c => not (Close.equalsInstream (c, is))) (!closeAtExits)