2 * 2004 Matthew
Fluet (mfluet@acm
.org
)
3 * Ported to MLton threads
.
8 * COPYRIGHT (c
) 1995 AT
&T Bell Laboratories
.
9 * COPYRIGHT (c
) 1989-1991 John H
. Reppy
11 * The representation
of event values
and the event combinators
.
13 * Some important requirements on the implementation
of base event values
:
15 * 1) The pollFn
, doitFn
, and blockFn are always called from inside
18 * 2) The pollFn returns an integer priority
: this is
0 when not enabled
,
19 * ~
1 for fixed priority
, and a positive value for dynamic priority
.
20 * The standard scheme is to associate a counter
with the underlying
21 * synchronization object
, and to increase it by one for each
22 * synchronization attempt
.
24 * 3) The blockFn is responsible for exiting the atomic region
; the doitFns
25 * should NOT leave the atomic region
.
27 * 4) The blockFn is responsible for executing the
"cleanUp" action
28 * prior to leaving the atomic region
.
31 structure Event
: EVENT_EXTRA
=
33 structure Assert
= LocalAssert(val assert
= false)
34 structure Debug
= LocalDebug(val debug
= false)
36 structure S
= Scheduler
37 fun debug msg
= Debug
.sayDebug ([S
.atomicMsg
, S
.tidMsg
], msg
)
38 fun debug
' msg
= debug (fn () => msg
)
40 datatype trans_id
= datatype TransID
.trans_id
41 datatype trans_id_state
= datatype TransID
.trans_id_state
42 datatype cvar
= datatype CVar
.cvar
43 datatype cvar_state
= datatype CVar
.cvar_state
46 datatype status
= datatype RepTypes
.status
50 type 'a base
= 'a RepTypes
.base
52 datatype event
= datatype RepTypes
.event
53 val bevt
= fn pollFn
=> BEVT
[pollFn
]
57 | GRP
of 'a group list
58 | NACK
of cvar
* 'a group
61 (** Condition variables
. Because these variables are set inside atomic
62 ** regions
, we have to use different conventions for clean
-up
, etc
.
63 ** Instead
of requiring the blockFn continuation to call the cleanUp
64 ** action
and to leave the atomic region
, we call the cleanUp function
65 ** when setting the condition
variable (in atomicCVarSet
), and have the
66 ** invariant that the blockFn continuation is dispatched outside the
70 (* set a condition variable
;
71 * we assume that this function is always executed
in an atomic region
.
73 fun atomicCVarSet (CVAR state
) : unit
=
75 val () = Assert
.assertAtomic
' ("Event.atomicCVarSet", NONE
)
76 val () = debug
' "atomicCVarSet" (* Atomic
1 *)
77 val () = Assert
.assertAtomic
' ("Event.atomicCVarSet", SOME
1)
82 fun add waiting
: unit
=
85 |
({transId
= TXID txst
, cleanUp
: unit
-> unit
, thread
}::waiting
) =>
97 | _
=> raise Fail
"atomicCVarSet"
100 (* the event constructor for waiting on a cvar
.
102 fun cvarGetEvt (CVAR state
) : unit event
=
106 val () = Assert
.assertAtomic
' ("Event.cvarGetEvt.doitFn", NONE
)
107 val () = debug
' "cvarGetEvt(3.1.1)" (* Atomic
1 *)
108 val () = Assert
.assertAtomic
' ("Event.cvarGetEvt(3.1.1)", SOME
1)
109 val () = state
:= CVAR_set
1
110 val () = S
.atomicEnd ()
111 val () = debug
' "cvarGetEvt(3.1.2)" (* NonAtomic
*)
112 val () = Assert
.assertNonAtomic
' "Event.cvarGetEvt(3.1.2)"
116 fun blockFn
{transId
, cleanUp
, next
} =
118 val () = Assert
.assertAtomic
' ("Event.cvarGetEvt.blockFn", NONE
)
119 val () = debug
' "cvarGetEvt(3.2.1)" (* Atomic
1 *)
120 val () = Assert
.assertAtomic
' ("Event.cvarGetEvt(3.2.1)", SOME
1)
125 val item
= {transId
= transId
,
130 CVAR_unset waiting
=> waiting
131 | _
=> raise Fail
"cvarGetEvt:blockFn"
133 state
:= CVAR_unset (item
::waiting
)
136 val () = debug
' "cvarGetEvt(3.2.2)" (* NonAtomic
*)
137 val () = Assert
.assertNonAtomic
' "Event.cvarGetEvt(3.2.2)"
143 val () = Assert
.assertAtomic
' ("Event.cvarGetEvt.pollFn", NONE
)
144 val () = debug
' "cvarGetEvt(2)" (* Atomic
1 *)
145 val () = Assert
.assertAtomic
' ("Event.cvarGetEvt(2)", SOME
1)
149 (state
:= CVAR_set (n
+ 1)
150 ; enabled
{prio
= n
, doitFn
= doitFn
})
151 | _
=> blocked blockFn
158 (* event combinators
*)
159 val never
: 'a event
=
161 fun alwaysEvt (v
: 'a
) : 'a event
=
165 val () = Assert
.assertAtomic
' ("Event.alwaysEvt.doitFn", NONE
)
166 val () = debug
' "alwaysEvt(3.1)" (* Atomic
1 *)
167 val () = Assert
.assertAtomic
' ("Event.alwaysEvt(3.1)", SOME
1)
168 val () = S
.atomicEnd ()
169 val () = debug
' "alwaysEvt(3.2)" (* NonAtomic
*)
170 val () = Assert
.assertNonAtomic
' "Event.alwaysEvt(3.2)"
176 val () = Assert
.assertAtomic
' ("Event.alwaysEvt.pollFn", NONE
)
177 val () = debug
' "alwaysEvt(2)" (* Atomic
1 *)
178 val () = Assert
.assertAtomic
' ("Event.alwaysEvt(2)", SOME
1)
180 enabled
{prio
= ~
1, doitFn
= doitFn
}
186 fun wrap (evt
: 'a event
, wfn
: 'a
-> 'b
) : 'b event
=
188 fun wrapF f x
= wfn (f x
)
189 fun wrapBaseEvt
pollFn () =
191 ENABLED
{prio
, doitFn
} =>
192 ENABLED
{prio
= prio
, doitFn
= wrapF doitFn
}
194 BLOCKED (wrapF blockFn
)
198 BEVT(List.map wrapBaseEvt bevts
)
200 CHOOSE(List.map wrap
' evts
)
202 GUARD(fn () => wrap (g (), wfn
))
204 WNACK(fn evt
=> wrap (f evt
, wfn
))
208 fun wrapHandler (evt
: 'a event
, hfn
: exn
-> 'a
) : 'a event
=
210 fun wrapF f x
= (f x
) handle exn
=> hfn exn
211 fun wrapBaseEvt
pollFn () =
213 ENABLED
{prio
, doitFn
} =>
214 ENABLED
{prio
= prio
, doitFn
= wrapF doitFn
}
216 BLOCKED (wrapF blockFn
)
220 BEVT(List.map wrapBaseEvt bevts
)
222 CHOOSE(List.map wrap
' evts
)
224 GUARD(fn () => wrapHandler (g (), hfn
))
226 WNACK(fn evt
=> wrapHandler (f evt
, hfn
))
234 fun choose (evts
: 'a event list
) : 'a event
=
236 val () = Assert
.assertNonAtomic
' "Event.choose"
237 val () = debug
' "choose(1)" (* NonAtomic
*)
238 val () = Assert
.assertNonAtomic
' "Event.choose(1)"
239 fun gatherBEvts (evts
, bevts
') =
240 case (evts
, bevts
') of
241 ([], bevts
') => BEVT bevts
'
242 |
((BEVT bevts
)::evts
, bevts
') => gatherBEvts (evts
, bevts @ bevts
')
243 |
(evts
, []) => gather (evts
, [])
244 |
(evts
, bevts
') => gather (evts
, [BEVT bevts
'])
245 and gather (evts
, evts
') =
246 case (evts
, evts
') of
248 |
([], evts
') => CHOOSE evts
'
249 |
((CHOOSE cevts
)::evts
, evts
') =>
250 gather (evts
, cevts @ evts
')
251 |
((BEVT
[])::evts
, evts
') =>
253 |
((BEVT bevts
)::evts
, (BEVT bevts
')::evts
') =>
254 gather (evts
, BEVT (bevts @ bevts
')::evts
')
255 |
(evt
::evts
, evts
') =>
256 gather (evts
, evt
::evts
')
257 val evt
= gatherBEvts (List.rev evts
, [])
268 if j
= 1000000 then cnt
:= 0 else cnt
:= j
+ 1
272 fun selectDoitFn (doitFns
: {prio
: int, doitFn
: 'a
} list
) : 'a
=
274 val () = Assert
.assertAtomic
' ("Event.selectDoitFn", NONE
)
275 val () = debug
' "selectDoitFn(2)" (* Atomic
1 *)
276 val () = Assert
.assertAtomic
' ("Event.selectDoitFn(2)", SOME
1)
279 [{doitFn
, ...}] => doitFn
282 fun select (doitFns
, maxP
,
283 doitFnsMaxP
, numMaxP
,
284 doitFnsFixed
, numFixed
) =
286 [] => (case (doitFnsMaxP
, doitFnsFixed
) of
287 ([doitFn
], []) => doitFn
288 |
([], [doitFn
]) => doitFn
289 |
(doitFnsMaxP
, doitFnsFixed
) =>
292 val num
= numFixed
+ bias
* numMaxP
296 then List.nth (doitFnsFixed
, k
)
297 else List.nth (doitFnsMaxP
,
298 Int.mod(k
- numFixed
, numMaxP
))
300 |
{prio
, doitFn
}::doitFns
=>
302 then select(doitFns
, maxP
,
303 doitFnsMaxP
, numMaxP
,
304 doitFn
::doitFnsFixed
, numFixed
+ 1)
306 then select(doitFns
, prio
,
308 doitFnsFixed
, numFixed
)
310 then select(doitFns
, maxP
,
311 doitFn
::doitFnsMaxP
, numMaxP
+ 1,
312 doitFnsFixed
, numFixed
)
313 else select(doitFns
, maxP
,
314 doitFnsMaxP
, numMaxP
,
315 doitFnsFixed
, numFixed
)
317 select (doitFns
, 0, [], 0, [], 0)
322 fun syncOnBEvt (pollFn
: 'a base
) : 'a
=
324 val () = Assert
.assertNonAtomic
' "Event.syncOnBEvt"
325 val () = debug
' "syncOnBEvt(1)" (* NonAtomic
*)
326 val () = Assert
.assertNonAtomic
' "Event.syncOnBEvt(1)"
327 val () = S
.atomicBegin ()
328 val () = debug
' "syncOnBEvt(2)" (* Atomic
1 *)
329 val () = Assert
.assertAtomic
' ("Event.syncOnBEvt(2)", SOME
1)
332 ENABLED
{doitFn
, ...} => doitFn ()
334 let val (transId
, cleanUp
) = TransID
.mkFlg ()
335 in blockFn
{transId
= transId
,
339 val () = debug
' "syncOnBEvt(4)" (* NonAtomic
*)
340 val () = Assert
.assertNonAtomic
' "Event.syncOnBEvt(4)"
345 (* this function handles the
case of synchronizing on a list
of
346 * base
events (w
/o any negative acknowledgements
). It also handles
347 * the
case of synchronizing on NEVER
.
349 fun syncOnBEvts (bevts
: 'a base list
) : 'a
=
351 val () = Assert
.assertNonAtomic
' "Event.syncOnBEvts"
352 val () = debug
' "syncOnBEvts(1)" (* NonAtomic
*)
353 val () = Assert
.assertNonAtomic
' "Event.syncOnBEvts(1)"
354 fun ext (bevts
, blockFns
) : 'a
=
356 val () = debug
' "syncOnBEvts(2).ext" (* Atomic
1 *)
357 val () = Assert
.assertAtomic
' ("Event.syncOnBEvts(2).ext", SOME
1)
362 val () = debug
' "syncOnBEvts(2).ext([])" (* Atomic
1 *)
363 val () = Assert
.assertAtomic
' ("Event.syncOnBEvts(2).ext([])", SOME
1)
366 (fn (t
: 'a S
.thread
) =>
368 val (transId
, cleanUp
) = TransID
.mkFlg ()
369 fun log blockFns
: S
.rdy_thread
=
371 val () = debug
' "syncOnBEvts(2).ext([]).log" (* Atomic
1 *)
372 val () = Assert
.assertAtomic
' ("Event.syncOnBEvts(2).ext([]).log", SOME
1)
376 | blockFn
::blockFns
=>
380 val () = S
.atomicBegin ()
381 val x
= blockFn
{transId
= transId
,
383 next
= fn () => log blockFns
}
384 in S
.switch(fn _
=> S
.prepVal (t
, x
))
393 ENABLED doitFn
=> extRdy (bevts
, [doitFn
])
394 | BLOCKED blockFn
=> ext (bevts
, blockFn
::blockFns
))
396 and extRdy (bevts
, doitFns
) : 'a
=
398 val () = debug
' "syncOnBEvts(2).extRdy" (* Atomic
1*)
399 val () = Assert
.assertAtomic
' ("Event.syncOnBEvts(2).extRdy", SOME
1)
403 let val doitFn
= selectDoitFn doitFns
408 ENABLED doitFn
=> extRdy (bevts
, doitFn
::doitFns
)
409 | _
=> extRdy (bevts
, doitFns
))
413 [] => S
.switchToNext (fn _
=> ())
414 |
[bevt
] => syncOnBEvt bevt
415 | bevts
=> (S
.atomicBegin (); ext (bevts
, []))
416 val () = debug
' "syncOnBEvts(4)" (* NonAtomic
*)
417 val () = Assert
.assertNonAtomic
' "Event.syncOnBEvts(4)"
422 (* walk the event group tree
, collecting the base
events (with associated
423 * ack flags
), and a list
of flag sets
. A flag set is
a (cvar
* ack flag list
)
424 * pair
, where the flags are those associated
with the events covered by the
427 type ack_flg
= bool ref
428 type ack_flgs
= ack_flg list
429 type 'a back
= 'a base
* ack_flg
430 type 'a backs
= 'a back list
431 type flg_set
= cvar
* ack_flg list
432 type flg_sets
= flg_set list
433 fun collect (gevt
: 'a group
) : 'a backs
* flg_sets
=
435 fun gatherWrapped (gevt
: 'a group
, backs
: 'a backs
, flgSets
: flg_sets
) :
436 'a backs
* flg_sets
=
438 fun gather (gevt
: 'a group
, backs
: 'a backs
,
439 ackFlgs
: ack_flgs
, flgSets
: flg_sets
) :
440 'a backs
* ack_flgs
* flg_sets
=
444 fun append (bevts
, backs
, ackFlgs
) =
446 [] => (backs
, ackFlgs
)
448 let val ackFlg
= ref
false
449 in append (bevts
, (bevt
, ackFlg
)::backs
, ackFlg
::ackFlgs
)
451 val (backs
', ackFlgs
') = append (bevts
, backs
, ackFlgs
)
453 (backs
', ackFlgs
', flgSets
)
457 fun f (gevt
', (backs
', ackFlgs
', flgSets
')) =
458 gather (gevt
', backs
', ackFlgs
', flgSets
')
459 in List.foldl
f (backs
, ackFlgs
, flgSets
) gevt
461 |
NACK (cvar
, gevt
) =>
463 val (backs
', ackFlgs
', flgSets
') =
464 gather (gevt
, backs
, [], flgSets
)
466 (backs
', ackFlgs
' @ ackFlgs
, (cvar
, ackFlgs
')::flgSets
')
468 val (backs
, _
, flgSets
) = gather (gevt
, backs
, [], flgSets
)
476 val ackFlg
= ref
false
477 fun gather (gevt
: 'a group
, backs
: 'a backs
, flgSets
: flg_sets
) :
478 'a backs
* flg_sets
=
482 fun append (bevts
, backs
) =
485 | bevt
::bevts
=> append (bevts
, (bevt
, ackFlg
)::backs
)
487 (append (bevts
, backs
), flgSets
)
491 fun f (gevt
', (backs
', flgSets
')) =
492 gather(gevt
', backs
', flgSets
')
493 in List.foldl
f (backs
, flgSets
) gevt
496 gatherWrapped (gevt
, backs
, flgSets
)
498 gather (gevt
, [], [])
500 | gevt
=> gatherWrapped (gevt
, [], [])
503 (* this function handles the more complicated
case of synchronization
504 * on groups
of events
where negative acknowledgements are involved
.
506 fun syncOnGrp (gevt
: 'a group
) : 'a
=
508 val () = Assert
.assertNonAtomic
' "Event.syncOnGrp"
509 val () = debug
' "syncOnGrp(1)" (* NonAtomic
*)
510 val () = Assert
.assertNonAtomic
' "Event.syncOnGrp(1)"
511 val (backs
, flgSets
) = collect gevt
514 val () = debug
' "syncOnGrp.chkCVars" (* Atomic
1 *)
515 val () = Assert
.assertAtomic
' ("Event.syncOnGrp.chkCVars", SOME
1)
516 (* chkCVar checks the flags
of a flag set
.
517 * If they are all
false, then the corresponding cvar
518 * is set to signal the negative ack
.
520 fun chkCVar (cvar
, flgs
) =
521 if List.exists
! flgs
523 else atomicCVarSet cvar
525 List.app chkCVar flgSets
527 fun ext (backs
, blockFns
) : 'a
=
529 val () = debug
' "syncOnGrp(2).ext" (* Atomic
1 *)
530 val () = Assert
.assertAtomic
' ("Event.syncOnGrp(2).ext", SOME
1)
535 val () = debug
' "syncOnGrp(2).ext([])" (* Atomic
1 *)
536 val () = Assert
.assertAtomic
' ("Event.syncOnGrp(2).ext([])", SOME
1)
539 (fn (t
: 'a S
.thread
) =>
541 val (transId
, cleanUp
) = TransID
.mkFlg ()
542 val cleanUp
= fn flg
=> fn () =>
546 fun log blockFns
: S
.rdy_thread
=
548 val () = debug
' "syncOnGrp(2).ext([]).log" (* Atomic
1 *)
549 val () = Assert
.assertAtomic
' ("Event.syncOnGrp(2).ext([]).log", SOME
1)
553 |
(blockFn
,ackFlg
)::blockFns
=>
557 val () = S
.atomicBegin ()
558 val x
= blockFn
{transId
= transId
,
559 cleanUp
= cleanUp ackFlg
,
560 next
= fn () => log blockFns
}
561 in S
.switch(fn _
=> S
.prepVal (t
, x
))
568 |
(pollFn
,ackFlg
)::backs
=>
570 ENABLED
{prio
, doitFn
} =>
571 extRdy (backs
, [{prio
= prio
,doitFn
= (doitFn
, ackFlg
)}])
572 | BLOCKED blockFn
=> ext (backs
, (blockFn
,ackFlg
)::blockFns
))
574 and extRdy (backs
, doitFns
) : 'a
=
576 val () = debug
' "syncOnGrp.extRdy(2)" (* Atomic
1*)
577 val () = Assert
.assertAtomic
' ("Event.syncOnGrp.extRdy(2)", SOME
1)
581 val (doitFn
, flg
) = selectDoitFn doitFns
587 |
(pollFn
,ackFlg
)::backs
=>
589 ENABLED
{prio
, doitFn
} =>
590 extRdy (backs
, {prio
= prio
, doitFn
= (doitFn
, ackFlg
)}::doitFns
)
591 | _
=> extRdy (backs
, doitFns
))
593 val x
= (S
.atomicBegin (); ext (backs
, []))
594 val () = debug
' "syncOnGrp(4)" (* NonAtomic
*)
595 val () = Assert
.assertNonAtomic
' "Event.syncOnGrp(4)"
601 (* force the evaluation
of any guards
in an event collection
,
602 * returning an event group
.
604 fun forceBL (evts
: 'a event list
, bevts
: 'a base list
) : 'a group
=
609 BASE bevts
' => forceBL (evts
, bevts
' @ bevts
)
610 | GRP gevts
=> forceGL (evts
, if List.null bevts
then gevts
else gevts @
[BASE bevts
])
611 | gevt
=> forceGL (evts
, if List.null bevts
then [gevt
] else [gevt
, BASE bevts
]))
612 and forceGL (evts
: 'a event list
, gevts
: 'a group list
) : 'a group
=
613 case (evts
, gevts
) of
615 |
([], gevts
) => GRP gevts
616 |
(evt
::evts
, gevts
) =>
617 (case (force evt
, gevts
) of
619 forceGL (evts
, gevts
)
620 |
(BASE bevts
', (BASE bevts
)::gevts
) =>
621 forceGL (evts
, BASE (bevts
' @ bevts
)::gevts
)
622 |
(GRP gevts
', gevts
) =>
623 forceGL (evts
, gevts
' @ gevts
)
625 forceGL (evts
, gevt
::gevts
))
626 and force (evt
: 'a event
) : 'a group
=
630 BEVT bevts
=> BASE bevts
631 | CHOOSE evts
=> forceBL (evts
, [])
632 | GUARD g
=> force (g ())
634 let val cvar
= CVar
.new ()
635 in NACK(cvar
, force (f (cvarGetEvt cvar
)))
643 val () = Assert
.assertNonAtomic
' "Event.sync"
644 val () = debug
' "sync(1)" (* NonAtomic
*)
645 val () = Assert
.assertNonAtomic
' "Event.sync(1)"
648 BASE bevts
=> syncOnBEvts bevts
649 | gevt
=> syncOnGrp gevt
650 val () = debug
' "sync(4)" (* NonAtomic
*)
651 val () = Assert
.assertNonAtomic
' "Event.sync(4)"
655 fun select (evts
: 'a event list
) : 'a
=
657 val () = Assert
.assertNonAtomic
' "Event.select"
658 val () = debug
' "select(1)" (* NonAtomic
*)
659 val () = Assert
.assertNonAtomic
' "Event.select(1)"
661 case forceBL (evts
, []) of
662 BASE bevts
=> syncOnBEvts bevts
663 | gevt
=> syncOnGrp gevt
664 val () = debug
' "select(4)" (* NonAtomic
*)
665 val () = Assert
.assertNonAtomic
' "Event.select(4)"