Import Upstream version 20180207
[hcoop/debian/mlton.git] / lib / cml / core-cml / timeout.sml
(* timeout.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* timeout.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *
 * Events for synchronizing on timeouts.
 *)

structure TimeOut : TIME_OUT_EXTRA =
struct
   structure Assert = LocalAssert(val assert = false)
   structure Debug = LocalDebug(val debug = false)

   structure S = Scheduler
   structure E = Event

   (* Tracing helpers.  debug hands a message thunk to Debug.sayDebug along
    * with S.atomicMsg and S.tidMsg; debug' wraps an already-built message
    * in a thunk and delegates to debug.
    *)
   fun debug msg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], msg)
   fun debug' msg = debug (fn () => msg)

   datatype trans_id = datatype TransID.trans_id
   datatype trans_id_state = datatype TransID.trans_id_state


   (* Cached approximation of the current time of day.  It is NONE until
    * getTime is asked for the time, and it is dropped again by preemptTime
    * (which preempt calls on every pre-emption).
    *)
   val clock : Time.time option ref = ref NONE

   (* Returns the cached time of day, sampling Time.now and filling the
    * cache first when the cache is empty.  Per the original note, the
    * result is at least as accurate as the time quantum.
    *)
   fun getTime () =
      case !clock of
         SOME cached => cached
       | NONE =>
            let
               val current = Time.now ()
            in
               clock := SOME current
               ; current
            end

   (* Forget the cached time so that the next getTime samples afresh. *)
   fun preemptTime () = clock := NONE

   (* The queue of threads waiting for timeouts, keyed by wake-up time.
    * It is kept sorted in increasing order of time value.  Each item
    * carries the transaction id, the clean-up action, and the thread to
    * make ready.
    *)
   structure TQ = FunPriorityQueue(structure Key = struct open Time type t = time end)
   type item = trans_id * (unit -> unit) * S.rdy_thread
   val timeQ : item TQ.t ref = ref (TQ.new ())

   (* Predicate handed to TQ.enqueAndClean and TQ.clean.
    *   - a cancelled transaction yields true without any side effect;
    *   - an element whose key is at or before the current time has
    *     readied () invoked, its thread made ready, and its clean-up run,
    *     and then yields true;
    *   - anything else yields false.
    * NOTE(review): true presumably tells the queue to discard the
    * element -- confirm against FunPriorityQueue.
    *)
   fun cleaner (readied: unit -> unit) elt =
      let
         val now = getTime ()
         val (TXID state, cleanUp: unit -> unit, thread) = TQ.Elt.value elt
         fun expired () = Time.<= (TQ.Elt.key elt, now)
         fun wake () = (readied (); S.ready thread; cleanUp ())
      in
         case !state of
            CANCEL => true
          | _ => expired () andalso (wake (); true)
      end

   (* Enqueue thread t to wait until time, cleaning the queue in the same
    * pass.  No notification is needed here, so the cleaner is given a
    * no-op callback.
    *)
   fun timeWait (time, txid, cleanUp, t) =
      let
         val () = Assert.assertAtomic' ("TimeOut.timeWait", NONE)
         val ignoreReadied = fn () => ()
      in
         timeQ := TQ.enqueAndClean (!timeQ, time, (txid, cleanUp, t), cleaner ignoreReadied)
      end

   (** NOTE: unlike for most base events, the block functions of time-out
    ** events do not have to exit the atomic region or execute the clean-up
    ** operation.  This is done when they are removed from the waiting queue.
    **)

   (* Event for a relative time-out.  Polling reports the event enabled
    * when time is zero or negative; otherwise the event blocks, and the
    * blocked thread is queued with wake-up time (time + current time).
    *)
   fun timeOutEvt time =
      let
         fun blockFn {transId, cleanUp, next} =
            ( Assert.assertAtomic' ("TimeOut.timeOutEvt.blockFn", NONE)
            ; debug' "timeOutEvt(3.2.1)" (* Atomic 1 *)
            ; Assert.assertAtomic' ("TimeOut.timeOutEvt(3.2.1)", SOME 1)
            ; (S.atomicSwitch
                  (fn t =>
                      let
                         (* the deadline is computed only once we switch *)
                         val deadline = Time.+ (time, getTime ())
                      in
                         timeWait (deadline, transId, cleanUp, S.prep t)
                         ; next ()
                      end)
               : unit)
            ; debug' "timeOutEvt(3.2.3)" (* NonAtomic *)
            ; Assert.assertNonAtomic' "TimeOut.timeOutEvt(3.2.3)"
            )
         fun pollFn () =
            ( Assert.assertAtomic' ("TimeOut.timeOutEvt.pollFn", NONE)
            ; debug' "timeOutEvt(2)" (* Atomic 1 *)
            ; Assert.assertAtomic' ("TimeOut.timeOutEvt(2)", SOME 1)
            ; if Time.> (time, Time.zeroTime)
                 then E.blocked blockFn
              else E.enabled {prio = ~1, doitFn = S.atomicEnd}
            )
      in
         E.bevt pollFn
      end

   (* Event for an absolute time-out.  Polling reports the event enabled
    * when time is at or before the current time; otherwise the event
    * blocks, and the blocked thread is queued with wake-up time time.
    *)
   fun atTimeEvt time =
      let
         fun blockFn {transId, cleanUp, next} =
            ( Assert.assertAtomic' ("TimeOut.atTimeEvt.blockFn", NONE)
            ; debug' "atTimeEvt(3.2.1)" (* Atomic 1 *)
            ; Assert.assertAtomic' ("TimeOut.atTimeEvt(3.2.1)", SOME 1)
            ; (S.atomicSwitch
                  (fn t =>
                      ( timeWait (time, transId, cleanUp, S.prep t)
                      ; next () ))
               : unit)
            ; debug' "atTimeEvt(3.2.3)" (* NonAtomic *)
            ; Assert.assertNonAtomic' "TimeOut.atTimeEvt(3.2.3)"
            )
         fun pollFn () =
            ( Assert.assertAtomic' ("TimeOut.atTimeEvt.pollFn", NONE)
            ; debug' "atTimeEvt(2)" (* Atomic 1 *)
            ; Assert.assertAtomic' ("TimeOut.atTimeEvt(2)", SOME 1)
            ; if Time.> (time, getTime ())
                 then E.blocked blockFn
              else E.enabled {prio = ~1, doitFn = S.atomicEnd}
            )
      in
         E.bevt pollFn
      end

   (* reset various pieces of state: the waiting queue is replaced by a
    * fresh, empty one.
    *)
   fun reset () = timeQ := TQ.new ()

   (* what to do at a preemption.  The cached clock is dropped, then:
    *   - NONE            when the waiting queue is empty, either on entry
    *                     or once it has been cleaned;
    *   - SOME NONE       when cleaning readied at least one thread;
    *   - SOME (SOME d)   otherwise, where d is the key of the element
    *                     returned by TQ.peek minus the current time.
    *)
   fun preempt () : Time.time option option =
      let
         val () = Assert.assertAtomic' ("TimeOut.preempt", NONE)
         val () = debug' "TimeOut.preempt" (* Atomic 1 *)
         val () = Assert.assertAtomic' ("TimeOut.preempt", SOME 1)
         val () = preemptTime ()
         val pending = !timeQ
         (* Clean the queue, store the result back, and report whether any
          * thread was readied together with the cleaned queue.
          *)
         fun sweep () =
            let
               val wokeAny = ref false
               val remaining =
                  TQ.clean (pending, cleaner (fn () => wokeAny := true))
            in
               timeQ := remaining
               ; (!wokeAny, remaining)
            end
      in
         if TQ.empty pending
            then NONE
         else
            case sweep () of
               (true, _) => SOME NONE
             | (false, remaining) =>
                  (case TQ.peek remaining of
                      NONE => NONE
                    | SOME elt =>
                         SOME (SOME (Time.- (TQ.Elt.key elt, getTime ()))))
      end
end