Commit | Line | Data |
---|---|---|
7f918cf1 CE |
1 | (* mailbox.sml |
2 | * 2004 Matthew Fluet (mfluet@acm.org) | |
3 | * Ported to MLton threads. | |
4 | *) | |
5 | ||
6 | (* mailbox.sml | |
7 | * | |
8 | * COPYRIGHT (c) 1995 AT&T Bell Laboratories | |
9 | * COPYRIGHT (c) 1989-1991 John H. Reppy | |
10 | * | |
11 | * Asynchronous channels (called mailboxes). | |
12 | *) | |
13 | ||
(* Mailbox: asynchronous channels.
 *
 * A mailbox is a mutable cell holding either a queue of receivers waiting
 * for a message or a queue of messages waiting for a receiver.  All state
 * inspection and mutation below is bracketed by the Scheduler's atomic
 * region primitives (S.atomicBegin / S.atomicEnd / the S.*Switch* family).
 *)
structure Mailbox : MAILBOX_EXTRA =
   struct
      structure Assert = LocalAssert(val assert = false)
      structure Debug = LocalDebug(val debug = false)

      structure Q = FunQueue
      structure S = Scheduler
      structure E = Event

      (* debug takes a thunk producing the message; debug' takes the
       * message string directly.
       *)
      fun debug msg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], msg)
      fun debug' msg = debug (fn () => msg)

      datatype trans_id = datatype TransID.trans_id
      datatype trans_id_state = datatype TransID.trans_id_state


      (* The state of a mailbox.
       *
       *   EMPTY q          -- no pending messages; q holds the waiting
       *                       receivers as (transaction id, thread) pairs.
       *   NONEMPTY (p, q)  -- q holds the pending messages; p is the
       *                       priority that recvEvt reports when it polls
       *                       the mailbox.
       *
       * The queue of the NONEMPTY constructor should never be empty
       * (use EMPTY instead).
       *)
      datatype 'a state =
         EMPTY of (TransID.trans_id * 'a S.thread) Q.t
       | NONEMPTY of (int * 'a Q.t)

      datatype 'a mbox = MB of 'a state ref

      (*
      fun resetMbox (MB state) = state := EMPTY (Q.new ())
      *)

      (* Create a fresh mailbox with no messages and no waiting receivers. *)
      fun mailbox () = MB (ref (EMPTY (Q.new ())))

      (* Two mailboxes are the same iff they share the same state ref. *)
      fun sameMailbox (MB s1, MB s2) = (s1 = s2)

      local
         (* An entry is to be cleaned when its transaction has been
          * cancelled.
          *)
         fun cleaner (TXID txst, _) =
            case !txst of CANCEL => true | _ => false
      in
         (* Dequeue the next waiting receiver via Q.cleanAndDeque, using
          * cleaner as the predicate -- presumably entries for which
          * cleaner is true are discarded; confirm against FunQueue.
          * Returns the optional entry and the remaining queue.
          *)
         fun cleanAndDeque q =
            Q.cleanAndDeque (q, cleaner)
      end

      (* send (mb, x): post the message x to the mailbox mb.
       *
       *  - EMPTY with a waiting receiver: the receiver's transaction is
       *    forced and the receiver is prepared with x (S.prepVal) inside
       *    S.readyAndSwitch.
       *  - EMPTY with no waiting receiver: the state becomes NONEMPTY
       *    with priority 1 and a single-element message queue.
       *  - NONEMPTY: x is appended to the message queue inside
       *    S.atomicReadyAndSwitchToNext.
       *
       * Must be called outside of an atomic region (asserted on entry
       * and on exit).
       *)
      fun send (MB state, x) =
         let
            val () = Assert.assertNonAtomic' "Mailbox.send"
            val () = debug' "Mailbox.send(1)" (* NonAtomic *)
            val () = Assert.assertNonAtomic' "Mailbox.send(1)"
            val () = S.atomicBegin ()
            val () = debug' "Mailbox.send(2)" (* Atomic 1 *)
            val () = Assert.assertAtomic' ("Mailbox.send(2)", SOME 1)
            val () =
               case !state of
                  EMPTY q =>
                     let
                        val () = debug' "Mailbox.send(3.1.1)" (* Atomic 1 *)
                        val () = Assert.assertAtomic' ("Mailbox.send(3.1.1)", SOME 1)
                     in
                        case (cleanAndDeque q) of
                           (NONE, _) =>
                              (* nobody is waiting: start a fresh message
                               * queue holding just x.
                               *)
                              (state := NONEMPTY (1, Q.enque (Q.new (), x))
                               ; S.atomicEnd ())
                         | (SOME (transId', t'), q') =>
                              (* hand x directly to the waiting receiver *)
                              S.readyAndSwitch
                              (fn () =>
                               (state := EMPTY q'
                                ; TransID.force transId'
                                ; S.prepVal (t', x)))
                     end
                | NONEMPTY (p, q) =>
                     (* we force a context switch here to prevent
                      * a producer from outrunning a consumer.
                      *)
                     S.atomicReadyAndSwitchToNext
                     (fn () => state := NONEMPTY (p, Q.enque (q, x)))
            val () = debug' "Mailbox.send(4)" (* NonAtomic *)
            val () = Assert.assertNonAtomic' "Mailbox.send(4)"
         in
            ()
         end

      (* getMsg (state, q): remove and return the first message of q.
       *
       * Updates state to EMPTY (with a fresh waiter queue) when no
       * messages remain, otherwise to NONEMPTY with the priority reset
       * to 1.  Calls S.atomicEnd before returning, so it is expected to
       * be invoked from within an atomic region.
       *
       * Raises Fail "Mailbox:getMsg" if q is empty, which would violate
       * the NONEMPTY invariant.
       *)
      fun getMsg (state, q) =
         let
            val (msg, q') =
               case Q.deque q of
                  SOME (msg, q') => (msg, q')
                | NONE => raise Fail "Mailbox:getMsg"
            val () = if Q.empty q'
                        then state := EMPTY (Q.new ())
                        else state := NONEMPTY (1, q')
            val () = S.atomicEnd ()
         in
            msg
         end

      (* recv mb: return the next message from mb.
       *
       * When the mailbox is EMPTY the calling thread is enqueued as a
       * waiting receiver under a fresh transaction id and control is
       * given up through S.atomicSwitchToNext; the value the thread is
       * resumed with is the message.  When the mailbox is NONEMPTY the
       * first message is taken immediately via getMsg.
       *
       * Must be called outside of an atomic region (asserted on entry
       * and on exit).
       *)
      fun recv (MB state) =
         let
            val () = Assert.assertNonAtomic' "Mailbox.recv"
            val () = debug' "Mailbox.recv(1)" (* NonAtomic *)
            val () = Assert.assertNonAtomic' "Mailbox.recv(1)"
            val () = S.atomicBegin ()
            val () = debug' "Mailbox.recv(2)" (* Atomic 1 *)
            val () = Assert.assertAtomic' ("Mailbox.recv(2)", SOME 1)
            val msg =
               case !state of
                  EMPTY q =>
                     let
                        val msg =
                           S.atomicSwitchToNext
                           (fn t => state := EMPTY (Q.enque (q, (TransID.mkTxId (), t))))
                     in
                        S.atomicEnd ()
                        ; msg
                     end
                | NONEMPTY (_, q) => getMsg (state, q)
            val () = debug' "Mailbox.recv(4)" (* NonAtomic *)
            val () = Assert.assertNonAtomic' "Mailbox.recv(4)"
         in
            msg
         end

      (* recvEvt mb: the event-valued form of recv, built with E.bevt
       * from a poll function.
       *
       *  - pollFn: on EMPTY reports E.blocked blockFn; on NONEMPTY it
       *    bumps the stored priority by one and reports E.enabled with
       *    the previous priority and a doitFn that takes the first
       *    message via getMsg.
       *  - blockFn: expects the mailbox to be EMPTY (raises
       *    Fail "Mailbox:recvEvt:blockFn" otherwise), enqueues the
       *    current thread under the supplied transId, and continues with
       *    next ().  Once resumed with a message it runs cleanUp, ends
       *    the atomic region and returns the message.
       *)
      fun recvEvt (MB state) =
         let
            fun blockFn {transId, cleanUp: unit -> unit, next} =
               let
                  val q =
                     case !state of
                        EMPTY q => q
                      | _ => raise Fail "Mailbox:recvEvt:blockFn"
                  val msg =
                     S.atomicSwitch
                     (fn t => (state := EMPTY (Q.enque (q, (transId, t)))
                               ; next ()))
               in
                  cleanUp ()
                  ; S.atomicEnd ()
                  ; msg
               end
            fun pollFn () =
               case !state of
                  EMPTY _ => E.blocked blockFn
                | NONEMPTY (prio, q) =>
                     (state := NONEMPTY (prio + 1, q)
                      ; E.enabled {prio = prio,
                                   doitFn = fn () => getMsg (state, q)})
         in
            E.bevt pollFn
         end

      (* recvPoll mb: non-blocking receive.  Returns NONE when the
       * mailbox holds no messages, otherwise SOME of the first message.
       * The atomic region opened here is closed on both paths (directly
       * in the EMPTY case, by getMsg in the NONEMPTY case).
       *)
      fun recvPoll (MB state) =
         (S.atomicBegin ()
          ; case !state of
               EMPTY _ => (S.atomicEnd (); NONE)
             | NONEMPTY (_, q) => SOME (getMsg (state, q)))
   end