(* Provenance (repository blame view):
 *   hcoop/debian/mlton.git / lib/cml/core-cml/channel.sml
 *   commit 7f918cf1 (CE): "Import Upstream version 20180207"
 *)
(* channel.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* channel.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *
 * The representation of synchronous channels.
 *
 * To ensure that we always leave the atomic region exactly once, we
 * require that the blocking operation be responsible for leaving the
 * atomic region (in the event case, it must also execute the clean-up
 * action).  The doitFn always transfers control to the blocked thread
 * without leaving the atomic region.  Note that the send (and sendEvt)
 * blockFns run using the receiver's thread ID.
 *)

structure Channel : CHANNEL_EXTRA =
   struct
      structure Assert = LocalAssert(val assert = false)
      structure Debug = LocalDebug(val debug = false)

      structure Q = ImpQueue
      structure S = Scheduler
      structure E = Event

      (* Debug tracing: the message thunk is handed to Debug.sayDebug
       * together with S.atomicMsg and S.tidMsg.
       *)
      fun debug msg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], msg)
      (* Same as debug, for a message that is already a string. *)
      fun debug' msg = debug (fn () => msg)

      datatype trans_id = datatype TransID.trans_id
      datatype trans_id_state = datatype TransID.trans_id_state

      (* A channel consists of
       *   prio : a priority counter; created as 1, reset to 1 on every
       *          completed transfer, and bumped by cleanAndChk when an
       *          event poll finds a waiting partner;
       *   inQ  : waiting receivers, each a transaction id paired with a
       *          thread expecting a message;
       *   outQ : waiting senders, each a transaction id paired with a
       *          thread expecting the receiver's thread.
       *)
      datatype 'a chan =
         CHAN of {prio : int ref,
                  inQ  : (trans_id * 'a S.thread) Q.t,
                  outQ : (trans_id * 'a S.thread S.thread) Q.t}

      (*
      fun resetChan (CHAN {prio, inQ, outQ}) =
         (prio := 1
          ; Q.reset inQ
          ; Q.reset outQ)
      *)

      (* channel : unit -> 'a chan
       * A fresh channel with priority 1 and empty queues.
       *)
      fun channel () = CHAN {prio = ref 1, inQ = Q.new (), outQ = Q.new ()}

      (* sameChannel : ('a chan * 'a chan) -> bool
       * Channels are identified by their prio ref cell (ref equality).
       *)
      fun sameChannel (CHAN {prio = prio1, ...}, CHAN {prio = prio2, ...}) =
         prio1 = prio2

      (* bump a priority value by one, returning the old value *)
      fun bumpPriority (p as ref n) = (p := n+1; n)

      (* functions to clean channel input and output queues *)
      local
         (* A queue entry is removable when its transaction id is CANCEL. *)
         fun cleaner (TXID txst, _) =
            case !txst of CANCEL => true | _ => false
      in
         (* Clean q; return 0 if it is then empty, otherwise bump prio and
          * return its previous value.
          *)
         fun cleanAndChk (prio, q) : int =
            (Q.clean (q, cleaner)
             ; if Q.empty q
                  then 0
                  else bumpPriority prio)
         fun cleanAndDeque q =
            Q.cleanAndDeque (q, cleaner)
         fun enqueAndClean (q, item) =
            Q.enqueAndClean (q, item, cleaner)
      end

      (* Hand msg to a dequeued receiver (rtxid, rt): reset the channel
       * priority to 1, force the receiver's transaction id, and pass
       * rt prepared with msg to S.readyAndSwitch.
       * Shared by send, sendEvt (doitFn) and sendPoll.
       *)
      fun deliverTo (prio, rtxid, rt, msg) =
         S.readyAndSwitch
         (fn () =>
          (prio := 1
           ; TransID.force rtxid
           ; S.prepVal (rt, msg)))

      (* Take a message from a dequeued sender (stxid, st): reset the
       * channel priority to 1, force the sender's transaction id, and
       * S.switch to st prepared with the current (receiving) thread.
       * The result of the switch is the message.
       * Shared by recv, recvEvt (doitFn) and recvPoll.
       *)
      fun acceptFrom (prio, stxid, st) =
         S.switch
         (fn rt =>
          (prio := 1
           ; TransID.force stxid
           ; S.prepVal (st, rt)))

      (* send : ('a chan * 'a) -> unit
       * If a non-cancelled receiver is waiting in inQ, hand msg to it.
       * Otherwise enqueue the current thread on outQ and switch away; when
       * resumed with a receiver's thread rt, deliver msg to rt.
       * Must be called outside the atomic region (asserted).
       *)
      fun send (CHAN {prio, inQ, outQ}, msg) =
         let
            val () = Assert.assertNonAtomic' "Channel.send"
            val () = debug' "Channel.send(1)" (* NonAtomic *)
            val () = Assert.assertNonAtomic' "Channel.send(1)"
            val () = S.atomicBegin ()
            val () = debug' "Channel.send(2)" (* Atomic 1 *)
            val () = Assert.assertAtomic' ("Channel.send(2)", SOME 1)
            val () =
               case cleanAndDeque inQ of
                  SOME (rtxid, rt) =>
                     let
                        val () = debug' "Channel.send(3.1.1)" (* Atomic 1 *)
                        val () = Assert.assertAtomic' ("Channel.send(3.1.1)", SOME 1)
                        val () = deliverTo (prio, rtxid, rt, msg)
                        val () = debug' "Channel.send(3.1.2)" (* NonAtomic *)
                        val () = Assert.assertNonAtomic' "Channel.send(3.1.2)"
                     in
                        ()
                     end
                | NONE =>
                     let
                        val () = debug' "Channel.send(3.2.1)" (* Atomic 1 *)
                        val () = Assert.assertAtomic' ("Channel.send(3.2.1)", SOME 1)
                        (* Block: park this thread on outQ under a fresh
                         * transaction id; resumed with the receiver's thread.
                         *)
                        val rt =
                           S.atomicSwitchToNext
                           (fn st => Q.enque (outQ, (TransID.mkTxId (), st)))
                        val () = debug' "Channel.send(3.2.2)" (* Atomic 1 *)
                        val () = Assert.assertAtomic' ("Channel.send(3.2.2)", SOME 1)
                        val () = S.atomicReadyAndSwitch (fn () => S.prepVal (rt, msg))
                        val () = debug' "Channel.send(3.2.3)" (* NonAtomic *)
                        val () = Assert.assertNonAtomic' "Channel.send(3.2.3)"
                     in
                        ()
                     end
            val () = debug' "Channel.send(4)" (* NonAtomic *)
            val () = Assert.assertNonAtomic' "Channel.send(4)"
         in
            ()
         end

      (* sendEvt : ('a chan * 'a) -> unit event
       * Event form of send, built with E.bevt from three callbacks:
       *   pollFn  - cleans inQ; reports E.blocked if no receiver is waiting,
       *             otherwise E.enabled with the bumped priority;
       *   doitFn  - dequeues the waiting receiver and hands msg to it;
       *   blockFn - parks the current thread on outQ under the supplied
       *             transId, calls next (), and on resumption runs cleanUp
       *             before delivering msg to the receiver's thread.
       *)
      fun sendEvt (CHAN {prio, inQ, outQ}, msg) =
         let
            fun doitFn () =
               let
                  val () = Assert.assertAtomic' ("Channel.sendEvt.doitFn", NONE)
                  (* valOf raises Option on an empty queue; presumably doitFn
                   * is only run after pollFn reported a waiting receiver.
                   *)
                  val (rtxid, rt) = valOf (Q.deque inQ)
                  val () = debug' "Channel.sendEvt(3.1.1)" (* Atomic 1 *)
                  val () = Assert.assertAtomic' ("Channel.sendEvt(3.1.1)", SOME 1)
                  val () = deliverTo (prio, rtxid, rt, msg)
                  val () = debug' "Channel.sendEvt(3.1.2)" (* NonAtomic *)
                  val () = Assert.assertNonAtomic' "Channel.sendEvt(3.1.2)"
               in
                  ()
               end
            fun blockFn {transId, cleanUp, next} =
               let
                  val () = Assert.assertAtomic' ("Channel.sendEvt.blockFn", NONE)
                  val () = debug' "Channel.sendEvt(3.2.1)" (* Atomic 1 *)
                  val () = Assert.assertAtomic' ("Channel.sendEvt(3.2.1)", SOME 1)
                  val rt =
                     S.atomicSwitch
                     (fn st =>
                      (enqueAndClean (outQ, (transId, st))
                       ; next ()))
                  val () = debug' "Channel.sendEvt(3.2.2)" (* Atomic 1 *)
                  val () = Assert.assertAtomic' ("Channel.sendEvt(3.2.2)", SOME 1)
                  val () = cleanUp ()
                  val () = S.atomicReadyAndSwitch (fn () => S.prepVal (rt, msg))
                  val () = debug' "Channel.sendEvt(3.2.3)" (* NonAtomic *)
                  val () = Assert.assertNonAtomic' "Channel.sendEvt(3.2.3)"
               in
                  ()
               end
            fun pollFn () =
               let
                  val () = Assert.assertAtomic' ("Channel.sendEvt.pollFn", NONE)
                  val () = debug' "Channel.sendEvt(2)" (* Atomic 1 *)
                  val () = Assert.assertAtomic' ("Channel.sendEvt(2)", SOME 1)
               in
                  case cleanAndChk (prio, inQ) of
                     0 => E.blocked blockFn
                   | p => E.enabled {prio = p, doitFn = doitFn}
               end
         in
            E.bevt pollFn
         end

      (* sendPoll : ('a chan * 'a) -> bool
       * Non-blocking send: hands msg to a waiting receiver and returns true,
       * or returns false (after leaving the atomic region) if inQ holds no
       * non-cancelled receiver.
       *)
      fun sendPoll (CHAN {prio, inQ, ...}, msg) =
         let
            val () = Assert.assertNonAtomic' "Channel.sendPoll"
            val () = debug' "Channel.sendPoll(1)" (* NonAtomic *)
            val () = Assert.assertNonAtomic' "Channel.sendPoll(1)"
            val () = S.atomicBegin ()
            val () = debug' "Channel.sendPoll(2)" (* Atomic 1 *)
            val () = Assert.assertAtomic' ("Channel.sendPoll(2)", SOME 1)
            val b =
               case cleanAndDeque inQ of
                  SOME (rtxid, rt) =>
                     let
                        val () = debug' "Channel.sendPoll(3.1.1)" (* Atomic 1 *)
                        val () = Assert.assertAtomic' ("Channel.sendPoll(3.1.1)", SOME 1)
                        val () = deliverTo (prio, rtxid, rt, msg)
                        val () = debug' "Channel.sendPoll(3.1.2)" (* NonAtomic *)
                        val () = Assert.assertNonAtomic' "Channel.sendPoll(3.1.2)"
                     in
                        true
                     end
                | NONE =>
                     let
                        val () = debug' "Channel.sendPoll(3.2.1)" (* Atomic 1 *)
                        val () = Assert.assertAtomic' ("Channel.sendPoll(3.2.1)", SOME 1)
                        val () = debug' "Channel.sendPoll(3.2.2)" (* Atomic 1 *)
                        val () = Assert.assertAtomic' ("Channel.sendPoll(3.2.2)", SOME 1)
                        val () = S.atomicEnd ()
                        val () = debug' "Channel.sendPoll(3.2.3)" (* NonAtomic *)
                        val () = Assert.assertNonAtomic' "Channel.sendPoll(3.2.3)"
                     in
                        false
                     end
            val () = debug' "Channel.sendPoll(4)" (* NonAtomic *)
            val () = Assert.assertNonAtomic' "Channel.sendPoll(4)"
         in
            b
         end

      (* recv : 'a chan -> 'a
       * If a non-cancelled sender is waiting in outQ, switch to it, passing
       * the current thread, and return the message it delivers.  Otherwise
       * park the current thread on inQ and switch away; when resumed with
       * a message, leave the atomic region and return it.
       * Must be called outside the atomic region (asserted).
       *)
      fun recv (CHAN {prio, inQ, outQ}) =
         let
            val () = Assert.assertNonAtomic' "Channel.recv"
            val () = debug' "Channel.recv(1)" (* NonAtomic *)
            val () = Assert.assertNonAtomic' "Channel.recv(1)"
            val () = S.atomicBegin ()
            val () = debug' "Channel.recv(2)" (* Atomic 1 *)
            val () = Assert.assertAtomic' ("Channel.recv(2)", SOME 1)
            val msg =
               case cleanAndDeque outQ of
                  SOME (stxid, st) =>
                     let
                        val () = debug' "Channel.recv(3.1.1)" (* Atomic 1 *)
                        val () = Assert.assertAtomic' ("Channel.recv(3.1.1)", SOME 1)
                        val msg = acceptFrom (prio, stxid, st)
                        val () = debug' "Channel.recv(3.1.2)" (* NonAtomic *)
                        val () = Assert.assertNonAtomic' "Channel.recv(3.1.2)"
                     in
                        msg
                     end
                | NONE =>
                     let
                        val () = debug' "Channel.recv(3.2.1)" (* Atomic 1 *)
                        val () = Assert.assertAtomic' ("Channel.recv(3.2.1)", SOME 1)
                        val msg =
                           S.atomicSwitchToNext
                           (fn rt => enqueAndClean (inQ, (TransID.mkTxId (), rt)))
                        val () = debug' "Channel.recv(3.2.2)" (* Atomic 1 *)
                        val () = Assert.assertAtomic' ("Channel.recv(3.2.2)", SOME 1)
                        val () = S.atomicEnd ()
                        val () = debug' "Channel.recv(3.2.3)" (* NonAtomic *)
                        val () = Assert.assertNonAtomic' "Channel.recv(3.2.3)"
                     in
                        msg
                     end
            val () = debug' "Channel.recv(4)" (* NonAtomic *)
            val () = Assert.assertNonAtomic' "Channel.recv(4)"
         in
            msg
         end

      (* recvEvt : 'a chan -> 'a event
       * Event form of recv, built with E.bevt from three callbacks:
       *   pollFn  - cleans outQ; reports E.blocked if no sender is waiting,
       *             otherwise E.enabled with the bumped priority;
       *   doitFn  - dequeues the waiting sender and takes its message;
       *   blockFn - parks the current thread on inQ under the supplied
       *             transId, calls next (), and on resumption runs cleanUp
       *             and leaves the atomic region before returning the
       *             message.
       *)
      fun recvEvt (CHAN {prio, inQ, outQ}) =
         let
            fun doitFn () =
               let
                  val () = Assert.assertAtomic' ("Channel.recvEvt.doitFn", NONE)
                  (* valOf raises Option on an empty queue; presumably doitFn
                   * is only run after pollFn reported a waiting sender.
                   *)
                  val (stxid, st) = valOf (Q.deque outQ)
                  val () = debug' "Channel.recvEvt(3.1.1)" (* Atomic 1 *)
                  val () = Assert.assertAtomic' ("Channel.recvEvt(3.1.1)", SOME 1)
                  val msg = acceptFrom (prio, stxid, st)
                  val () = debug' "Channel.recvEvt(3.1.2)" (* NonAtomic *)
                  val () = Assert.assertNonAtomic' "Channel.recvEvt(3.1.2)"
               in
                  msg
               end
            fun blockFn {transId, cleanUp, next} =
               let
                  val () = Assert.assertAtomic' ("Channel.recvEvt.blockFn", NONE)
                  val () = debug' "Channel.recvEvt(3.2.1)" (* Atomic 1 *)
                  val () = Assert.assertAtomic' ("Channel.recvEvt(3.2.1)", SOME 1)
                  val msg =
                     S.atomicSwitch
                     (fn rt =>
                      (enqueAndClean (inQ, (transId, rt))
                       ; next ()))
                  val () = debug' "Channel.recvEvt(3.2.2)" (* Atomic 1 *)
                  val () = Assert.assertAtomic' ("Channel.recvEvt(3.2.2)", SOME 1)
                  val () = cleanUp ()
                  val () = S.atomicEnd ()
                  val () = debug' "Channel.recvEvt(3.2.3)" (* NonAtomic *)
                  val () = Assert.assertNonAtomic' "Channel.recvEvt(3.2.3)"
               in
                  msg
               end
            fun pollFn () =
               let
                  val () = Assert.assertAtomic' ("Channel.recvEvt.pollFn", NONE)
                  val () = debug' "Channel.recvEvt(2)" (* Atomic 1 *)
                  val () = Assert.assertAtomic' ("Channel.recvEvt(2)", SOME 1)
               in
                  case cleanAndChk (prio, outQ) of
                     0 => E.blocked blockFn
                   | p => E.enabled {prio = p, doitFn = doitFn}
               end
         in
            E.bevt pollFn
         end

      (* recvPoll : 'a chan -> 'a option
       * Non-blocking recv: SOME msg if a non-cancelled sender was waiting in
       * outQ, otherwise NONE (after leaving the atomic region).
       *)
      fun recvPoll (CHAN {prio, outQ, ...}) =
         let
            val () = Assert.assertNonAtomic' "Channel.recvPoll"
            val () = debug' "Channel.recvPoll(1)" (* NonAtomic *)
            val () = Assert.assertNonAtomic' "Channel.recvPoll(1)"
            val () = S.atomicBegin ()
            val () = debug' "Channel.recvPoll(2)" (* Atomic 1 *)
            val () = Assert.assertAtomic' ("Channel.recvPoll(2)", SOME 1)
            val msg =
               case cleanAndDeque outQ of
                  SOME (stxid, st) =>
                     let
                        val () = debug' "Channel.recvPoll(3.1.1)" (* Atomic 1 *)
                        val () = Assert.assertAtomic' ("Channel.recvPoll(3.1.1)", SOME 1)
                        val msg = SOME (acceptFrom (prio, stxid, st))
                        val () = debug' "Channel.recvPoll(3.1.2)" (* NonAtomic *)
                        val () = Assert.assertNonAtomic' "Channel.recvPoll(3.1.2)"
                     in
                        msg
                     end
                | NONE =>
                     let
                        val () = debug' "Channel.recvPoll(3.2.1)" (* Atomic 1 *)
                        val () = Assert.assertAtomic' ("Channel.recvPoll(3.2.1)", SOME 1)
                        val () = debug' "Channel.recvPoll(3.2.2)" (* Atomic 1 *)
                        val () = Assert.assertAtomic' ("Channel.recvPoll(3.2.2)", SOME 1)
                        val () = S.atomicEnd ()
                        val () = debug' "Channel.recvPoll(3.2.3)" (* NonAtomic *)
                        val () = Assert.assertNonAtomic' "Channel.recvPoll(3.2.3)"
                     in
                        NONE
                     end
            val () = debug' "Channel.recvPoll(4)" (* NonAtomic *)
            val () = Assert.assertNonAtomic' "Channel.recvPoll(4)"
         in
            msg
         end
   end