Commit | Line | Data |
---|---|---|
7f918cf1 CE |
1 | (* sync-var.sml |
2 | * 2004 Matthew Fluet (mfluet@acm.org) | |
3 | * Ported to MLton threads. | |
4 | *) | |
5 | ||
6 | (* sync-var.sml | |
7 | * | |
8 | * COPYRIGHT (c) 1995 AT&T Bell Laboratories. | |
9 | * COPYRIGHT (c) 1989-1991 John H. Reppy | |
10 | * | |
11 | * The implementation of Id-style synchronizing memory cells. | |
12 | *) | |
13 | ||
14 | structure SyncVar : SYNC_VAR_EXTRA = | |
15 | struct | |
16 | structure Assert = LocalAssert(val assert = false) | |
17 | structure Debug = LocalDebug(val debug = false) | |
18 | ||
19 | structure Q = ImpQueue | |
20 | structure S = Scheduler | |
21 | structure E = Event | |
22 | fun debug msg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], msg) (* debug helper: forwards the message thunk to Debug.sayDebug together with S.atomicMsg and S.tidMsg *) | |
23 | ||
24 | datatype trans_id = datatype TransID.trans_id | |
25 | datatype trans_id_state = datatype TransID.trans_id_state | |
26 | ||
27 | ||
28 | (* the underlying representation of both ivars and mvars is the same. *) | |
29 | datatype 'a cell = | |
30 | CELL of {prio : int ref, (* priority counter: read and incremented by bumpPriority when the cell is offered as an enabled event, reset to 1 when a value is handed out *) | |
31 | readQ : (trans_id * 'a S.thread) Q.t, (* queued readers: each entry pairs a transaction id with the thread to resume with a value *) | |
32 | value : 'a option ref} (* NONE while the cell is empty, SOME v while it holds v *) | |
33 | ||
34 | type 'a ivar = 'a cell | |
35 | type 'a mvar = 'a cell | |
36 | ||
37 | exception Put (* raised by gPut, and therefore by iPut and mPut, when the cell already holds a value *) | |
38 | ||
39 | fun newCell () = CELL {value = ref NONE, readQ = Q.new(), prio = ref 0} (* fresh empty cell: no value, a new read queue, priority counter at 0 *) | |
40 | ||
41 | (* sameCell : ('a cell * 'a cell) -> bool; true iff both cells carry the very same prio ref (ref equality is identity) *) | |
42 | fun sameCell (CELL {prio = left, ...}, CELL {prio = right, ...}) = | |
43 | left = right | |
44 | ||
45 | (* post-increment: add one to the priority counter and return the value it held before *) | |
46 | fun bumpPriority p = let val old = !p in p := old + 1; old end | |
47 | ||
48 | (* functions to clean a cell's read queue; entries whose transaction state is CANCEL are the ones flagged for removal *) | |
49 | local | |
50 | fun cleaner (TXID txst, _) = (* predicate handed to the queue operations: true iff the entry's transaction state is CANCEL *) | |
51 | case !txst of CANCEL => true | _ => false | |
52 | in | |
53 | fun cleanAndDeque q = (* Q.cleanAndDeque with cleaner; yields an option. Presumably drops flagged entries and returns the next remaining one -- confirm against ImpQueue *) | |
54 | Q.cleanAndDeque (q, cleaner) | |
55 | fun enqueAndClean (q, item) = (* Q.enqueAndClean with cleaner; presumably appends item and prunes flagged entries -- confirm against ImpQueue *) | |
56 | Q.enqueAndClean (q, item, cleaner) | |
57 | end | |
58 | ||
59 | (* When a thread is resumed after being blocked on an iGet or mGet operation, | |
60 | * there may be other threads also blocked on the variable. This function | |
61 | * is used to propagate the message to all of the threads that are blocked | |
62 | * on the variable (or until one of them takes the value in the mvar case). | |
63 | * It must be called from an atomic region; when the readQ is finally empty, | |
64 | * we leave the atomic region. We must use "cleanAndDeque" to get items | |
65 | * from the readQ in the unlikely event that a single thread executes a | |
66 | * choice of multiple gets on the same variable. | |
67 | * NOTE(review): relayMsg never looks at the cell's value and its callers invoke it unconditionally after doSwap, so msg is forwarded even after an mTake has emptied the cell -- confirm that is intended. *) | |
68 | fun relayMsg (readQ, msg) = | |
69 | case (cleanAndDeque readQ) of | |
70 | NONE => S.atomicEnd() (* nobody left to notify: leave the atomic region *) | |
71 | | SOME (txid, t) => (* hand msg to the next queued reader, which calls relayMsg again once it resumes in gSwap or blockFn *) | |
72 | S.readyAndSwitch | |
73 | (fn () => | |
74 | (TransID.force txid | |
75 | ; S.prepVal (t, msg))) | |
76 | ||
77 | (** G-variables **) | |
78 | (* Generalized synchronized variables, | |
79 | * to factor out the common operations. | |
80 | *) | |
81 | ||
82 | fun gPut (name, CELL {prio, readQ, value}, x) = (* fill the cell with x; raises Put if it already holds a value. name only labels the debug/assert messages *) | |
83 | let | |
84 | val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name]) | |
85 | val () = debug (fn () => concat [name, "(1)"]) (* NonAtomic *) | |
86 | val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(1)"]) | |
87 | val () = S.atomicBegin() (* the emptiness check and the store below happen inside this atomic region *) | |
88 | val () = debug (fn () => concat [name, "(2)"]) (* Atomic 1 *) | |
89 | val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(2)"], SOME 1) | |
90 | val () = | |
91 | case !value of | |
92 | NONE => (* cell is empty: store x and wake a reader if one is queued *) | |
93 | let | |
94 | val () = debug (fn () => concat [name, "(3.1.1)"]) (* Atomic 1 *) | |
95 | val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.1.1)"], SOME 1) | |
96 | val () = value := SOME x | |
97 | val () = | |
98 | case cleanAndDeque readQ of | |
99 | NONE => S.atomicEnd () (* no reader queued: just leave the atomic region *) | |
100 | | SOME (rtxid, rt) => (* a reader is queued: pass x to it; both arms are asserted NonAtomic afterwards (3.1.2) *) | |
101 | S.readyAndSwitch | |
102 | (fn () => | |
103 | (prio := 1 (* reset the priority counter as the value is handed out *) | |
104 | ; TransID.force rtxid | |
105 | ; S.prepVal (rt, x))) | |
106 | val () = debug (fn () => concat [name, "(3.1.2)"]) (* NonAtomic *) | |
107 | val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(3.1.2)"]) | |
108 | in | |
109 | () | |
110 | end | |
111 | | SOME _ => (* cell already full: the put is rejected *) | |
112 | let | |
113 | val () = debug (fn () => concat [name, "(3.2.1)"]) (* Atomic 1 *) | |
114 | val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.1)"], SOME 1) | |
115 | val () = S.atomicEnd () (* leave the atomic region before raising *) | |
116 | val () = debug (fn () => concat [name, "(3.2.2)"]) (* NonAtomic *) | |
117 | val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(3.2.2)"]) | |
118 | in | |
119 | raise Put | |
120 | end | |
121 | val () = debug (fn () => concat [name, "(4)"]) (* NonAtomic *) | |
122 | val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(4)"]) | |
123 | in | |
124 | () | |
125 | end | |
126 | ||
127 | (* Blocking read shared by iGet, mGet, mTake and mSwap. If the cell is full, reset prio, apply doSwap to the value ref and return the value that was there. | |
128 | * If the cell is empty, queue on readQ and switch away; on resumption apply doSwap and pass the delivered value on with relayMsg, then return it. | |
129 | * The check-and-update runs inside an atomic region entered with S.atomicBegin; name only labels the debug/assert messages. *) | |
130 | fun gSwap (name, doSwap, CELL {prio, readQ, value}) = | |
131 | let | |
132 | val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, ""]) | |
133 | val () = debug (fn () => concat [name, "(1)"]) (* NonAtomic *) | |
134 | val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(1)"]) | |
135 | val () = S.atomicBegin() | |
136 | val () = debug (fn () => concat [name, "(2)"]) (* Atomic 1 *) | |
137 | val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(2)"], SOME 1) | |
138 | val msg = | |
139 | case !value of | |
140 | NONE => (* cell is empty: wait for a value. NOTE(review): this branch reuses the (3.2.x) labels of the SOME branch, whereas gPut labels its NONE branch (3.1.x) *) | |
141 | let | |
142 | val () = debug (fn () => concat [name, "(3.2.1)"]) (* Atomic 1 *) | |
143 | val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.1)"], SOME 1) | |
144 | val msg = | |
145 | S.atomicSwitchToNext | |
146 | (fn rt => enqueAndClean (readQ, (TransID.mkTxId (), rt))) (* queue the thread handle under a fresh transaction id; the switch's result is the delivered value *) | |
147 | val () = debug (fn () => concat [name, "(3.2.2)"]) (* Atomic 1 *) | |
148 | val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.2)"], SOME 1) | |
149 | val () = doSwap value | |
150 | val () = relayMsg (readQ, msg) (* forward the value to the next queued reader, or leave the atomic region if none is queued *) | |
151 | val () = debug (fn () => concat [name, "(3.2.3)"]) (* NonAtomic *) | |
152 | val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(3.2.3)"]) | |
153 | in | |
154 | msg | |
155 | end | |
156 | | SOME x => (* cell is full: take x without blocking *) | |
157 | let | |
158 | val () = debug (fn () => concat [name, "(3.2.1)"]) (* Atomic 1 *) | |
159 | val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.1)"], SOME 1) | |
160 | val () = prio := 1 | |
161 | val () = doSwap value | |
162 | val () = S.atomicEnd () | |
163 | val () = debug (fn () => concat [name, "(3.2.2)"]) (* NonAtomic *) | |
164 | val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(3.2.2)"]) | |
165 | in | |
166 | x | |
167 | end | |
168 | val () = debug (fn () => concat [name, "(4)"]) (* NonAtomic *) | |
169 | val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(4)"]) | |
170 | in | |
171 | msg | |
172 | end | |
173 | ||
174 | fun gSwapEvt (name, doSwap, CELL{prio, readQ, value}) = (* event-valued form of gSwap: E.bevt over a poll function that reports E.enabled when the cell is full and E.blocked when it is empty *) | |
175 | let | |
176 | fun doitFn () = (* supplied to E.enabled: take the value, reset prio, apply doSwap, leave the atomic region and return the value *) | |
177 | let | |
178 | val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, ".doitFn"], NONE) | |
179 | val x = valOf (!value) (* pollFn only offers doitFn when it saw SOME; valOf raises Option if the cell is NONE here *) | |
180 | val () = debug (fn () => concat [name, "(3.2.1)"]) (* Atomic 1 *) | |
181 | val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.1)"], SOME 1) | |
182 | val () = prio := 1 | |
183 | val () = doSwap value | |
184 | val () = S.atomicEnd () | |
185 | val () = debug (fn () => concat [name, "(3.2.2)"]) (* NonAtomic *) | |
186 | val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(3.2.2)"]) | |
187 | in | |
188 | x | |
189 | end | |
190 | fun blockFn {transId, cleanUp, next} = (* supplied to E.blocked: queue on readQ under transId, call next (), and after resumption run cleanUp, doSwap and relayMsg *) | |
191 | let | |
192 | val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, ".blockFn"], NONE) | |
193 | val () = debug (fn () => concat [name, "(3.2.1)"]) (* Atomic 1 *) | |
194 | val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.1)"], SOME 1) | |
195 | val msg = | |
196 | S.atomicSwitch | |
197 | (fn rt => | |
198 | (enqueAndClean (readQ, (transId, rt)) | |
199 | ; next ())) | |
200 | val () = debug (fn () => concat [name, "(3.2.2)"]) (* Atomic 1 *) | |
201 | val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.2)"], SOME 1) | |
202 | val () = cleanUp () | |
203 | val () = doSwap value | |
204 | val () = relayMsg (readQ, msg) (* forward the value to the next queued reader, or leave the atomic region if none is queued *) | |
205 | val () = debug (fn () => concat [name, "(3.2.3)"]) (* NonAtomic *) | |
206 | val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(3.2.3)"]) | |
207 | in | |
208 | msg | |
209 | end | |
210 | fun pollFn () = (* inspects the cell and picks the enabled or blocked status for the event *) | |
211 | let | |
212 | val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, ".pollFn"], NONE) | |
213 | val () = debug (fn () => concat [name, "(2)"]) (* Atomic 1 *) | |
214 | val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(2)"], SOME 1) | |
215 | in | |
216 | case !value of | |
217 | NONE => E.blocked blockFn | |
218 | | SOME _ => E.enabled {prio = bumpPriority prio, (* report the current priority and increment the counter for the next poll *) | |
219 | doitFn = doitFn} | |
220 | end | |
221 | in | |
222 | E.bevt pollFn | |
223 | end | |
224 | ||
225 | fun gSwapPoll (name, doSwap, CELL{prio, value, ...}) = (* non-blocking form of gSwap: returns NONE at once if the cell is empty, otherwise SOME of the value after resetting prio and applying doSwap *) | |
226 | let | |
227 | val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, ""]) | |
228 | val () = debug (fn () => concat [name, "(1)"]) (* NonAtomic *) | |
229 | val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(1)"]) | |
230 | val () = S.atomicBegin() | |
231 | val () = debug (fn () => concat [name, "(2)"]) (* Atomic 1 *) | |
232 | val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(2)"], SOME 1) | |
233 | val msg = | |
234 | case !value of | |
235 | NONE => (* cell is empty: report NONE; the read queue is never touched by the poll variant *) | |
236 | let | |
237 | val () = debug (fn () => concat [name, "(3.2.1)"]) (* Atomic 1 *) | |
238 | val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.1)"], SOME 1) | |
239 | val msg = NONE | |
240 | val () = debug (fn () => concat [name, "(3.2.2)"]) (* Atomic 1 *) | |
241 | val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.2)"], SOME 1) | |
242 | val () = S.atomicEnd () | |
243 | val () = debug (fn () => concat [name, "(3.2.3)"]) (* NonAtomic *) | |
244 | val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(3.2.3)"]) | |
245 | in | |
246 | msg | |
247 | end | |
248 | | SOME x => (* cell is full: same steps as the full branch of gSwap, with the result wrapped in SOME *) | |
249 | let | |
250 | val () = debug (fn () => concat [name, "(3.2.1)"]) (* Atomic 1 *) | |
251 | val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.1)"], SOME 1) | |
252 | val () = prio := 1 | |
253 | val () = doSwap value | |
254 | val () = S.atomicEnd () | |
255 | val () = debug (fn () => concat [name, "(3.2.2)"]) (* NonAtomic *) | |
256 | val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(3.2.2)"]) | |
257 | in | |
258 | SOME x | |
259 | end | |
260 | val () = debug (fn () => concat [name, "(4)"]) (* NonAtomic *) | |
261 | val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(4)"]) | |
262 | in | |
263 | msg | |
264 | end | |
265 | ||
266 | ||
267 | (** I-variables **) | |
268 | ||
269 | val iVar = newCell (* an ivar starts out as an empty cell *) | |
270 | val sameIVar = sameCell | |
271 | ||
272 | fun iPut (cell, x) = gPut ("iPut", cell, x) (* raises Put if the ivar already holds a value *) | |
273 | local fun doGetSwap _ = () (* reading an ivar leaves the stored value in place *) | |
274 | in | |
275 | fun iGet cell = gSwap ("iGet", doGetSwap, cell) (* blocking read *) | |
276 | fun iGetEvt cell = gSwapEvt ("iGetEvt", doGetSwap, cell) (* event-valued read *) | |
277 | fun iGetPoll cell = gSwapPoll ("iGetPoll", doGetSwap, cell) (* non-blocking read: NONE if the ivar is empty *) | |
278 | end | |
279 | ||
280 | (** M-variables **) | |
281 | ||
282 | val mVar = newCell (* an mvar created this way starts out empty *) | |
283 | fun mVarInit x = CELL {prio = ref 0, readQ = Q.new(), value = ref (SOME x)} (* an mvar that starts out holding x *) | |
284 | val sameMVar = sameCell | |
285 | ||
286 | fun mPut (cell, x) = gPut ("mPut", cell, x) (* raises Put if the mvar already holds a value *) | |
287 | local fun doTakeSwap value = value := NONE (* taking empties the cell *) | |
288 | in | |
289 | fun mTake cell = gSwap ("mTake", doTakeSwap, cell) (* blocking take *) | |
290 | fun mTakeEvt cell = gSwapEvt ("mTakeEvt", doTakeSwap, cell) (* event-valued take *) | |
291 | fun mTakePoll cell = gSwapPoll ("mTakePoll", doTakeSwap, cell) (* non-blocking take: NONE if the mvar is empty *) | |
292 | end | |
293 | local fun doGetSwap _ = () (* a get leaves the stored value in place *) | |
294 | in | |
295 | fun mGet cell = gSwap ("mGet", doGetSwap, cell) (* blocking read that does not empty the mvar *) | |
296 | fun mGetEvt cell = gSwapEvt ("mGetEvt", doGetSwap, cell) (* event-valued read *) | |
297 | fun mGetPoll cell = gSwapPoll ("mGetPoll", doGetSwap, cell) (* non-blocking read: NONE if the mvar is empty *) | |
298 | end | |
299 | local fun doSwapSwap x value = value := SOME x (* doSwap callback: overwrite the cell's contents with the replacement value x *) | |
300 | in | |
301 | fun mSwap (cell, x) = gSwap ("mSwap", doSwapSwap x, cell) (* blocking: return the value obtained from the mvar and leave x stored in it *) | |
302 | fun mSwapEvt (cell, x) = gSwapEvt ("mSwapEvt", doSwapSwap x, cell) (* event-valued mSwap; labelled with its own name so its debug/assert messages are distinguishable from mSwap's, like the other Evt wrappers *) | |
303 | end | |
304 | end |