Commit | Line | Data |
---|---|---|
7f918cf1 CE |
1 | (* channel.sig |
2 | * 2004 Matthew Fluet (mfluet@acm.org) | |
3 | * Ported to MLton threads. | |
4 | *) | |
5 | ||
6 | (* channel.sml | |
7 | * | |
8 | * COPYRIGHT (c) 1995 AT&T Bell Laboratories. | |
9 | * COPYRIGHT (c) 1989-1991 John H. Reppy | |
10 | * | |
11 | * The representation of synchronous channels. | |
12 | * | |
13 | * To ensure that we always leave the atomic region exactly once, we | |
14 | * require that the blocking operation be responsible for leaving the | |
15 | * atomic region (in the event case, it must also execute the clean-up | |
16 | * action). The doitFn always transfers control to the blocked thread | |
17 | * without leaving the atomic region. Note that the send (and sendEvt) | |
18 | * blockFns run using the receiver's thread ID. | |
19 | *) | |
20 | ||
21 | structure Channel : CHANNEL_EXTRA = | |
22 | struct | |
23 | structure Assert = LocalAssert(val assert = false) | |
24 | structure Debug = LocalDebug(val debug = false) | |
25 | ||
26 | structure Q = ImpQueue | |
27 | structure S = Scheduler | |
28 | structure E = Event | |
29 | fun debug msg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], msg) | |
30 | fun debug' msg = debug (fn () => msg) | |
31 | ||
32 | datatype trans_id = datatype TransID.trans_id | |
33 | datatype trans_id_state = datatype TransID.trans_id_state | |
34 | ||
35 | ||
36 | datatype 'a chan = | |
37 | CHAN of {prio : int ref, | |
38 | inQ : (trans_id * 'a S.thread) Q.t, | |
39 | outQ : (trans_id * 'a S.thread S.thread) Q.t} | |
40 | ||
41 | (* | |
42 | fun resetChan (CHAN {prio, inQ, outQ}) = | |
43 | (prio := 1 | |
44 | ; Q.reset inQ | |
45 | ; Q.reset outQ) | |
46 | *) | |
47 | ||
48 | fun channel () = CHAN {prio = ref 1, inQ = Q.new (), outQ = Q.new ()} | |
49 | ||
50 | (* sameChannel : ('a chan * 'a chan) -> bool *) | |
51 | fun sameChannel (CHAN {prio = prio1, ...}, CHAN {prio = prio2, ...}) = | |
52 | prio1 = prio2 | |
53 | ||
54 | ||
55 | (* bump a priority value by one, returning the old value *) | |
56 | fun bumpPriority (p as ref n) = (p := n+1; n) | |
57 | ||
58 | (* functions to clean channel input and output queues *) | |
59 | local | |
60 | fun cleaner (TXID txst, _) = | |
61 | case !txst of CANCEL => true | _ => false | |
62 | in | |
63 | fun cleanAndChk (prio, q) : int = | |
64 | (Q.clean (q, cleaner) | |
65 | ; if Q.empty q | |
66 | then 0 | |
67 | else bumpPriority prio) | |
68 | fun cleanAndDeque q = | |
69 | Q.cleanAndDeque (q, cleaner) | |
70 | fun enqueAndClean (q, item) = | |
71 | Q.enqueAndClean (q, item, cleaner) | |
72 | end | |
73 | ||
74 | fun send (CHAN {prio, inQ, outQ}, msg) = | |
75 | let | |
76 | val () = Assert.assertNonAtomic' "Channel.send" | |
77 | val () = debug' "Chennel.send(1)" (* NonAtomic *) | |
78 | val () = Assert.assertNonAtomic' "Channel.send(1)" | |
79 | val () = S.atomicBegin () | |
80 | val () = debug' "Channel.send(2)" (* Atomic 1 *) | |
81 | val () = Assert.assertAtomic' ("Channel.send(2)", SOME 1) | |
82 | val () = | |
83 | case cleanAndDeque inQ of | |
84 | SOME (rtxid, rt) => | |
85 | let | |
86 | val () = debug' "Channel.send(3.1.1)" (* Atomic 1 *) | |
87 | val () = Assert.assertAtomic' ("Channel.send(3.1.1)", SOME 1) | |
88 | val () = | |
89 | S.readyAndSwitch | |
90 | (fn () => | |
91 | (prio := 1 | |
92 | ; TransID.force rtxid | |
93 | ; S.prepVal (rt, msg))) | |
94 | val () = debug' "Channel.send(3.1.2)" (* NonAtomic *) | |
95 | val () = Assert.assertNonAtomic' "Channel.send(3.1.2)" | |
96 | in | |
97 | () | |
98 | end | |
99 | | NONE => | |
100 | let | |
101 | val () = debug' "Channel.send(3.2.1)" (* Atomic 1 *) | |
102 | val () = Assert.assertAtomic' ("Channel.send(3.2.1)", SOME 1) | |
103 | val rt = | |
104 | S.atomicSwitchToNext | |
105 | (fn st => Q.enque (outQ, (TransID.mkTxId (), st))) | |
106 | val () = debug' "Channel.send(3.2.2)" (* Atomic 1 *) | |
107 | val () = Assert.assertAtomic' ("Channel.send(3.2.2)", SOME 1) | |
108 | val () = S.atomicReadyAndSwitch (fn () => S.prepVal (rt, msg)) | |
109 | val () = debug' "Chanell.send(3.2.3)" (* NonAtomic *) | |
110 | val () = Assert.assertNonAtomic' "Channel.send(3.2.2)" | |
111 | in | |
112 | () | |
113 | end | |
114 | val () = debug' "Channel.send(4)" (* NonAtomic *) | |
115 | val () = Assert.assertNonAtomic' "Channel.send(4)" | |
116 | in | |
117 | () | |
118 | end | |
119 | ||
120 | fun sendEvt (CHAN {prio, inQ, outQ}, msg) = | |
121 | let | |
122 | fun doitFn () = | |
123 | let | |
124 | val () = Assert.assertAtomic' ("Channel.sendEvt.doitFn", NONE) | |
125 | val (rtxid, rt) = valOf (Q.deque inQ) | |
126 | val () = debug' "Channel.sendEvt(3.1.1)" (* Atomic 1 *) | |
127 | val () = Assert.assertAtomic' ("Channel.sendEvt(3.1.1)", SOME 1) | |
128 | val () = | |
129 | S.readyAndSwitch | |
130 | (fn () => | |
131 | (prio := 1 | |
132 | ; TransID.force rtxid | |
133 | ; S.prepVal (rt, msg))) | |
134 | val () = debug' "Channel.sendEvt(3.1.2)" (* NonAtomic *) | |
135 | val () = Assert.assertNonAtomic' "Channel.sendEvt(3.1.2)" | |
136 | in | |
137 | () | |
138 | end | |
139 | fun blockFn {transId, cleanUp, next} = | |
140 | let | |
141 | val () = Assert.assertAtomic' ("Channel.sendEvt.blockFn", NONE) | |
142 | val () = debug' "Channel.sendEvt(3.2.1)" (* Atomic 1 *) | |
143 | val () = Assert.assertAtomic' ("Channel.sendEvt(3.2.1)", SOME 1) | |
144 | val rt = | |
145 | S.atomicSwitch | |
146 | (fn st => | |
147 | (enqueAndClean (outQ, (transId, st)) | |
148 | ; next ())) | |
149 | val () = debug' "Channel.sendEvt(3.2.2)" (* Atomic 1 *) | |
150 | val () = Assert.assertAtomic' ("Channel.sendEvt(3.2.2)", SOME 1) | |
151 | val () = cleanUp () | |
152 | val () = S.atomicReadyAndSwitch (fn () => S.prepVal (rt, msg)) | |
153 | val () = debug' "Channel.sendEvt(3.2.3)" (* NonAtomic *) | |
154 | val () = Assert.assertNonAtomic' "Channel.sendEvt(3.2.2)" | |
155 | in | |
156 | () | |
157 | end | |
158 | fun pollFn () = | |
159 | let | |
160 | val () = Assert.assertAtomic' ("Channel.sendEvt.pollFn", NONE) | |
161 | val () = debug' "Channel.sendEvt(2)" (* Atomic 1 *) | |
162 | val () = Assert.assertAtomic' ("Channel.sendEvt(2)", SOME 1) | |
163 | in | |
164 | case cleanAndChk (prio, inQ) of | |
165 | 0 => E.blocked blockFn | |
166 | | prio => E.enabled {prio = prio, doitFn = doitFn} | |
167 | end | |
168 | in | |
169 | E.bevt pollFn | |
170 | end | |
171 | ||
172 | fun sendPoll (CHAN {prio, inQ, ...}, msg) = | |
173 | let | |
174 | val () = Assert.assertNonAtomic' "Channel.sendPoll" | |
175 | val () = debug' "Channel.sendPoll(1)" (* NonAtomic *) | |
176 | val () = Assert.assertNonAtomic' "Channel.sendPoll(1)" | |
177 | val () = S.atomicBegin () | |
178 | val () = debug' "Channel.sendPoll(2)" (* Atomic 1 *) | |
179 | val () = Assert.assertAtomic' ("Channel.sendPoll(1)", SOME 1) | |
180 | val b = | |
181 | case cleanAndDeque inQ of | |
182 | SOME (rtxid, rt) => | |
183 | let | |
184 | val () = debug' "Channel.sendPoll(3.1.1)" (* Atomic 1 *) | |
185 | val () = Assert.assertAtomic' ("Channel.sendPoll(3.1.1)", SOME 1) | |
186 | val () = | |
187 | S.readyAndSwitch | |
188 | (fn () => | |
189 | (prio := 1 | |
190 | ; TransID.force rtxid | |
191 | ; S.prepVal (rt, msg))) | |
192 | val b = true | |
193 | val () = debug' "Channel.sendPoll(3.1.2)" (* NonAtomic *) | |
194 | val () = Assert.assertNonAtomic' "Channel.sendPoll(3.1.2)" | |
195 | in | |
196 | b | |
197 | end | |
198 | | NONE => | |
199 | let | |
200 | val () = debug' "Channel.sendPoll(3.2.1)" (* Atomic 1 *) | |
201 | val () = Assert.assertAtomic' ("Channel.sendPoll(3.2.1)", SOME 1) | |
202 | val b = false | |
203 | val () = debug' "Channel.sendPoll(3.2.2)" (* Atomic 1 *) | |
204 | val () = Assert.assertAtomic' ("Channel.sendPoll(3.2.2)", SOME 1) | |
205 | val () = S.atomicEnd () | |
206 | val () = debug' "Channel.sendPoll(3.2.3)" (* NonAtomic *) | |
207 | val () = Assert.assertNonAtomic' "Channel.sendPoll(3.2.2)" | |
208 | in | |
209 | b | |
210 | end | |
211 | val () = debug' "Channel.sendPoll(4)" (* NonAtomic *) | |
212 | val () = Assert.assertNonAtomic' "Channel.sendPoll(4)" | |
213 | in | |
214 | b | |
215 | end | |
216 | ||
217 | fun recv (CHAN {prio, inQ, outQ}) = | |
218 | let | |
219 | val () = Assert.assertNonAtomic' "Channel.recv" | |
220 | val () = debug' "Channel.recv(1)" (* NonAtomic *) | |
221 | val () = Assert.assertNonAtomic' "Channel.recv(1)" | |
222 | val () = S.atomicBegin () | |
223 | val () = debug' "Channel.recv(2)" (* Atomic 1 *) | |
224 | val () = Assert.assertAtomic' ("Channel.recv(2)", SOME 1) | |
225 | val msg = | |
226 | case cleanAndDeque outQ of | |
227 | SOME (stxid, st) => | |
228 | let | |
229 | val () = debug' "Channel.recv(3.1.1)" (* Atomic 1 *) | |
230 | val () = Assert.assertAtomic' ("Channel.recv(3.1.1)", SOME 1) | |
231 | val msg = | |
232 | S.switch | |
233 | (fn rt => | |
234 | (prio := 1 | |
235 | ; TransID.force stxid | |
236 | ; S.prepVal (st, rt))) | |
237 | val () = debug' "Channel.recv(3.1.2)" (* NonAtomic *) | |
238 | val () = Assert.assertNonAtomic' "Channel.recv(3.1.1)" | |
239 | in | |
240 | msg | |
241 | end | |
242 | | NONE => | |
243 | let | |
244 | val () = debug' "Channel.recv(3.2.1)" (* Atomic 1 *) | |
245 | val () = Assert.assertAtomic' ("Channel.recv(3.2.1)", SOME 1) | |
246 | val msg = | |
247 | S.atomicSwitchToNext | |
248 | (fn rt => enqueAndClean (inQ, (TransID.mkTxId (), rt))) | |
249 | val () = debug' "Channel.recv(3.2.2)" (* Atomic 1 *) | |
250 | val () = Assert.assertAtomic' ("Channel.recv(3.2.2)", SOME 1) | |
251 | val () = S.atomicEnd () | |
252 | val () = debug' "Channel.recv(3.2.3)" (* NonAtomic *) | |
253 | val () = Assert.assertNonAtomic' "Channel.recv(3.2.3)" | |
254 | in | |
255 | msg | |
256 | end | |
257 | val () = debug' "Channel.recv(4)" (* NonAtomic *) | |
258 | val () = Assert.assertNonAtomic' "Channel.recv(4)" | |
259 | in | |
260 | msg | |
261 | end | |
262 | ||
263 | fun recvEvt (CHAN {prio, inQ, outQ}) = | |
264 | let | |
265 | fun doitFn () = | |
266 | let | |
267 | val () = Assert.assertAtomic' ("Channel.recvEvt.doitFn", NONE) | |
268 | val (stxid, st) = valOf (Q.deque outQ) | |
269 | val () = debug' "Channel.recvEvt(3.1.1)" (* Atomic 1 *) | |
270 | val () = Assert.assertAtomic' ("Channel.recvEvt(3.1.1)", SOME 1) | |
271 | val msg = | |
272 | S.switch | |
273 | (fn rt => | |
274 | (prio := 1 | |
275 | ; TransID.force stxid | |
276 | ; S.prepVal (st, rt))) | |
277 | val () = debug' "Channel.recvEvt(3.1.2)" (* NonAtomic *) | |
278 | val () = Assert.assertNonAtomic' "Channel.recvEvt(3.1.1)" | |
279 | in | |
280 | msg | |
281 | end | |
282 | fun blockFn {transId, cleanUp, next} = | |
283 | let | |
284 | val () = Assert.assertAtomic' ("Channel.recvEvt.blockFn", NONE) | |
285 | val () = debug' "Channel.recvEvt(3.2.1)" (* Atomic 1 *) | |
286 | val () = Assert.assertAtomic' ("Channel.recvEvt(3.2.1)", SOME 1) | |
287 | val msg = | |
288 | S.atomicSwitch | |
289 | (fn rt => | |
290 | (enqueAndClean (inQ, (transId, rt)) | |
291 | ; next ())) | |
292 | val () = debug' "Channel.recvEvt(3.2.2)" (* Atomic 1 *) | |
293 | val () = Assert.assertAtomic' ("Channel.recvEvt(3.2.2)", SOME 1) | |
294 | val () = cleanUp () | |
295 | val () = S.atomicEnd () | |
296 | val () = debug' "Channel.recvEvt(3.2.3)" (* NonAtomic *) | |
297 | val () = Assert.assertNonAtomic' "Channel.recvEvt(3.2.3)" | |
298 | in | |
299 | msg | |
300 | end | |
301 | fun pollFn () = | |
302 | let | |
303 | val () = Assert.assertAtomic' ("Channel.recvEvt.pollFn", NONE) | |
304 | val () = debug' "Channel.recvEvt(2)" (* Atomic 1 *) | |
305 | val () = Assert.assertAtomic' ("Channel.recvEvt(2)", SOME 1) | |
306 | in | |
307 | case cleanAndChk (prio, outQ) of | |
308 | 0 => E.blocked blockFn | |
309 | | prio => E.enabled {prio = prio, doitFn = doitFn} | |
310 | end | |
311 | in | |
312 | E.bevt pollFn | |
313 | end | |
314 | ||
315 | fun recvPoll (CHAN {prio, outQ, ...}) = | |
316 | let | |
317 | val () = Assert.assertNonAtomic' "Channel.recvPoll" | |
318 | val () = debug' "Channel.recvPoll(1)" (* NonAtomic *) | |
319 | val () = Assert.assertNonAtomic' "Channel.recvPoll(1)" | |
320 | val () = S.atomicBegin () | |
321 | val () = debug' "Channel.recvPoll(2)" (* Atomic 1 *) | |
322 | val () = Assert.assertAtomic' ("Channel.recvPoll(2)", SOME 1) | |
323 | val msg = | |
324 | case cleanAndDeque outQ of | |
325 | SOME (stxid, st) => | |
326 | let | |
327 | val () = debug' "Channel.recvPoll(3.1.1)" (* Atomic 1 *) | |
328 | val () = Assert.assertAtomic' ("Channel.recvPoll(3.1.1)", SOME 1) | |
329 | val msg = | |
330 | S.switch | |
331 | (fn rt => | |
332 | (prio := 1 | |
333 | ; TransID.force stxid | |
334 | ; S.prepVal (st, rt))) | |
335 | val msg = SOME msg | |
336 | val () = debug' "Channel.recvPoll(3.1.2)" (* NonAtomic *) | |
337 | val () = Assert.assertNonAtomic' "Channel.recvPoll(3.1.1)" | |
338 | in | |
339 | msg | |
340 | end | |
341 | | NONE => | |
342 | let | |
343 | val () = debug' "Channel.recv(3.2.1)" (* Atomic 1 *) | |
344 | val () = Assert.assertAtomic' ("Channel.recv(3.2.1)", SOME 1) | |
345 | val msg = NONE | |
346 | val () = debug' "Channel.recvPoll(3.2.2)" (* Atomic 1 *) | |
347 | val () = Assert.assertAtomic' ("Channel.recvPoll(3.2.2)", SOME 1) | |
348 | val () = S.atomicEnd () | |
349 | val () = debug' "Channel.recvPoll(3.2.3)" (* NonAtomic *) | |
350 | val () = Assert.assertNonAtomic' "Channel.recvPoll(3.2.3)" | |
351 | in | |
352 | msg | |
353 | end | |
354 | val () = debug' "Channel.recvPoll(4)" (* NonAtomic *) | |
355 | val () = Assert.assertNonAtomic' "Channel.recvPoll(4)" | |
356 | in | |
357 | msg | |
358 | end | |
359 | end |