Import Upstream version 20180207
[hcoop/debian/mlton.git] / lib / cml / core-cml / sync-var.sml
CommitLineData
7f918cf1
CE
1(* sync-var.sml
2 * 2004 Matthew Fluet (mfluet@acm.org)
3 * Ported to MLton threads.
4 *)
5
6(* sync-var.sml
7 *
8 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
9 * COPYRIGHT (c) 1989-1991 John H. Reppy
10 *
11 * The implementation of Id-style synchronizing memory cells.
12 *)
13
14structure SyncVar : SYNC_VAR_EXTRA =
15 struct
(* Short aliases for the modules this structure builds on. *)
structure E = Event
structure Q = ImpQueue
structure S = Scheduler

(* Assertion and debug-trace helpers; both functors are instantiated
 * with `false` here.
 *)
structure Assert = LocalAssert(val assert = false)
structure Debug = LocalDebug(val debug = false)

(* Pass a message thunk to Debug.sayDebug together with the
 * S.atomicMsg and S.tidMsg annotations.
 *)
fun debug mkMsg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], mkMsg)

(* Re-export the transaction-id datatypes so that their constructors
 * (TXID, CANCEL, ...) can be used unqualified below.
 *)
datatype trans_id_state = datatype TransID.trans_id_state
datatype trans_id = datatype TransID.trans_id
26
27
(* I-variables and M-variables share a single representation: a CELL
 * record holding
 *   value  the current contents, NONE while the cell is empty
 *   readQ  queue of (transaction id, thread) pairs for blocked readers
 *   prio   an integer priority counter
 *)
datatype 'a cell =
   CELL of {value : 'a option ref,
            readQ : (trans_id * 'a S.thread) Q.t,
            prio : int ref}

type 'a mvar = 'a cell
type 'a ivar = 'a cell

(* Raised by the put operations when the cell already holds a value. *)
exception Put
38
(* Allocate an empty cell: priority 0, a fresh read queue, no value. *)
fun newCell () =
   let
      val prio = ref 0
      val readQ = Q.new ()
      val value = ref NONE
   in
      CELL {prio = prio, readQ = readQ, value = value}
   end
40
(* sameCell : ('a cell * 'a cell) -> bool
 * Two cells are considered the same exactly when their `prio` fields
 * are the same ref cell.
 *)
fun sameCell (left, right) =
   let
      val CELL {prio = leftPrio, ...} = left
      val CELL {prio = rightPrio, ...} = right
   in
      leftPrio = rightPrio
   end
44
(* Increment the priority stored in `p` by one and return the value it
 * held before the increment.
 *)
fun bumpPriority p =
   let
      val old = !p
   in
      p := old + 1
      ; old
   end
47
(* Queue helpers for the read queue.  Both delegate to ImpQueue and
 * pass a predicate that selects entries whose transaction id is in
 * the CANCEL state.
 *)
local
   (* true exactly when the entry's transaction-id state is CANCEL *)
   fun isCancelled (TXID (ref CANCEL), _) = true
     | isCancelled _ = false
in
   (* Q.cleanAndDeque with the cancelled-entry predicate. *)
   fun cleanAndDeque queue =
      Q.cleanAndDeque (queue, isCancelled)

   (* Q.enqueAndClean of `entry` with the cancelled-entry predicate. *)
   fun enqueAndClean (queue, entry) =
      Q.enqueAndClean (queue, entry, isCancelled)
end
58
(* relayMsg (readQ, msg): pass `msg` on to the next thread blocked on a
 * variable.
 *
 * A thread resumed from a blocked iGet/mGet-style operation may not be
 * the only one waiting, so it calls this function to wake the next
 * waiter; that waiter is expected to do the same in turn, which is how
 * the message reaches every blocked thread.  Each call wakes at most
 * one waiter and does not look at the cell itself.
 *
 * Must be called from an atomic region.  When readQ has no live entry
 * left, the atomic region is ended here.  Entries are fetched with
 * cleanAndDeque (rather than a plain dequeue) to cope with the unlikely
 * case of one thread executing a choice of several gets on the same
 * variable.
 *)
fun relayMsg (readQ, msg) =
   case cleanAndDeque readQ of
      SOME (txid, t) =>
         S.readyAndSwitch
         (fn () =>
          let
             val () = TransID.force txid
          in
             S.prepVal (t, msg)
          end)
    | NONE => S.atomicEnd ()
76
77 (** G-variables **)
78 (* Generalized synchronized variables,
79 * to factor out the common operations.
80 *)
81
(* gPut (name, cell, x): store `x` into an empty cell.
 *
 *   name  operation name; used only to build debug/assertion labels
 *   cell  the CELL to fill
 *   x     the value to store
 *
 * Must be entered outside an atomic region (asserted).  The test of
 * the cell and the store happen after S.atomicBegin.  If the cell is
 * empty, `x` is stored and the first live entry of readQ, if any, is
 * resumed with `x`; otherwise the atomic region is simply ended.
 * If the cell already holds a value, the atomic region is ended and
 * Put is raised.
 *)
fun gPut (name, CELL {prio, readQ, value}, x) =
   let
      (* Label helpers; messages are passed on as thunks. *)
      fun label step () = concat ["SyncVar.", name, step]
      fun trace step = debug (fn () => concat [name, step])
      fun checkNonAtomic step = Assert.assertNonAtomic (label step)
      fun checkAtomic1 step = Assert.assertAtomic (label step, SOME 1)

      (* Resume a blocked reader with `x`; `prio` is reset to 1 inside
       * the thunk handed to S.readyAndSwitch.
       *)
      fun wake (rtxid, rt) =
         S.readyAndSwitch
         (fn () =>
          (prio := 1
           ; TransID.force rtxid
           ; S.prepVal (rt, x)))

      (* Cell is empty: store and wake the first waiter, if any. *)
      fun fill () =
         (trace "(3.1.1)" (* Atomic 1 *)
          ; checkAtomic1 "(3.1.1)"
          ; value := SOME x
          ; (case cleanAndDeque readQ of
                NONE => S.atomicEnd ()
              | SOME waiter => wake waiter)
          ; trace "(3.1.2)" (* NonAtomic *)
          ; checkNonAtomic "(3.1.2)")

      (* Cell is already full: leave the atomic region, then fail. *)
      fun reject () =
         (trace "(3.2.1)" (* Atomic 1 *)
          ; checkAtomic1 "(3.2.1)"
          ; S.atomicEnd ()
          ; trace "(3.2.2)" (* NonAtomic *)
          ; checkNonAtomic "(3.2.2)"
          ; raise Put)
   in
      checkNonAtomic ""
      ; trace "(1)" (* NonAtomic *)
      ; checkNonAtomic "(1)"
      ; S.atomicBegin ()
      ; trace "(2)" (* Atomic 1 *)
      ; checkAtomic1 "(2)"
      ; (case !value of
            NONE => fill ()
          | SOME _ => reject ())
      ; trace "(4)" (* NonAtomic *)
      ; checkNonAtomic "(4)"
   end
126
(* gSwap (name, doSwap, cell): blocking read-and-update of a cell; the
 * read and the update happen inside one atomic region.
 *
 *   name    operation name; used only to build debug/assertion labels
 *   doSwap  applied to the cell's value ref once the current value is
 *           known (callers in this file pass a no-op for a get, a
 *           function that clears the ref for a take, and one that
 *           overwrites it for a swap)
 *   cell    the CELL to operate on
 *
 * Returns the value obtained from the cell.  Must be entered outside
 * an atomic region (asserted).  If the cell is empty, the calling
 * thread enqueues itself on readQ and switches away until some other
 * thread resumes it with a value.
 *
 * After a resumed thread has applied doSwap, the value is handed on to
 * further waiters only while the cell is still full, and what is
 * handed on is the cell's current contents.  Relaying unconditionally
 * would let a single put be consumed by every queued taker, and would
 * hand later waiters a value that a swap has already replaced.
 *
 * Note: both branches below share the "(3.2.x)" trace labels.
 *)
fun gSwap (name, doSwap, CELL {prio, readQ, value}) =
   let
      (* Label helpers; messages are passed on as thunks. *)
      fun label step () = concat ["SyncVar.", name, step]
      fun trace step = debug (fn () => concat [name, step])
      fun checkNonAtomic step = Assert.assertNonAtomic (label step)
      fun checkAtomic1 step = Assert.assertAtomic (label step, SOME 1)

      (* Called atomically after doSwap.  Either wakes the next waiter
       * with the cell's current contents or, when the cell has been
       * emptied, just leaves the atomic region.  Waiters left on
       * readQ are resumed by a later put.
       *)
      fun relayIfFull () =
         case !value of
            NONE => S.atomicEnd ()
          | SOME current => relayMsg (readQ, current)

      (* Cell is empty: block until another thread supplies a value. *)
      fun waitForValue () =
         let
            val () = trace "(3.2.1)" (* Atomic 1 *)
            val () = checkAtomic1 "(3.2.1)"
            val msg =
               S.atomicSwitchToNext
               (fn rt => enqueAndClean (readQ, (TransID.mkTxId (), rt)))
            val () = trace "(3.2.2)" (* Atomic 1 *)
            val () = checkAtomic1 "(3.2.2)"
            val () = doSwap value
            val () = relayIfFull ()
            val () = trace "(3.2.3)" (* NonAtomic *)
            val () = checkNonAtomic "(3.2.3)"
         in
            msg
         end

      (* Cell is full: update it in place and return what it held. *)
      fun useValue x =
         let
            val () = trace "(3.2.1)" (* Atomic 1 *)
            val () = checkAtomic1 "(3.2.1)"
            val () = prio := 1
            val () = doSwap value
            val () = S.atomicEnd ()
            val () = trace "(3.2.2)" (* NonAtomic *)
            val () = checkNonAtomic "(3.2.2)"
         in
            x
         end

      val () = checkNonAtomic ""
      val () = trace "(1)" (* NonAtomic *)
      val () = checkNonAtomic "(1)"
      val () = S.atomicBegin ()
      val () = trace "(2)" (* Atomic 1 *)
      val () = checkAtomic1 "(2)"
      val result =
         case !value of
            NONE => waitForValue ()
          | SOME x => useValue x
      val () = trace "(4)" (* NonAtomic *)
      val () = checkNonAtomic "(4)"
   in
      result
   end
173
(* gSwapEvt (name, doSwap, cell): event-valued form of the blocking
 * read-and-update.
 *
 *   name    operation name; used only to build debug/assertion labels
 *   doSwap  applied to the cell's value ref once the current value is
 *           known
 *   cell    the CELL to operate on
 *
 * Returns E.bevt pollFn.  When polled, the event is reported as
 * enabled (with priority `bumpPriority prio` and action doitFn) if the
 * cell holds a value, and as blocked (with blockFn) otherwise.
 *
 * In blockFn, after a resumed thread has applied doSwap, the value is
 * handed on to further waiters only while the cell is still full, and
 * what is handed on is the cell's current contents.  Relaying
 * unconditionally would let a single put be consumed by every queued
 * taker, and would hand later waiters a value that a swap has already
 * replaced.
 *)
fun gSwapEvt (name, doSwap, CELL {prio, readQ, value}) =
   let
      (* Label helpers; messages are passed on as thunks. *)
      fun label step () = concat ["SyncVar.", name, step]
      fun trace step = debug (fn () => concat [name, step])
      fun checkNonAtomic step = Assert.assertNonAtomic (label step)
      fun checkAtomic1 step = Assert.assertAtomic (label step, SOME 1)
      fun checkAtomicAny step = Assert.assertAtomic (label step, NONE)

      (* Called atomically after doSwap.  Either wakes the next waiter
       * with the cell's current contents or, when the cell has been
       * emptied, just leaves the atomic region.  Waiters left on
       * readQ are resumed by a later put.
       *)
      fun relayIfFull () =
         case !value of
            NONE => S.atomicEnd ()
          | SOME current => relayMsg (readQ, current)

      (* Action for the enabled case.  valOf raises Option if the cell
       * is empty; pollFn only offers this action after seeing SOME.
       *)
      fun doitFn () =
         let
            val () = checkAtomicAny ".doitFn"
            val x = valOf (!value)
            val () = trace "(3.2.1)" (* Atomic 1 *)
            val () = checkAtomic1 "(3.2.1)"
            val () = prio := 1
            val () = doSwap value
            val () = S.atomicEnd ()
            val () = trace "(3.2.2)" (* NonAtomic *)
            val () = checkNonAtomic "(3.2.2)"
         in
            x
         end

      (* Action for the blocked case: enqueue this thread under the
       * supplied transaction id, switch to `next ()`, and finish the
       * operation once resumed with a value.
       *)
      fun blockFn {transId, cleanUp, next} =
         let
            val () = checkAtomicAny ".blockFn"
            val () = trace "(3.2.1)" (* Atomic 1 *)
            val () = checkAtomic1 "(3.2.1)"
            val msg =
               S.atomicSwitch
               (fn rt =>
                (enqueAndClean (readQ, (transId, rt))
                 ; next ()))
            val () = trace "(3.2.2)" (* Atomic 1 *)
            val () = checkAtomic1 "(3.2.2)"
            val () = cleanUp ()
            val () = doSwap value
            val () = relayIfFull ()
            val () = trace "(3.2.3)" (* NonAtomic *)
            val () = checkNonAtomic "(3.2.3)"
         in
            msg
         end

      (* Report whether the event can fire right now. *)
      fun pollFn () =
         let
            val () = checkAtomicAny ".pollFn"
            val () = trace "(2)" (* Atomic 1 *)
            val () = checkAtomic1 "(2)"
         in
            case !value of
               NONE => E.blocked blockFn
             | SOME _ => E.enabled {prio = bumpPriority prio,
                                    doitFn = doitFn}
         end
   in
      E.bevt pollFn
   end
224
(* gSwapPoll (name, doSwap, cell): non-blocking read-and-update.
 *
 *   name    operation name; used only to build debug/assertion labels
 *   doSwap  applied to the cell's value ref when the cell is full
 *   cell    the CELL to operate on
 *
 * Returns NONE when the cell is empty and SOME x when it held x.
 * Must be entered outside an atomic region (asserted); the test and
 * the update run between S.atomicBegin and S.atomicEnd.
 *
 * Note: both branches below share the "(3.2.x)" trace labels.
 *)
fun gSwapPoll (name, doSwap, CELL {prio, value, ...}) =
   let
      (* Label helpers; messages are passed on as thunks. *)
      fun label step () = concat ["SyncVar.", name, step]
      fun trace step = debug (fn () => concat [name, step])
      fun checkNonAtomic step = Assert.assertNonAtomic (label step)
      fun checkAtomic1 step = Assert.assertAtomic (label step, SOME 1)

      val () =
         (checkNonAtomic ""
          ; trace "(1)" (* NonAtomic *)
          ; checkNonAtomic "(1)"
          ; S.atomicBegin ()
          ; trace "(2)" (* Atomic 1 *)
          ; checkAtomic1 "(2)")

      val polled =
         case !value of
            NONE =>
               (trace "(3.2.1)" (* Atomic 1 *)
                ; checkAtomic1 "(3.2.1)"
                ; trace "(3.2.2)" (* Atomic 1 *)
                ; checkAtomic1 "(3.2.2)"
                ; S.atomicEnd ()
                ; trace "(3.2.3)" (* NonAtomic *)
                ; checkNonAtomic "(3.2.3)"
                ; NONE)
          | SOME x =>
               (trace "(3.2.1)" (* Atomic 1 *)
                ; checkAtomic1 "(3.2.1)"
                ; prio := 1
                ; doSwap value
                ; S.atomicEnd ()
                ; trace "(3.2.2)" (* NonAtomic *)
                ; checkNonAtomic "(3.2.2)"
                ; SOME x)
   in
      trace "(4)" (* NonAtomic *)
      ; checkNonAtomic "(4)"
      ; polled
   end
265
266
267 (** I-variables **)
268
(* Create a fresh, empty I-variable. *)
fun iVar () = newCell ()

(* Identity test on I-variables; delegates to sameCell. *)
fun sameIVar (a, b) = sameCell (a, b)

(* Fill an I-variable; gPut raises Put if it already holds a value. *)
fun iPut (cell, x) = gPut ("iPut", cell, x)

local
   (* Reading an I-variable leaves its contents untouched. *)
   val leaveAsIs = fn _ => ()
in
   (* Blocking read. *)
   fun iGet cell = gSwap ("iGet", leaveAsIs, cell)

   (* Event-valued read. *)
   fun iGetEvt cell = gSwapEvt ("iGetEvt", leaveAsIs, cell)

   (* Non-blocking read; NONE when the variable is empty. *)
   fun iGetPoll cell = gSwapPoll ("iGetPoll", leaveAsIs, cell)
end
279
280 (** M-variables **)
281
(* Create a fresh, empty M-variable. *)
fun mVar () = newCell ()

(* Create an M-variable that already holds `x`. *)
fun mVarInit x =
   let
      val prio = ref 0
      val readQ = Q.new ()
      val value = ref (SOME x)
   in
      CELL {prio = prio, readQ = readQ, value = value}
   end

(* Identity test on M-variables; delegates to sameCell. *)
fun sameMVar (a, b) = sameCell (a, b)

(* Fill an M-variable; gPut raises Put if it already holds a value. *)
fun mPut (cell, x) = gPut ("mPut", cell, x)
local
   (* A take empties the variable by resetting its value ref to NONE. *)
   val clear = fn slot => slot := NONE
in
   (* Blocking take. *)
   fun mTake cell = gSwap ("mTake", clear, cell)

   (* Event-valued take. *)
   fun mTakeEvt cell = gSwapEvt ("mTakeEvt", clear, cell)

   (* Non-blocking take; NONE when the variable is empty. *)
   fun mTakePoll cell = gSwapPoll ("mTakePoll", clear, cell)
end
local
   (* A get leaves the variable's contents untouched. *)
   val leaveAsIs = fn _ => ()
in
   (* Blocking read. *)
   fun mGet cell = gSwap ("mGet", leaveAsIs, cell)

   (* Event-valued read. *)
   fun mGetEvt cell = gSwapEvt ("mGetEvt", leaveAsIs, cell)

   (* Non-blocking read; NONE when the variable is empty. *)
   fun mGetPoll cell = gSwapPoll ("mGetPoll", leaveAsIs, cell)
end
local
   (* A swap overwrites the variable's contents with the new value `x`. *)
   fun replaceWith x slot = slot := SOME x
in
   (* Blocking swap: returns the value obtained from the variable and
    * stores `x` in its place.
    *)
   fun mSwap (cell, x) = gSwap ("mSwap", replaceWith x, cell)

   (* Event-valued swap.  The label is "mSwapEvt" so that its debug and
    * assertion output can be told apart from mSwap's, matching the
    * other *Evt operations, which are labelled with their own names.
    *)
   fun mSwapEvt (cell, x) = gSwapEvt ("mSwapEvt", replaceWith x, cell)
end
304 end