2 * 2004 Matthew
Fluet (mfluet@acm
.org
)
3 * Ported to MLton threads
.
8 * COPYRIGHT (c
) 1995 AT
&T Bell Laboratories
9 * COPYRIGHT (c
) 1989-1991 John H
. Reppy
11 * Asynchronous
channels (called mailboxes
).
14 structure Mailbox
: MAILBOX_EXTRA
=
16 structure Assert
= LocalAssert(val assert
= false)
17 structure Debug
= LocalDebug(val debug
= false)
19 structure Q
= FunQueue
20 structure S
= Scheduler
22 fun debug msg
= Debug
.sayDebug ([S
.atomicMsg
, S
.tidMsg
], msg
)
23 fun debug
' msg
= debug (fn () => msg
)
25 datatype trans_id
= datatype TransID
.trans_id
26 datatype trans_id_state
= datatype TransID
.trans_id_state
29 (* the state
of a mailbox
. The queue
of the NONEMPTY constructor should
30 * never be
empty (use EMPTY instead
).
33 EMPTY
of (TransID
.trans_id
* 'a S
.thread
) Q
.t
34 | NONEMPTY
of (int * 'a Q
.t
)
36 datatype 'a mbox
= MB
of 'a state ref
39 fun resetMbox (MB state
) = state
:= EMPTY (Q
.new ())
42 fun mailbox () = MB (ref (EMPTY (Q
.new ())))
44 fun sameMailbox (MB s1
, MB s2
) = (s1
= s2
)
47 fun cleaner (TXID txst
, _
) =
48 case !txst
of CANCEL
=> true | _
=> false
51 Q
.cleanAndDeque (q
, cleaner
)
54 fun send (MB state
, x
) =
56 val () = Assert
.assertNonAtomic
' "Mailbox.send"
57 val () = debug
' "Mailbox.send(1)" (* NonAtomic
*)
58 val () = Assert
.assertNonAtomic
' "Mailbox.send(1)"
59 val () = S
.atomicBegin ()
60 val () = debug
' "Mailbox.send(2)" (* Atomic
1 *)
61 val () = Assert
.assertAtomic
' ("Mailbox.send(2)", SOME
1)
66 val () = debug
' "Mailbox.send(3.1.1)" (* Atomic
1 *)
67 val () = Assert
.assertAtomic
' ("Mailbox.send(3.1.1)", SOME
1)
69 case (cleanAndDeque q
) of
72 in state
:= NONEMPTY (1, Q
.enque (q
, x
))
75 |
(SOME (transId
', t
'), q
') =>
79 ; TransID
.force transId
'
83 (* we force a context switch here to prevent
84 * a producer from outrunning a consumer
.
86 S
.atomicReadyAndSwitchToNext
87 (fn () => state
:= NONEMPTY (p
, Q
.enque (q
, x
)))
88 val () = debug
' "Mailbox.send(4)" (* NonAtomic
*)
89 val () = Assert
.assertNonAtomic
' "Channel.send(4)"
94 fun getMsg (state
, q
) =
98 SOME (msg
, q
') => (msg
, q
')
99 | NONE
=> raise Fail
"Mailbox:getMsg"
100 val () = if Q
.empty q
'
101 then state
:= EMPTY (Q
.new ())
102 else state
:= NONEMPTY (1, q
')
103 val () = S
.atomicEnd ()
108 fun recv (MB state
) =
110 val () = Assert
.assertNonAtomic
' "Mailbox.recv"
111 val () = debug
' "Mailbox.recv(1)" (* NonAtomic
*)
112 val () = Assert
.assertNonAtomic
' "Mailbox.recv(1)"
113 val () = S
.atomicBegin ()
114 val () = debug
' "Mailbox.recv(2)" (* Atomic
1 *)
115 val () = Assert
.assertAtomic
' ("Mailbox.recv(2)", SOME
1)
122 (fn t
=> state
:= EMPTY (Q
.enque (q
, (TransID
.mkTxId (), t
))))
127 |
NONEMPTY (_
, q
) => getMsg (state
, q
)
128 val () = debug
' "Mailbox.recv(4)" (* NonAtomic
*)
129 val () = Assert
.assertNonAtomic
' "Channel.recv(4)"
134 fun recvEvt (MB state
) =
136 fun blockFn
{transId
, cleanUp
: unit
-> unit
, next
} =
141 | _
=> raise Fail
"Mailbox:recvEvt:blockFn"
144 (fn t
=> (state
:= EMPTY (Q
.enque (q
, (transId
, t
)))
153 EMPTY _
=> E
.blocked blockFn
154 |
NONEMPTY (prio
, q
) =>
155 (state
:= NONEMPTY (prio
+ 1, q
)
156 ; E
.enabled
{prio
= prio
,
157 doitFn
= fn () => getMsg (state
, q
)})
162 fun recvPoll (MB state
) =
165 EMPTY _
=> (S
.atomicEnd(); NONE
)
166 |
NONEMPTY (_
, q
) => SOME (getMsg (state
, q
)))