Import Upstream version 20180207
[hcoop/debian/mlton.git] / lib / cml / core-cml / mailbox.sml
1 (* mailbox.sml
2 * 2004 Matthew Fluet (mfluet@acm.org)
3 * Ported to MLton threads.
4 *)
5
6 (* mailbox.sml
7 *
8 * COPYRIGHT (c) 1995 AT&T Bell Laboratories
9 * COPYRIGHT (c) 1989-1991 John H. Reppy
10 *
11 * Asynchronous channels (called mailboxes).
12 *)
13
14 structure Mailbox : MAILBOX_EXTRA =
15 struct
16 structure Assert = LocalAssert(val assert = false)
17 structure Debug = LocalDebug(val debug = false)
18
19 structure Q = FunQueue
20 structure S = Scheduler
21 structure E = Event
22 fun debug msg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], msg)
23 fun debug' msg = debug (fn () => msg)
24
25 datatype trans_id = datatype TransID.trans_id
26 datatype trans_id_state = datatype TransID.trans_id_state
27
28
29 (* the state of a mailbox. The queue of the NONEMPTY constructor should
30 * never be empty (use EMPTY instead).
31 *)
32 datatype 'a state =
33 EMPTY of (TransID.trans_id * 'a S.thread) Q.t
34 | NONEMPTY of (int * 'a Q.t)
35
36 datatype 'a mbox = MB of 'a state ref
37
38 (*
39 fun resetMbox (MB state) = state := EMPTY (Q.new ())
40 *)
41
42 fun mailbox () = MB (ref (EMPTY (Q.new ())))
43
44 fun sameMailbox (MB s1, MB s2) = (s1 = s2)
45
46 local
47 fun cleaner (TXID txst, _) =
48 case !txst of CANCEL => true | _ => false
49 in
50 fun cleanAndDeque q =
51 Q.cleanAndDeque (q, cleaner)
52 end
53
54 fun send (MB state, x) =
55 let
56 val () = Assert.assertNonAtomic' "Mailbox.send"
57 val () = debug' "Mailbox.send(1)" (* NonAtomic *)
58 val () = Assert.assertNonAtomic' "Mailbox.send(1)"
59 val () = S.atomicBegin ()
60 val () = debug' "Mailbox.send(2)" (* Atomic 1 *)
61 val () = Assert.assertAtomic' ("Mailbox.send(2)", SOME 1)
62 val () =
63 case !state of
64 EMPTY q =>
65 let
66 val () = debug' "Mailbox.send(3.1.1)" (* Atomic 1 *)
67 val () = Assert.assertAtomic' ("Mailbox.send(3.1.1)", SOME 1)
68 in
69 case (cleanAndDeque q) of
70 (NONE, _) =>
71 (let val q = Q.new ()
72 in state := NONEMPTY (1, Q.enque (q, x))
73 end
74 ; S.atomicEnd())
75 | (SOME (transId', t'), q') =>
76 S.readyAndSwitch
77 (fn () =>
78 (state := EMPTY q'
79 ; TransID.force transId'
80 ; S.prepVal (t', x)))
81 end
82 | NONEMPTY (p, q) =>
83 (* we force a context switch here to prevent
84 * a producer from outrunning a consumer.
85 *)
86 S.atomicReadyAndSwitchToNext
87 (fn () => state := NONEMPTY (p, Q.enque (q, x)))
88 val () = debug' "Mailbox.send(4)" (* NonAtomic *)
89 val () = Assert.assertNonAtomic' "Channel.send(4)"
90 in
91 ()
92 end
93
94 fun getMsg (state, q) =
95 let
96 val (msg, q') =
97 case Q.deque q of
98 SOME (msg, q') => (msg, q')
99 | NONE => raise Fail "Mailbox:getMsg"
100 val () = if Q.empty q'
101 then state := EMPTY (Q.new ())
102 else state := NONEMPTY (1, q')
103 val () = S.atomicEnd ()
104 in
105 msg
106 end
107
108 fun recv (MB state) =
109 let
110 val () = Assert.assertNonAtomic' "Mailbox.recv"
111 val () = debug' "Mailbox.recv(1)" (* NonAtomic *)
112 val () = Assert.assertNonAtomic' "Mailbox.recv(1)"
113 val () = S.atomicBegin ()
114 val () = debug' "Mailbox.recv(2)" (* Atomic 1 *)
115 val () = Assert.assertAtomic' ("Mailbox.recv(2)", SOME 1)
116 val msg =
117 case !state of
118 EMPTY q =>
119 let
120 val msg =
121 S.atomicSwitchToNext
122 (fn t => state := EMPTY (Q.enque (q, (TransID.mkTxId (), t))))
123 in
124 S.atomicEnd()
125 ; msg
126 end
127 | NONEMPTY (_, q) => getMsg (state, q)
128 val () = debug' "Mailbox.recv(4)" (* NonAtomic *)
129 val () = Assert.assertNonAtomic' "Channel.recv(4)"
130 in
131 msg
132 end
133
134 fun recvEvt (MB state) =
135 let
136 fun blockFn {transId, cleanUp: unit -> unit, next} =
137 let
138 val q =
139 case !state of
140 EMPTY q => q
141 | _ => raise Fail "Mailbox:recvEvt:blockFn"
142 val msg =
143 S.atomicSwitch
144 (fn t => (state := EMPTY (Q.enque (q, (transId, t)))
145 ; next ()))
146 in
147 cleanUp()
148 ; S.atomicEnd()
149 ; msg
150 end
151 fun pollFn () =
152 case !state of
153 EMPTY _ => E.blocked blockFn
154 | NONEMPTY (prio, q) =>
155 (state := NONEMPTY (prio + 1, q)
156 ; E.enabled {prio = prio,
157 doitFn = fn () => getMsg (state, q)})
158 in
159 E.bevt pollFn
160 end
161
162 fun recvPoll (MB state) =
163 (S.atomicBegin()
164 ; case !state of
165 EMPTY _ => (S.atomicEnd(); NONE)
166 | NONEMPTY (_, q) => SOME (getMsg (state, q)))
167 end