| 1 | (* timeout.sml |
| 2 | * 2004 Matthew Fluet (mfluet@acm.org) |
| 3 | * Ported to MLton threads. |
| 4 | *) |
| 5 | |
| 6 | (* timeout.sml |
| 7 | * |
| 8 | * COPYRIGHT (c) 1995 AT&T Bell Laboratories. |
| 9 | * COPYRIGHT (c) 1989-1991 John H. Reppy |
| 10 | * |
| 11 | * Events for synchronizing on timeouts. |
| 12 | *) |
| 13 | |
| 14 | structure TimeOut : TIME_OUT_EXTRA = |
| 15 | struct |
| 16 | structure Assert = LocalAssert(val assert = false) |
| 17 | structure Debug = LocalDebug(val debug = false) |
| 18 | |
| 19 | structure S = Scheduler |
| 20 | structure E = Event |
| 21 | fun debug msg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], msg) |
| 22 | fun debug' msg = debug (fn () => msg) |
| 23 | |
| 24 | datatype trans_id = datatype TransID.trans_id |
| 25 | datatype trans_id_state = datatype TransID.trans_id_state |
| 26 | |
| 27 | |
| 28 | (* this holds an approximation of the current time of day. It is |
| 29 | * cleared at each pre-emption, and initialized on demand (by getTime). |
| 30 | *) |
| 31 | val clock = ref NONE |
| 32 | |
| 33 | (* returns an approximation of the current time of day |
| 34 | * (this is at least as accurate as the time quantum). |
| 35 | *) |
| 36 | fun getTime () = |
| 37 | case !clock of |
| 38 | NONE => let val t = Time.now() |
| 39 | in clock := SOME t; t |
| 40 | end |
| 41 | | SOME t => t |
| 42 | fun preemptTime () = clock := NONE |
| 43 | |
| 44 | (* The queue of threads waiting for timeouts. |
| 45 | * It is sorted in increasing order of time value. |
| 46 | *) |
| 47 | structure TQ = FunPriorityQueue(structure Key = struct open Time type t = time end) |
| 48 | type item = trans_id * (unit -> unit) * S.rdy_thread |
| 49 | val timeQ : item TQ.t ref = ref (TQ.new ()) |
| 50 | |
| 51 | fun cleaner (readied: unit -> unit) elt = |
| 52 | let |
| 53 | val now = getTime () |
| 54 | val (TXID txst, cleanUp: unit -> unit, t) = TQ.Elt.value elt |
| 55 | in |
| 56 | case !txst of |
| 57 | CANCEL => true |
| 58 | | _ => if Time.<=(TQ.Elt.key elt, now) |
| 59 | then (readied () |
| 60 | ; S.ready t |
| 61 | ; cleanUp () |
| 62 | ; true) |
| 63 | else false |
| 64 | end |
| 65 | |
| 66 | fun timeWait (time, txid, cleanUp, t) = |
| 67 | (Assert.assertAtomic' ("TimeOut.timeWait", NONE) |
| 68 | ; timeQ := TQ.enqueAndClean(!timeQ, time, (txid, cleanUp, t), cleaner (fn () => ()))) |
| 69 | |
| 70 | (** NOTE: unlike for most base events, the block functions of time-out |
| 71 | ** events do not have to exit the atomic region or execute the clean-up |
| 72 | ** operation. This is done when they are removed from the waiting queue. |
| 73 | **) |
| 74 | fun timeOutEvt time = |
| 75 | let |
| 76 | fun blockFn {transId, cleanUp, next} = |
| 77 | let |
| 78 | val () = Assert.assertAtomic' ("TimeOut.timeOutEvt.blockFn", NONE) |
| 79 | val () = debug' "timeOutEvt(3.2.1)" (* Atomic 1 *) |
| 80 | val () = Assert.assertAtomic' ("TimeOut.timeOutEvt(3.2.1)", SOME 1) |
| 81 | val () = |
| 82 | S.atomicSwitch |
| 83 | (fn t => |
| 84 | (timeWait (Time.+(time, getTime ()), transId, cleanUp, S.prep t) |
| 85 | ; next ())) |
| 86 | val () = debug' "timeOutEvt(3.2.3)" (* NonAtomic *) |
| 87 | val () = Assert.assertNonAtomic' "TimeOut.timeOutEvt(3.2.3)" |
| 88 | in |
| 89 | () |
| 90 | end |
| 91 | fun pollFn () = |
| 92 | let |
| 93 | val () = Assert.assertAtomic' ("TimeOut.timeOutEvt.pollFn", NONE) |
| 94 | val () = debug' "timeOutEvt(2)" (* Atomic 1 *) |
| 95 | val () = Assert.assertAtomic' ("TimeOut.timeOutEvt(2)", SOME 1) |
| 96 | in |
| 97 | if Time.<=(time, Time.zeroTime) |
| 98 | then E.enabled {prio = ~1, doitFn = S.atomicEnd} |
| 99 | else E.blocked blockFn |
| 100 | end |
| 101 | in |
| 102 | E.bevt pollFn |
| 103 | end |
| 104 | |
| 105 | fun atTimeEvt time = |
| 106 | let |
| 107 | fun blockFn {transId, cleanUp, next} = |
| 108 | let |
| 109 | val () = Assert.assertAtomic' ("TimeOut.atTimeEvt.blockFn", NONE) |
| 110 | val () = debug' "atTimeEvt(3.2.1)" (* Atomic 1 *) |
| 111 | val () = Assert.assertAtomic' ("TimeOut.atTimeEvt(3.2.1)", SOME 1) |
| 112 | val () = |
| 113 | S.atomicSwitch |
| 114 | (fn t => |
| 115 | (timeWait (time, transId, cleanUp, S.prep t) |
| 116 | ; next ())) |
| 117 | val () = debug' "atTimeEvt(3.2.3)" (* NonAtomic *) |
| 118 | val () = Assert.assertNonAtomic' "TimeOut.atTimeEvt(3.2.3)" |
| 119 | in |
| 120 | () |
| 121 | end |
| 122 | fun pollFn () = |
| 123 | let |
| 124 | val () = Assert.assertAtomic' ("TimeOut.atTimeEvt.pollFn", NONE) |
| 125 | val () = debug' "atTimeEvt(2)" (* Atomic 1 *) |
| 126 | val () = Assert.assertAtomic' ("TimeOut.atTimeEvt(2)", SOME 1) |
| 127 | in |
| 128 | if Time.<=(time, getTime()) |
| 129 | then E.enabled {prio = ~1, doitFn = S.atomicEnd} |
| 130 | else E.blocked blockFn |
| 131 | end |
| 132 | in |
| 133 | E.bevt pollFn |
| 134 | end |
| 135 | |
| 136 | (* reset various pieces of state *) |
| 137 | fun reset () = timeQ := TQ.new () |
| 138 | |
| 139 | (* what to do at a preemption *) |
| 140 | fun preempt () : Time.time option option = |
| 141 | let |
| 142 | val () = Assert.assertAtomic' ("TimeOut.preempt", NONE) |
| 143 | val () = debug' "TimeOut.preempt" (* Atomic 1 *) |
| 144 | val () = Assert.assertAtomic' ("TimeOut.preempt", SOME 1) |
| 145 | val () = preemptTime () |
| 146 | val timeQ' = !timeQ |
| 147 | in |
| 148 | if TQ.empty timeQ' |
| 149 | then NONE |
| 150 | else let |
| 151 | val readied = ref false |
| 152 | val timeQ' = TQ.clean (timeQ', cleaner (fn () => readied := true)) |
| 153 | val () = timeQ := timeQ' |
| 154 | in |
| 155 | if !readied |
| 156 | then SOME NONE |
| 157 | else case TQ.peek timeQ' of |
| 158 | NONE => NONE |
| 159 | | SOME elt => SOME(SOME(Time.-(TQ.Elt.key elt, getTime ()))) |
| 160 | end |
| 161 | end |
| 162 | end |