Commit | Line | Data |
---|---|---|
7f918cf1 CE |
1 | (* event.sml |
2 | * 2004 Matthew Fluet (mfluet@acm.org) | |
3 | * Ported to MLton threads. | |
4 | *) | |
5 | ||
6 | (* event.sml | |
7 | * | |
8 | * COPYRIGHT (c) 1995 AT&T Bell Laboratories. | |
9 | * COPYRIGHT (c) 1989-1991 John H. Reppy | |
10 | * | |
11 | * The representation of event values and the event combinators. | |
12 | * | |
13 | * Some important requirements on the implementation of base event values: | |
14 | * | |
15 | * 1) The pollFn, doitFn, and blockFn are always called from inside | |
16 | * atomic regions. | |
17 | * | |
18 | * 2) The pollFn returns a status: BLOCKED blockFn when not enabled, or | |
19 | * ENABLED {prio, doitFn}, where prio is ~1 for fixed priority and a positive value for dynamic priority. | |
20 | * The standard scheme is to associate a counter with the underlying | |
21 | * synchronization object, and to increase it by one for each | |
22 | * synchronization attempt. | |
23 | * | |
24 | * 3) The blockFn is responsible for exiting the atomic region; the doitFns | |
25 | * defined in this file also exit it (they call S.atomicEnd before returning). | |
26 | * | |
27 | * 4) The blockFn is responsible for executing the "cleanUp" action | |
28 | * prior to leaving the atomic region. | |
29 | *) | |
30 | ||
31 | structure Event : EVENT_EXTRA = | |
32 | struct | |
33 | structure Assert = LocalAssert(val assert = false) (* assertion helpers, instantiated with assert = false *) | |
34 | structure Debug = LocalDebug(val debug = false) (* debug helpers, instantiated with debug = false *) | |
35 | ||
36 | structure S = Scheduler | |
37 | fun debug msg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], msg) (* forwards the message thunk to Debug.sayDebug together with S.atomicMsg and S.tidMsg *) | |
38 | fun debug' msg = debug (fn () => msg) (* same as debug, but takes the message value itself and wraps it in a thunk *) | |
39 | ||
40 | datatype trans_id = datatype TransID.trans_id (* datatype replication: the constructors of these types become usable unqualified below *) | |
41 | datatype trans_id_state = datatype TransID.trans_id_state | |
42 | datatype cvar = datatype CVar.cvar | |
43 | datatype cvar_state = datatype CVar.cvar_state | |
44 | ||
45 | ||
46 | datatype status = datatype RepTypes.status | |
47 | val enabled = ENABLED (* lower-case aliases for the two status constructors *) | |
48 | val blocked = BLOCKED | |
49 | ||
50 | type 'a base = 'a RepTypes.base | |
51 | ||
52 | datatype event = datatype RepTypes.event | |
53 | val bevt = fn pollFn => BEVT [pollFn] (* build an event consisting of a single base event (its poll function) *) | |
54 | ||
55 | datatype 'a group = (* event tree produced by forcing guards; consumed by collect and syncOnGrp *) | |
56 | BASE of 'a base list (* a flat list of base events *) | |
57 | | GRP of 'a group list (* a list of sub-groups *) | |
58 | | NACK of cvar * 'a group (* a sub-group paired with the cvar created for its withNack *) | |
59 | ||
60 | ||
61 | (** Condition variables. Because these variables are set inside atomic | |
62 | ** regions, we have to use different conventions for clean-up, etc. | |
63 | ** Instead of requiring the blockFn continuation to call the cleanUp | |
64 | ** action and to leave the atomic region, we call the cleanUp function | |
65 | ** when setting the condition variable (in atomicCVarSet), and have the | |
66 | ** invariant that the blockFn continuation is dispatched outside the | |
67 | ** atomic region. | |
68 | **) | |
69 | ||
70 | (* set a condition variable; | |
71 | * we assume that this function is always executed in an atomic region. The state becomes CVAR_set 1 and every waiter whose transaction is still TRANS is cancelled, has its cleanUp run, and has its thread passed to S.ready. Raises Fail "atomicCVarSet" if the state is not CVAR_unset. | |
72 | *) | |
73 | fun atomicCVarSet (CVAR state) : unit = | |
74 | let | |
75 | val () = Assert.assertAtomic' ("Event.atomicCVarSet", NONE) | |
76 | val () = debug' "atomicCVarSet" (* Atomic 1 *) | |
77 | val () = Assert.assertAtomic' ("Event.atomicCVarSet", SOME 1) | |
78 | in | |
79 | case !state of | |
80 | CVAR_unset waiting => (* waiting holds the records queued by cvarGetEvt's blockFn *) | |
81 | let | |
82 | fun add waiting : unit = (* walk the waiter list, handling one record per step *) | |
83 | case waiting of | |
84 | [] => () | |
85 | | ({transId = TXID txst, cleanUp: unit -> unit, thread}::waiting) => | |
86 | (case !txst of | |
87 | CANCEL => () (* this waiter's transaction is already cancelled: skip it *) | |
88 | | TRANS => | |
89 | (txst := CANCEL (* cancel the transaction first, then clean up, then ready the thread *) | |
90 | ; cleanUp () | |
91 | ; S.ready thread) | |
92 | ; add waiting) | |
93 | in | |
94 | state := CVAR_set 1 (* the state is updated before the waiters are processed *) | |
95 | ; add waiting | |
96 | end | |
97 | | _ => raise Fail "atomicCVarSet" (* any state other than CVAR_unset is treated as an error *) | |
98 | end | |
99 | ||
100 | (* the event constructor for waiting on a cvar. The result is a single base event whose pollFn reports enabled once the cvar state is CVAR_set and blocked otherwise. | |
101 | *) | |
102 | fun cvarGetEvt (CVAR state) : unit event = | |
103 | let | |
104 | fun doitFn () = (* chosen when the cvar is set: resets the stored counter and leaves the atomic region *) | |
105 | let | |
106 | val () = Assert.assertAtomic' ("Event.cvarGetEvt.doitFn", NONE) | |
107 | val () = debug' "cvarGetEvt(3.1.1)" (* Atomic 1 *) | |
108 | val () = Assert.assertAtomic' ("Event.cvarGetEvt(3.1.1)", SOME 1) | |
109 | val () = state := CVAR_set 1 (* reset the counter that pollFn increments *) | |
110 | val () = S.atomicEnd () | |
111 | val () = debug' "cvarGetEvt(3.1.2)" (* NonAtomic *) | |
112 | val () = Assert.assertNonAtomic' "Event.cvarGetEvt(3.1.2)" | |
113 | in | |
114 | () | |
115 | end | |
116 | fun blockFn {transId, cleanUp, next} = (* used when the cvar is unset: queue this thread on the cvar and switch to next () *) | |
117 | let | |
118 | val () = Assert.assertAtomic' ("Event.cvarGetEvt.blockFn", NONE) | |
119 | val () = debug' "cvarGetEvt(3.2.1)" (* Atomic 1 *) | |
120 | val () = Assert.assertAtomic' ("Event.cvarGetEvt(3.2.1)", SOME 1) | |
121 | val () = | |
122 | S.atomicSwitch | |
123 | (fn t => | |
124 | let | |
125 | val item = {transId = transId, (* the waiter record later consumed by atomicCVarSet *) | |
126 | cleanUp = cleanUp, | |
127 | thread = S.prep t} | |
128 | val waiting = | |
129 | case !state of | |
130 | CVAR_unset waiting => waiting | |
131 | | _ => raise Fail "cvarGetEvt:blockFn" (* blocking on a cvar that is not unset is treated as an error *) | |
132 | in | |
133 | state := CVAR_unset (item::waiting) (* push this waiter onto the front of the list *) | |
134 | ; next () | |
135 | end) | |
136 | val () = debug' "cvarGetEvt(3.2.2)" (* NonAtomic *) | |
137 | val () = Assert.assertNonAtomic' "Event.cvarGetEvt(3.2.2)" | |
138 | in | |
139 | () | |
140 | end | |
141 | fun pollFn () = | |
142 | let | |
143 | val () = Assert.assertAtomic' ("Event.cvarGetEvt.pollFn", NONE) | |
144 | val () = debug' "cvarGetEvt(2)" (* Atomic 1 *) | |
145 | val () = Assert.assertAtomic' ("Event.cvarGetEvt(2)", SOME 1) | |
146 | in | |
147 | case !state of | |
148 | CVAR_set n => (* set: bump the stored counter and report enabled with the old value n as priority *) | |
149 | (state := CVAR_set (n + 1) | |
150 | ; enabled {prio = n, doitFn = doitFn}) | |
151 | | _ => blocked blockFn | |
152 | end | |
153 | in | |
154 | bevt pollFn | |
155 | end | |
156 | ||
157 | ||
158 | (* event combinators *) | |
159 | val never : 'a event = (* the event with an empty list of base events *) | |
160 | BEVT [] | |
161 | fun alwaysEvt (v : 'a) : 'a event = (* base event that is enabled on every poll and yields v *) | |
162 | let | |
163 | fun doitFn () = (* leaves the atomic region and returns v *) | |
164 | let | |
165 | val () = Assert.assertAtomic' ("Event.alwaysEvt.doitFn", NONE) | |
166 | val () = debug' "alwaysEvt(3.1)" (* Atomic 1 *) | |
167 | val () = Assert.assertAtomic' ("Event.alwaysEvt(3.1)", SOME 1) | |
168 | val () = S.atomicEnd () | |
169 | val () = debug' "alwaysEvt(3.2)" (* NonAtomic *) | |
170 | val () = Assert.assertNonAtomic' "Event.alwaysEvt(3.2)" | |
171 | in | |
172 | v | |
173 | end | |
174 | fun pollFn () = | |
175 | let | |
176 | val () = Assert.assertAtomic' ("Event.alwaysEvt.pollFn", NONE) | |
177 | val () = debug' "alwaysEvt(2)" (* Atomic 1 *) | |
178 | val () = Assert.assertAtomic' ("Event.alwaysEvt(2)", SOME 1) | |
179 | in | |
180 | enabled {prio = ~1, doitFn = doitFn} (* ~1 is the value selectDoitFn treats as fixed priority *) | |
181 | end | |
182 | in | |
183 | bevt pollFn | |
184 | end | |
185 | ||
186 | fun wrap (evt : 'a event, wfn : 'a -> 'b) : 'b event = (* event that applies wfn to the result of evt *) | |
187 | let | |
188 | fun wrapF f x = wfn (f x) (* post-compose wfn onto a doitFn or blockFn *) | |
189 | fun wrapBaseEvt pollFn () = (* new poll function: poll the original, then wrap whichever function the status carries *) | |
190 | case pollFn () of | |
191 | ENABLED {prio, doitFn} => | |
192 | ENABLED {prio = prio, doitFn = wrapF doitFn} | |
193 | | BLOCKED blockFn => | |
194 | BLOCKED (wrapF blockFn) | |
195 | fun wrap' evt = (* rebuild the event tree constructor by constructor *) | |
196 | case evt of | |
197 | BEVT bevts => | |
198 | BEVT(List.map wrapBaseEvt bevts) | |
199 | | CHOOSE evts => | |
200 | CHOOSE(List.map wrap' evts) | |
201 | | GUARD g => | |
202 | GUARD(fn () => wrap (g (), wfn)) (* the guard stays delayed; wrapping happens when the new guard is run *) | |
203 | | WNACK f => | |
204 | WNACK(fn evt => wrap (f evt, wfn)) (* likewise delayed until the nack event is supplied *) | |
205 | in | |
206 | wrap' evt | |
207 | end | |
208 | fun wrapHandler (evt : 'a event, hfn : exn -> 'a) : 'a event = (* event whose doitFn/blockFn calls are guarded by the handler hfn *) | |
209 | let | |
210 | fun wrapF f x = (f x) handle exn => hfn exn (* any exception raised by f x is passed to hfn *) | |
211 | fun wrapBaseEvt pollFn () = (* new poll function: poll the original, then wrap whichever function the status carries *) | |
212 | case pollFn () of | |
213 | ENABLED {prio, doitFn} => | |
214 | ENABLED {prio = prio, doitFn = wrapF doitFn} | |
215 | | BLOCKED blockFn => | |
216 | BLOCKED (wrapF blockFn) | |
217 | fun wrap' evt = (* rebuild the event tree constructor by constructor *) | |
218 | case evt of | |
219 | BEVT bevts => | |
220 | BEVT(List.map wrapBaseEvt bevts) | |
221 | | CHOOSE evts => | |
222 | CHOOSE(List.map wrap' evts) | |
223 | | GUARD g => | |
224 | GUARD(fn () => wrapHandler (g (), hfn)) (* the guard stays delayed; wrapping happens when the new guard is run *) | |
225 | | WNACK f => | |
226 | WNACK(fn evt => wrapHandler (f evt, hfn)) (* likewise delayed until the nack event is supplied *) | |
227 | in | |
228 | wrap' evt | |
229 | end | |
230 | ||
231 | val guard = GUARD (* guard and withNack are the GUARD and WNACK constructors themselves *) | |
232 | val withNack = WNACK | |
233 | ||
234 | fun choose (evts : 'a event list) : 'a event = (* combine a list of events into one, flattening nested CHOOSE nodes and merging BEVT lists *) | |
235 | let | |
236 | val () = Assert.assertNonAtomic' "Event.choose" | |
237 | val () = debug' "choose(1)" (* NonAtomic *) | |
238 | val () = Assert.assertNonAtomic' "Event.choose(1)" | |
239 | fun gatherBEvts (evts, bevts') = (* fast path: used while every event seen so far is a BEVT *) | |
240 | case (evts, bevts') of | |
241 | ([], bevts') => BEVT bevts' (* only base events were present: the result is a single BEVT *) | |
242 | | ((BEVT bevts)::evts, bevts') => gatherBEvts (evts, bevts @ bevts') | |
243 | | (evts, []) => gather (evts, []) (* first non-BEVT event reached: continue in the general gatherer *) | |
244 | | (evts, bevts') => gather (evts, [BEVT bevts']) | |
245 | and gather (evts, evts') = (* general case: evts' accumulates the members of the resulting CHOOSE *) | |
246 | case (evts, evts') of | |
247 | ([], [evt']) => evt' (* a single member is returned as is, without a CHOOSE wrapper *) | |
248 | | ([], evts') => CHOOSE evts' | |
249 | | ((CHOOSE cevts)::evts, evts') => | |
250 | gather (evts, cevts @ evts') (* splice the members of a nested CHOOSE into the accumulator *) | |
251 | | ((BEVT [])::evts, evts') => | |
252 | gather (evts, evts') (* an empty base event list contributes nothing *) | |
253 | | ((BEVT bevts)::evts, (BEVT bevts')::evts') => | |
254 | gather (evts, BEVT (bevts @ bevts')::evts') (* merge with the BEVT at the head of the accumulator *) | |
255 | | (evt::evts, evts') => | |
256 | gather (evts, evt::evts') | |
257 | val evt = gatherBEvts (List.rev evts, []) (* the input is reversed first; every step above prepends to its accumulator *) | |
258 | in | |
259 | evt | |
260 | end | |
261 | ||
262 | ||
263 | local | |
264 | val cnt = ref 0 (* counter shared by all calls to random *) | |
265 | fun random i = (* returns the current counter value rem i; the counter wraps to 0 after reaching 1000000 *) | |
266 | let val j = !cnt | |
267 | in | |
268 | if j = 1000000 then cnt := 0 else cnt := j + 1 | |
269 | ; Int.rem (j, i) | |
270 | end | |
271 | in | |
272 | fun selectDoitFn (doitFns : {prio : int, doitFn : 'a} list) : 'a = (* pick one doitFn among the enabled candidates, based on their priorities *) | |
273 | let | |
274 | val () = Assert.assertAtomic' ("Event.selectDoitFn", NONE) | |
275 | val () = debug' "selectDoitFn(2)" (* Atomic 1 *) | |
276 | val () = Assert.assertAtomic' ("Event.selectDoitFn(2)", SOME 1) | |
277 | in | |
278 | case doitFns of | |
279 | [{doitFn, ...}] => doitFn (* a single candidate needs no selection *) | |
280 | | doitFns => | |
281 | let | |
282 | fun select (doitFns, maxP, (* one pass over the candidates, tracking the highest priority seen, *) | |
283 | doitFnsMaxP, numMaxP, (* the candidates at that priority and their count, *) | |
284 | doitFnsFixed, numFixed) = (* and the fixed-priority (prio = ~1) candidates and their count *) | |
285 | case doitFns of | |
286 | [] => (case (doitFnsMaxP, doitFnsFixed) of | |
287 | ([doitFn], []) => doitFn | |
288 | | ([], [doitFn]) => doitFn | |
289 | | (doitFnsMaxP, doitFnsFixed) => | |
290 | let | |
291 | val bias = 2 (* each max-priority candidate gets bias slots, each fixed one gets a single slot *) | |
292 | val num = numFixed + bias * numMaxP | |
293 | val k = random num | |
294 | in | |
295 | if k < numFixed | |
296 | then List.nth (doitFnsFixed, k) | |
297 | else List.nth (doitFnsMaxP, | |
298 | Int.mod(k - numFixed, numMaxP)) | |
299 | end) | |
300 | | {prio, doitFn}::doitFns => | |
301 | if prio = ~1 | |
302 | then select(doitFns, maxP, | |
303 | doitFnsMaxP, numMaxP, | |
304 | doitFn::doitFnsFixed, numFixed + 1) | |
305 | else if prio > maxP (* new highest priority: restart the max-priority list with this candidate *) | |
306 | then select(doitFns, prio, | |
307 | [doitFn], 1, | |
308 | doitFnsFixed, numFixed) | |
309 | else if prio = maxP | |
310 | then select(doitFns, maxP, | |
311 | doitFn::doitFnsMaxP, numMaxP + 1, | |
312 | doitFnsFixed, numFixed) | |
313 | else select(doitFns, maxP, (* lower priority than the current maximum: candidate is dropped *) | |
314 | doitFnsMaxP, numMaxP, | |
315 | doitFnsFixed, numFixed) | |
316 | in | |
317 | select (doitFns, 0, [], 0, [], 0) | |
318 | end | |
319 | end | |
320 | end | |
321 | ||
322 | fun syncOnBEvt (pollFn : 'a base) : 'a = (* synchronize on a single base event: poll once, then either run its doitFn or block *) | |
323 | let | |
324 | val () = Assert.assertNonAtomic' "Event.syncOnBEvt" | |
325 | val () = debug' "syncOnBEvt(1)" (* NonAtomic *) | |
326 | val () = Assert.assertNonAtomic' "Event.syncOnBEvt(1)" | |
327 | val () = S.atomicBegin () (* the poll and the chosen continuation start inside an atomic region *) | |
328 | val () = debug' "syncOnBEvt(2)" (* Atomic 1 *) | |
329 | val () = Assert.assertAtomic' ("Event.syncOnBEvt(2)", SOME 1) | |
330 | val x = | |
331 | case pollFn () of | |
332 | ENABLED {doitFn, ...} => doitFn () (* with only one candidate the priority is ignored *) | |
333 | | BLOCKED blockFn => | |
334 | let val (transId, cleanUp) = TransID.mkFlg () (* fresh transaction id and its cleanUp action for this sync *) | |
335 | in blockFn {transId = transId, | |
336 | cleanUp = cleanUp, | |
337 | next = S.next} | |
338 | end | |
339 | val () = debug' "syncOnBEvt(4)" (* NonAtomic *) | |
340 | val () = Assert.assertNonAtomic' "Event.syncOnBEvt(4)" (* both branches are expected to have left the atomic region by now *) | |
341 | in | |
342 | x | |
343 | end | |
344 | ||
345 | (* this function handles the case of synchronizing on a list of | |
346 | * base events (w/o any negative acknowledgements). It also handles | |
347 | * the case of synchronizing on NEVER. | |
348 | *) | |
349 | fun syncOnBEvts (bevts : 'a base list) : 'a = (* poll every base event; run one enabled doitFn if any, otherwise block on all of them *) | |
350 | let | |
351 | val () = Assert.assertNonAtomic' "Event.syncOnBEvts" | |
352 | val () = debug' "syncOnBEvts(1)" (* NonAtomic *) | |
353 | val () = Assert.assertNonAtomic' "Event.syncOnBEvts(1)" | |
354 | fun ext (bevts, blockFns) : 'a = (* polling phase while nothing has been enabled yet; blockFns collects the blocked continuations *) | |
355 | let | |
356 | val () = debug' "syncOnBEvts(2).ext" (* Atomic 1 *) | |
357 | val () = Assert.assertAtomic' ("Event.syncOnBEvts(2).ext", SOME 1) | |
358 | in | |
359 | case bevts of | |
360 | [] => (* every event was blocked: register with all of them under one transaction id *) | |
361 | let | |
362 | val () = debug' "syncOnBEvts(2).ext([])" (* Atomic 1 *) | |
363 | val () = Assert.assertAtomic' ("Event.syncOnBEvts(2).ext([])", SOME 1) | |
364 | in | |
365 | S.atomicSwitch | |
366 | (fn (t : 'a S.thread) => | |
367 | let | |
368 | val (transId, cleanUp) = TransID.mkFlg () | |
369 | fun log blockFns : S.rdy_thread = (* builds the thread to run next: one new thread per remaining blockFn, S.next () at the end *) | |
370 | let | |
371 | val () = debug' "syncOnBEvts(2).ext([]).log" (* Atomic 1 *) | |
372 | val () = Assert.assertAtomic' ("Event.syncOnBEvts(2).ext([]).log", SOME 1) | |
373 | in | |
374 | case blockFns of | |
375 | [] => S.next () | |
376 | | blockFn::blockFns => | |
377 | (S.prep o S.new) | |
378 | (fn _ => fn () => | |
379 | let | |
380 | val () = S.atomicBegin () (* blockFn is called from inside an atomic region *) | |
381 | val x = blockFn {transId = transId, | |
382 | cleanUp = cleanUp, | |
383 | next = fn () => log blockFns} | |
384 | in S.switch(fn _ => S.prepVal (t, x)) (* hand the value produced by blockFn to the original thread t *) | |
385 | end) | |
386 | end | |
387 | in | |
388 | log blockFns | |
389 | end) | |
390 | end | |
391 | | pollFn::bevts => | |
392 | (case pollFn () of | |
393 | ENABLED doitFn => extRdy (bevts, [doitFn]) (* here doitFn names the whole {prio, doitFn} record, which is what selectDoitFn takes *) | |
394 | | BLOCKED blockFn => ext (bevts, blockFn::blockFns)) | |
395 | end | |
396 | and extRdy (bevts, doitFns) : 'a = (* polling phase once something is enabled: blocked events are ignored from here on *) | |
397 | let | |
398 | val () = debug' "syncOnBEvts(2).extRdy" (* Atomic 1*) | |
399 | val () = Assert.assertAtomic' ("Event.syncOnBEvts(2).extRdy", SOME 1) | |
400 | in | |
401 | case bevts of | |
402 | [] => | |
403 | let val doitFn = selectDoitFn doitFns | |
404 | in doitFn () | |
405 | end | |
406 | | pollFn::bevts => | |
407 | (case pollFn () of | |
408 | ENABLED doitFn => extRdy (bevts, doitFn::doitFns) | |
409 | | _ => extRdy (bevts, doitFns)) | |
410 | end | |
411 | val x = | |
412 | case bevts of | |
413 | [] => S.switchToNext (fn _ => ()) (* no base events at all (the NEVER case): switch away without registering anywhere *) | |
414 | | [bevt] => syncOnBEvt bevt | |
415 | | bevts => (S.atomicBegin (); ext (bevts, [])) | |
416 | val () = debug' "syncOnBEvts(4)" (* NonAtomic *) | |
417 | val () = Assert.assertNonAtomic' "Event.syncOnBEvts(4)" | |
418 | in | |
419 | x | |
420 | end | |
421 | ||
422 | (* walk the event group tree, collecting the base events (with associated | |
423 | * ack flags), and a list of flag sets. A flag set is a (cvar * ack flag list) | |
424 | * pair, where the flags are those associated with the events covered by the | |
425 | * nack cvar. | |
426 | *) | |
427 | type ack_flg = bool ref (* set to true by syncOnGrp when the associated base event is the one chosen *) | |
428 | type ack_flgs = ack_flg list | |
429 | type 'a back = 'a base * ack_flg (* a base event paired with its ack flag *) | |
430 | type 'a backs = 'a back list | |
431 | type flg_set = cvar * ack_flg list (* a nack cvar with the flags of the events it covers *) | |
432 | type flg_sets = flg_set list | |
433 | fun collect (gevt : 'a group) : 'a backs * flg_sets = (* flatten a group tree into (base event, ack flag) pairs plus one flag set per NACK node *) | |
434 | let | |
435 | fun gatherWrapped (gevt : 'a group, backs : 'a backs, flgSets : flg_sets) : (* used for subtrees under a NACK: every base event gets its own fresh flag *) | |
436 | 'a backs * flg_sets = | |
437 | let | |
438 | fun gather (gevt : 'a group, backs : 'a backs, | |
439 | ackFlgs : ack_flgs, flgSets : flg_sets) : | |
440 | 'a backs * ack_flgs * flg_sets = | |
441 | case gevt of | |
442 | BASE bevts => | |
443 | let | |
444 | fun append (bevts, backs, ackFlgs) = (* allocate a new flag per base event and record it in both accumulators *) | |
445 | case bevts of | |
446 | [] => (backs, ackFlgs) | |
447 | | bevt::bevts => | |
448 | let val ackFlg = ref false | |
449 | in append (bevts, (bevt, ackFlg)::backs, ackFlg::ackFlgs) | |
450 | end | |
451 | val (backs', ackFlgs') = append (bevts, backs, ackFlgs) | |
452 | in | |
453 | (backs', ackFlgs', flgSets) | |
454 | end | |
455 | | GRP gevt => | |
456 | let | |
457 | fun f (gevt', (backs', ackFlgs', flgSets')) = | |
458 | gather (gevt', backs', ackFlgs', flgSets') | |
459 | in List.foldl f (backs, ackFlgs, flgSets) gevt | |
460 | end | |
461 | | NACK (cvar, gevt) => | |
462 | let | |
463 | val (backs', ackFlgs', flgSets') = | |
464 | gather (gevt, backs, [], flgSets) (* start from an empty flag list so ackFlgs' holds only the flags under this NACK *) | |
465 | in | |
466 | (backs', ackFlgs' @ ackFlgs, (cvar, ackFlgs')::flgSets') (* the inner flags are also passed up to any enclosing NACK *) | |
467 | end | |
468 | val (backs, _, flgSets) = gather (gevt, backs, [], flgSets) | |
469 | in | |
470 | (backs, flgSets) | |
471 | end | |
472 | in | |
473 | case gevt of | |
474 | GRP _ => | |
475 | let | |
476 | val ackFlg = ref false (* one flag shared by all base events that are not under any NACK *) | |
477 | fun gather (gevt : 'a group, backs : 'a backs, flgSets : flg_sets) : | |
478 | 'a backs * flg_sets = | |
479 | case gevt of | |
480 | BASE bevts => | |
481 | let | |
482 | fun append (bevts, backs) = | |
483 | case bevts of | |
484 | [] => backs | |
485 | | bevt::bevts => append (bevts, (bevt, ackFlg)::backs) | |
486 | in | |
487 | (append (bevts, backs), flgSets) | |
488 | end | |
489 | | GRP gevt => | |
490 | let | |
491 | fun f (gevt', (backs', flgSets')) = | |
492 | gather(gevt', backs', flgSets') | |
493 | in List.foldl f (backs, flgSets) gevt | |
494 | end | |
495 | | NACK _ => | |
496 | gatherWrapped (gevt, backs, flgSets) (* NACK subtrees switch to the per-event flag scheme *) | |
497 | in | |
498 | gather (gevt, [], []) | |
499 | end | |
500 | | gevt => gatherWrapped (gevt, [], []) | |
501 | end | |
502 | ||
503 | (* this function handles the more complicated case of synchronization | |
504 | * on groups of events where negative acknowledgements are involved. | |
505 | *) | |
506 | fun syncOnGrp (gevt : 'a group) : 'a = (* like syncOnBEvts, but also marks the chosen event's ack flag and sets the nack cvars whose flags are all false *) | |
507 | let | |
508 | val () = Assert.assertNonAtomic' "Event.syncOnGrp" | |
509 | val () = debug' "syncOnGrp(1)" (* NonAtomic *) | |
510 | val () = Assert.assertNonAtomic' "Event.syncOnGrp(1)" | |
511 | val (backs, flgSets) = collect gevt | |
512 | fun chkCVars () = | |
513 | let | |
514 | val () = debug' "syncOnGrp.chkCVars" (* Atomic 1 *) | |
515 | val () = Assert.assertAtomic' ("Event.syncOnGrp.chkCVars", SOME 1) | |
516 | (* chkCVar checks the flags of a flag set. | |
517 | * If they are all false, then the corresponding cvar | |
518 | * is set to signal the negative ack. | |
519 | *) | |
520 | fun chkCVar (cvar, flgs) = | |
521 | if List.exists ! flgs | |
522 | then () | |
523 | else atomicCVarSet cvar | |
524 | in | |
525 | List.app chkCVar flgSets | |
526 | end | |
527 | fun ext (backs, blockFns) : 'a = (* polling phase while nothing has been enabled yet; blockFns collects (blockFn, ackFlg) pairs *) | |
528 | let | |
529 | val () = debug' "syncOnGrp(2).ext" (* Atomic 1 *) | |
530 | val () = Assert.assertAtomic' ("Event.syncOnGrp(2).ext", SOME 1) | |
531 | in | |
532 | case backs of | |
533 | [] => (* every event was blocked: register with all of them under one transaction id *) | |
534 | let | |
535 | val () = debug' "syncOnGrp(2).ext([])" (* Atomic 1 *) | |
536 | val () = Assert.assertAtomic' ("Event.syncOnGrp(2).ext([])", SOME 1) | |
537 | in | |
538 | S.atomicSwitch | |
539 | (fn (t : 'a S.thread) => | |
540 | let | |
541 | val (transId, cleanUp) = TransID.mkFlg () | |
542 | val cleanUp = fn flg => fn () => (* per-event cleanUp: run the base cleanUp, mark this event's flag, then check the nack cvars *) | |
543 | (cleanUp () | |
544 | ; flg := true | |
545 | ; chkCVars ()) | |
546 | fun log blockFns : S.rdy_thread = (* builds the thread to run next: one new thread per remaining blockFn, S.next () at the end *) | |
547 | let | |
548 | val () = debug' "syncOnGrp(2).ext([]).log" (* Atomic 1 *) | |
549 | val () = Assert.assertAtomic' ("Event.syncOnGrp(2).ext([]).log", SOME 1) | |
550 | in | |
551 | case blockFns of | |
552 | [] => S.next () | |
553 | | (blockFn,ackFlg)::blockFns => | |
554 | (S.prep o S.new) | |
555 | (fn _ => fn () => | |
556 | let | |
557 | val () = S.atomicBegin () (* blockFn is called from inside an atomic region *) | |
558 | val x = blockFn {transId = transId, | |
559 | cleanUp = cleanUp ackFlg, | |
560 | next = fn () => log blockFns} | |
561 | in S.switch(fn _ => S.prepVal (t, x)) (* hand the value produced by blockFn to the original thread t *) | |
562 | end) | |
563 | end | |
564 | in | |
565 | log blockFns | |
566 | end) | |
567 | end | |
568 | | (pollFn,ackFlg)::backs => | |
569 | (case pollFn () of | |
570 | ENABLED {prio, doitFn} => | |
571 | extRdy (backs, [{prio = prio,doitFn = (doitFn, ackFlg)}]) (* the flag travels with the doitFn through selectDoitFn *) | |
572 | | BLOCKED blockFn => ext (backs, (blockFn,ackFlg)::blockFns)) | |
573 | end | |
574 | and extRdy (backs, doitFns) : 'a = (* polling phase once something is enabled: blocked events are ignored from here on *) | |
575 | let | |
576 | val () = debug' "syncOnGrp.extRdy(2)" (* Atomic 1*) | |
577 | val () = Assert.assertAtomic' ("Event.syncOnGrp.extRdy(2)", SOME 1) | |
578 | in | |
579 | case backs of | |
580 | [] => let | |
581 | val (doitFn, flg) = selectDoitFn doitFns | |
582 | in | |
583 | flg := true (* mark the chosen event before checking the nack cvars, then run its doitFn *) | |
584 | ; chkCVars () | |
585 | ; doitFn () | |
586 | end | |
587 | | (pollFn,ackFlg)::backs => | |
588 | (case pollFn () of | |
589 | ENABLED {prio, doitFn} => | |
590 | extRdy (backs, {prio = prio, doitFn = (doitFn, ackFlg)}::doitFns) | |
591 | | _ => extRdy (backs, doitFns)) | |
592 | end | |
593 | val x = (S.atomicBegin (); ext (backs, [])) | |
594 | val () = debug' "syncOnGrp(4)" (* NonAtomic *) | |
595 | val () = Assert.assertNonAtomic' "Event.syncOnGrp(4)" | |
596 | in | |
597 | x | |
598 | end | |
599 | ||
600 | local | |
601 | (* force the evaluation of any guards in an event collection, | |
602 | * returning an event group. | |
603 | *) | |
604 | fun forceBL (evts : 'a event list, bevts : 'a base list) : 'a group = (* used while every forced event so far was a BASE; bevts accumulates their base events *) | |
605 | case evts of | |
606 | [] => BASE bevts | |
607 | | evt::evts => | |
608 | (case force evt of | |
609 | BASE bevts' => forceBL (evts, bevts' @ bevts) | |
610 | | GRP gevts => forceGL (evts, if List.null bevts then gevts else gevts @ [BASE bevts]) (* first non-BASE result: continue in forceGL, carrying over any collected base events *) | |
611 | | gevt => forceGL (evts, if List.null bevts then [gevt] else [gevt, BASE bevts])) | |
612 | and forceGL (evts : 'a event list, gevts : 'a group list) : 'a group = (* general case: gevts accumulates the members of the resulting GRP *) | |
613 | case (evts, gevts) of | |
614 | ([], [gevt]) => gevt (* a single member is returned as is, without a GRP wrapper *) | |
615 | | ([], gevts) => GRP gevts | |
616 | | (evt::evts, gevts) => | |
617 | (case (force evt, gevts) of | |
618 | (BASE [], gevts) => | |
619 | forceGL (evts, gevts) | |
620 | | (BASE bevts', (BASE bevts)::gevts) => | |
621 | forceGL (evts, BASE (bevts' @ bevts)::gevts) (* merge with the BASE at the head of the accumulator *) | |
622 | | (GRP gevts', gevts) => | |
623 | forceGL (evts, gevts' @ gevts) | |
624 | | (gevt, gevts) => | |
625 | forceGL (evts, gevt::gevts)) | |
626 | and force (evt : 'a event) : 'a group = (* convert one event to a group, running guards and withNack functions *) | |
627 | let | |
628 | val gevt = | |
629 | case evt of | |
630 | BEVT bevts => BASE bevts | |
631 | | CHOOSE evts => forceBL (evts, []) | |
632 | | GUARD g => force (g ()) (* run the guard now and force the event it returns *) | |
633 | | WNACK f => | |
634 | let val cvar = CVar.new () (* fresh cvar; f receives the event that waits on it *) | |
635 | in NACK(cvar, force (f (cvarGetEvt cvar))) | |
636 | end | |
637 | in | |
638 | gevt | |
639 | end | |
640 | in | |
641 | fun sync evt = (* force the event, then use the base-event path when no NACK/GRP structure remains, else the group path *) | |
642 | let | |
643 | val () = Assert.assertNonAtomic' "Event.sync" | |
644 | val () = debug' "sync(1)" (* NonAtomic *) | |
645 | val () = Assert.assertNonAtomic' "Event.sync(1)" | |
646 | val x = | |
647 | case force evt of | |
648 | BASE bevts => syncOnBEvts bevts | |
649 | | gevt => syncOnGrp gevt | |
650 | val () = debug' "sync(4)" (* NonAtomic *) | |
651 | val () = Assert.assertNonAtomic' "Event.sync(4)" | |
652 | in | |
653 | x | |
654 | end | |
655 | fun select (evts : 'a event list) : 'a = (* same as sync, but takes the event list directly and forces it with forceBL *) | |
656 | let | |
657 | val () = Assert.assertNonAtomic' "Event.select" | |
658 | val () = debug' "select(1)" (* NonAtomic *) | |
659 | val () = Assert.assertNonAtomic' "Event.select(1)" | |
660 | val x = | |
661 | case forceBL (evts, []) of | |
662 | BASE bevts => syncOnBEvts bevts | |
663 | | gevt => syncOnGrp gevt | |
664 | val () = debug' "select(4)" (* NonAtomic *) | |
665 | val () = Assert.assertNonAtomic' "Event.select(4)" | |
666 | in | |
667 | x | |
668 | end | |
669 | end | |
670 | end |