| 1 | (* channel.sig |
| 2 | * 2004 Matthew Fluet (mfluet@acm.org) |
| 3 | * Ported to MLton threads. |
| 4 | *) |
| 5 | |
| 6 | (* channel.sml |
| 7 | * |
| 8 | * COPYRIGHT (c) 1995 AT&T Bell Laboratories. |
| 9 | * COPYRIGHT (c) 1989-1991 John H. Reppy |
| 10 | * |
| 11 | * The representation of synchronous channels. |
| 12 | * |
| 13 | * To ensure that we always leave the atomic region exactly once, we |
| 14 | * require that the blocking operation be responsible for leaving the |
| 15 | * atomic region (in the event case, it must also execute the clean-up |
| 16 | * action). The doitFn always transfers control to the blocked thread |
| 17 | * without leaving the atomic region. Note that the send (and sendEvt) |
| 18 | * blockFns run using the receiver's thread ID. |
| 19 | *) |
| 20 | |
| 21 | structure Channel : CHANNEL_EXTRA = |
| 22 | struct |
(* Module-local assertion and tracing helpers, instantiated with
 * assert = false and debug = false (presumably disabling both -- confirm
 * against LocalAssert / LocalDebug).
 *)
structure Assert = LocalAssert(val assert = false)
structure Debug = LocalDebug(val debug = false)

(* Short aliases for the structures this module relies on. *)
structure Q = ImpQueue
structure S = Scheduler
structure E = Event

(* debug : forward a delayed message (a unit -> message thunk) to
 * Debug.sayDebug together with the scheduler's atomicMsg and tidMsg.
 *)
fun debug mkMsg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], mkMsg)

(* debug' : as debug, but for a message that is already a plain value. *)
fun debug' text = debug (fn () => text)

(* Replicate the transaction-id datatypes so that their constructors
 * (TXID, CANCEL, ...) can be used unqualified in this structure.
 *)
datatype trans_id = datatype TransID.trans_id
datatype trans_id_state = datatype TransID.trans_id_state
| 35 | |
(* Representation of a synchronous channel carrying values of type 'a.
 *
 *   prio : counter created as 1 by channel, reset to 1 whenever a
 *          rendezvous happens, and bumped by cleanAndChk when a poll
 *          finds a non-empty queue; the pre-bump value is passed to
 *          E.enabled as the event priority.
 *   inQ  : waiting receivers, each a transaction id paired with the
 *          receiver's thread (which is resumed with the message).
 *   outQ : waiting senders, each a transaction id paired with the
 *          sender's thread (which is resumed with the receiver's thread).
 *)
datatype 'a chan =
   CHAN of
      {prio : int ref,
       inQ  : (trans_id * 'a S.thread) Q.t,
       outQ : (trans_id * 'a S.thread S.thread) Q.t}
| 40 | |
| 41 | (* |
| 42 | fun resetChan (CHAN {prio, inQ, outQ}) = |
| 43 | (prio := 1 |
| 44 | ; Q.reset inQ |
| 45 | ; Q.reset outQ) |
| 46 | *) |
| 47 | |
(* channel : create a new channel whose priority counter starts at 1 and
 * whose receiver (inQ) and sender (outQ) queues are freshly created.
 *)
fun channel () =
   let
      val prio = ref 1
      val inQ = Q.new ()
      val outQ = Q.new ()
   in
      CHAN {prio = prio, inQ = inQ, outQ = outQ}
   end
| 49 | |
(* sameChannel : ('a chan * 'a chan) -> bool
 * Two channel values denote the same channel exactly when they share the
 * same priority cell; equality on refs is identity, so the comparison does
 * not look at the stored counter.
 *)
fun sameChannel (CHAN left, CHAN right) =
   #prio left = #prio right
| 53 | |
| 54 | |
(* bumpPriority : int ref -> int
 * Increment the stored priority by one and return the value it held
 * before the increment.
 *)
fun bumpPriority p =
   let
      val old = !p
   in
      p := old + 1
      ; old
   end
| 57 | |
(* Helpers that drop cancelled entries from a channel's input and output
 * queues.
 *)
local
   (* A queue entry is stale when its transaction id is in state CANCEL. *)
   fun cleaner (TXID (ref CANCEL), _) = true
     | cleaner (TXID _, _) = false
in
   (* cleanAndChk : remove cancelled entries from q.  Returns 0 when q is
    * then empty; otherwise bumps prio and returns its previous value.
    *)
   fun cleanAndChk (prio, q) : int =
      let
         val () = Q.clean (q, cleaner)
      in
         if Q.empty q
            then 0
            else bumpPriority prio
      end

   (* cleanAndDeque : Q.cleanAndDeque specialised to the cancelled-entry
    * predicate.
    *)
   fun cleanAndDeque q = Q.cleanAndDeque (q, cleaner)

   (* enqueAndClean : Q.enqueAndClean specialised to the cancelled-entry
    * predicate.
    *)
   fun enqueAndClean (q, item) = Q.enqueAndClean (q, item, cleaner)
end
| 73 | |
(* send : synchronously send msg on the channel; returns unit.
 *
 * Must be called outside any atomic region (asserted on entry).  The
 * atomic region entered here is left by the switch operation that ends
 * each branch (asserted non-atomic afterwards).
 *
 *   - A receiver is waiting in inQ (cancelled entries are skipped): reset
 *     the channel priority to 1, force the receiver's transaction id and
 *     prepare the receiver's thread with msg.
 *   - No receiver is waiting: park this thread in outQ and switch to the
 *     next thread.  The value this thread is resumed with is the thread of
 *     the receiver that picked it up; msg is then delivered to it.
 *
 * NOTE(review): the blocking branch enqueues with plain Q.enque, whereas
 * recv uses enqueAndClean -- confirm the asymmetry is intended.
 *)
fun send (CHAN {prio, inQ, outQ}, msg) =
   let
      val () = Assert.assertNonAtomic' "Channel.send"
      val () = debug' "Channel.send(1)" (* NonAtomic *)
      val () = Assert.assertNonAtomic' "Channel.send(1)"
      val () = S.atomicBegin ()
      val () = debug' "Channel.send(2)" (* Atomic 1 *)
      val () = Assert.assertAtomic' ("Channel.send(2)", SOME 1)
      val () =
         case cleanAndDeque inQ of
            SOME (rtxid, rt) =>
               let
                  val () = debug' "Channel.send(3.1.1)" (* Atomic 1 *)
                  val () = Assert.assertAtomic' ("Channel.send(3.1.1)", SOME 1)
                  (* Hand msg to the waiting receiver's thread. *)
                  val () =
                     S.readyAndSwitch
                     (fn () =>
                      (prio := 1
                       ; TransID.force rtxid
                       ; S.prepVal (rt, msg)))
                  val () = debug' "Channel.send(3.1.2)" (* NonAtomic *)
                  val () = Assert.assertNonAtomic' "Channel.send(3.1.2)"
               in
                  ()
               end
          | NONE =>
               let
                  val () = debug' "Channel.send(3.2.1)" (* Atomic 1 *)
                  val () = Assert.assertAtomic' ("Channel.send(3.2.1)", SOME 1)
                  (* Block: queue this thread as a sender; it is resumed
                   * with the receiver's thread.
                   *)
                  val rt =
                     S.atomicSwitchToNext
                     (fn st => Q.enque (outQ, (TransID.mkTxId (), st)))
                  val () = debug' "Channel.send(3.2.2)" (* Atomic 1 *)
                  val () = Assert.assertAtomic' ("Channel.send(3.2.2)", SOME 1)
                  (* Still atomic here; deliver msg to the receiver. *)
                  val () = S.atomicReadyAndSwitch (fn () => S.prepVal (rt, msg))
                  val () = debug' "Channel.send(3.2.3)" (* NonAtomic *)
                  val () = Assert.assertNonAtomic' "Channel.send(3.2.3)"
               in
                  ()
               end
      val () = debug' "Channel.send(4)" (* NonAtomic *)
      val () = Assert.assertNonAtomic' "Channel.send(4)"
   in
      ()
   end
| 119 | |
(* sendEvt : build the event value (via E.bevt) for sending msg on the
 * channel.
 *
 *   pollFn  : runs atomically; drops cancelled receivers from inQ and
 *             reports the event as enabled (with the channel's pre-bump
 *             priority) when a receiver is waiting, otherwise as blocked.
 *   doitFn  : runs atomically once the event is enabled; takes the first
 *             receiver from inQ and hands it msg.
 *   blockFn : runs atomically; parks this thread in outQ under the event's
 *             transaction id, and once resumed with the receiver's thread
 *             runs the clean-up action and delivers msg.
 *)
fun sendEvt (CHAN {prio, inQ, outQ}, msg) =
   let
      fun doitFn () =
         let
            val () = Assert.assertAtomic' ("Channel.sendEvt.doitFn", NONE)
            (* valOf relies on pollFn having just found inQ non-empty. *)
            val (rtxid, rt) = valOf (Q.deque inQ)
            val () = debug' "Channel.sendEvt(3.1.1)" (* Atomic 1 *)
            val () = Assert.assertAtomic' ("Channel.sendEvt(3.1.1)", SOME 1)
            val () =
               S.readyAndSwitch
               (fn () =>
                (prio := 1
                 ; TransID.force rtxid
                 ; S.prepVal (rt, msg)))
            val () = debug' "Channel.sendEvt(3.1.2)" (* NonAtomic *)
            val () = Assert.assertNonAtomic' "Channel.sendEvt(3.1.2)"
         in
            ()
         end
      fun blockFn {transId, cleanUp, next} =
         let
            val () = Assert.assertAtomic' ("Channel.sendEvt.blockFn", NONE)
            val () = debug' "Channel.sendEvt(3.2.1)" (* Atomic 1 *)
            val () = Assert.assertAtomic' ("Channel.sendEvt(3.2.1)", SOME 1)
            (* Queue this thread as a sender, then continue with whatever
             * thread next () supplies; we are resumed with the receiver's
             * thread.
             *)
            val rt =
               S.atomicSwitch
               (fn st =>
                (enqueAndClean (outQ, (transId, st))
                 ; next ()))
            val () = debug' "Channel.sendEvt(3.2.2)" (* Atomic 1 *)
            val () = Assert.assertAtomic' ("Channel.sendEvt(3.2.2)", SOME 1)
            (* The blocking side runs the event's clean-up action. *)
            val () = cleanUp ()
            val () = S.atomicReadyAndSwitch (fn () => S.prepVal (rt, msg))
            val () = debug' "Channel.sendEvt(3.2.3)" (* NonAtomic *)
            val () = Assert.assertNonAtomic' "Channel.sendEvt(3.2.3)"
         in
            ()
         end
      fun pollFn () =
         let
            val () = Assert.assertAtomic' ("Channel.sendEvt.pollFn", NONE)
            val () = debug' "Channel.sendEvt(2)" (* Atomic 1 *)
            val () = Assert.assertAtomic' ("Channel.sendEvt(2)", SOME 1)
         in
            (* 0 means "no live receiver"; any other result is the event
             * priority (named p to avoid shadowing the prio ref).
             *)
            case cleanAndChk (prio, inQ) of
               0 => E.blocked blockFn
             | p => E.enabled {prio = p, doitFn = doitFn}
         end
   in
      E.bevt pollFn
   end
| 171 | |
(* sendPoll : non-blocking send.  Returns true when a waiting receiver was
 * found in inQ and handed msg, false when no receiver was waiting (msg is
 * then not sent).
 *
 * Must be called outside any atomic region.  The success branch leaves the
 * atomic region through S.readyAndSwitch; the failure branch leaves it
 * explicitly with S.atomicEnd.
 *)
fun sendPoll (CHAN {prio, inQ, ...}, msg) =
   let
      val () = Assert.assertNonAtomic' "Channel.sendPoll"
      val () = debug' "Channel.sendPoll(1)" (* NonAtomic *)
      val () = Assert.assertNonAtomic' "Channel.sendPoll(1)"
      val () = S.atomicBegin ()
      val () = debug' "Channel.sendPoll(2)" (* Atomic 1 *)
      val () = Assert.assertAtomic' ("Channel.sendPoll(2)", SOME 1)
      val b =
         case cleanAndDeque inQ of
            SOME (rtxid, rt) =>
               let
                  val () = debug' "Channel.sendPoll(3.1.1)" (* Atomic 1 *)
                  val () = Assert.assertAtomic' ("Channel.sendPoll(3.1.1)", SOME 1)
                  (* Hand msg to the waiting receiver's thread. *)
                  val () =
                     S.readyAndSwitch
                     (fn () =>
                      (prio := 1
                       ; TransID.force rtxid
                       ; S.prepVal (rt, msg)))
                  val b = true
                  val () = debug' "Channel.sendPoll(3.1.2)" (* NonAtomic *)
                  val () = Assert.assertNonAtomic' "Channel.sendPoll(3.1.2)"
               in
                  b
               end
          | NONE =>
               let
                  val () = debug' "Channel.sendPoll(3.2.1)" (* Atomic 1 *)
                  val () = Assert.assertAtomic' ("Channel.sendPoll(3.2.1)", SOME 1)
                  val b = false
                  val () = debug' "Channel.sendPoll(3.2.2)" (* Atomic 1 *)
                  val () = Assert.assertAtomic' ("Channel.sendPoll(3.2.2)", SOME 1)
                  (* Nothing to switch to, so leave the atomic region here. *)
                  val () = S.atomicEnd ()
                  val () = debug' "Channel.sendPoll(3.2.3)" (* NonAtomic *)
                  val () = Assert.assertNonAtomic' "Channel.sendPoll(3.2.3)"
               in
                  b
               end
      val () = debug' "Channel.sendPoll(4)" (* NonAtomic *)
      val () = Assert.assertNonAtomic' "Channel.sendPoll(4)"
   in
      b
   end
| 216 | |
(* recv : synchronously receive a message from the channel and return it.
 *
 * Must be called outside any atomic region (asserted on entry).
 *
 *   - A sender is waiting in outQ (cancelled entries are skipped): reset
 *     the channel priority to 1, force the sender's transaction id and
 *     pass this thread to the sender's thread; this thread is resumed
 *     with the message.
 *   - No sender is waiting: park this thread in inQ and switch to the
 *     next thread.  It is resumed, still inside the atomic region, with
 *     the message, and leaves the region with S.atomicEnd.
 *)
fun recv (CHAN {prio, inQ, outQ}) =
   let
      val () = Assert.assertNonAtomic' "Channel.recv"
      val () = debug' "Channel.recv(1)" (* NonAtomic *)
      val () = Assert.assertNonAtomic' "Channel.recv(1)"
      val () = S.atomicBegin ()
      val () = debug' "Channel.recv(2)" (* Atomic 1 *)
      val () = Assert.assertAtomic' ("Channel.recv(2)", SOME 1)
      val msg =
         case cleanAndDeque outQ of
            SOME (stxid, st) =>
               let
                  val () = debug' "Channel.recv(3.1.1)" (* Atomic 1 *)
                  val () = Assert.assertAtomic' ("Channel.recv(3.1.1)", SOME 1)
                  (* Give the sender our thread (rt) so that it can deliver
                   * the message to us.
                   *)
                  val msg =
                     S.switch
                     (fn rt =>
                      (prio := 1
                       ; TransID.force stxid
                       ; S.prepVal (st, rt)))
                  val () = debug' "Channel.recv(3.1.2)" (* NonAtomic *)
                  val () = Assert.assertNonAtomic' "Channel.recv(3.1.2)"
               in
                  msg
               end
          | NONE =>
               let
                  val () = debug' "Channel.recv(3.2.1)" (* Atomic 1 *)
                  val () = Assert.assertAtomic' ("Channel.recv(3.2.1)", SOME 1)
                  (* Block: queue this thread as a receiver. *)
                  val msg =
                     S.atomicSwitchToNext
                     (fn rt => enqueAndClean (inQ, (TransID.mkTxId (), rt)))
                  val () = debug' "Channel.recv(3.2.2)" (* Atomic 1 *)
                  val () = Assert.assertAtomic' ("Channel.recv(3.2.2)", SOME 1)
                  (* The blocking side is responsible for leaving the
                   * atomic region.
                   *)
                  val () = S.atomicEnd ()
                  val () = debug' "Channel.recv(3.2.3)" (* NonAtomic *)
                  val () = Assert.assertNonAtomic' "Channel.recv(3.2.3)"
               in
                  msg
               end
      val () = debug' "Channel.recv(4)" (* NonAtomic *)
      val () = Assert.assertNonAtomic' "Channel.recv(4)"
   in
      msg
   end
| 262 | |
(* recvEvt : build the event value (via E.bevt) for receiving a message
 * from the channel.
 *
 *   pollFn  : runs atomically; drops cancelled senders from outQ and
 *             reports the event as enabled (with the channel's pre-bump
 *             priority) when a sender is waiting, otherwise as blocked.
 *   doitFn  : runs atomically once the event is enabled; takes the first
 *             sender from outQ, passes it this thread and returns the
 *             message this thread is resumed with.
 *   blockFn : runs atomically; parks this thread in inQ under the event's
 *             transaction id, and once resumed with the message runs the
 *             clean-up action, leaves the atomic region and returns the
 *             message.
 *)
fun recvEvt (CHAN {prio, inQ, outQ}) =
   let
      fun doitFn () =
         let
            val () = Assert.assertAtomic' ("Channel.recvEvt.doitFn", NONE)
            (* valOf relies on pollFn having just found outQ non-empty. *)
            val (stxid, st) = valOf (Q.deque outQ)
            val () = debug' "Channel.recvEvt(3.1.1)" (* Atomic 1 *)
            val () = Assert.assertAtomic' ("Channel.recvEvt(3.1.1)", SOME 1)
            val msg =
               S.switch
               (fn rt =>
                (prio := 1
                 ; TransID.force stxid
                 ; S.prepVal (st, rt)))
            val () = debug' "Channel.recvEvt(3.1.2)" (* NonAtomic *)
            val () = Assert.assertNonAtomic' "Channel.recvEvt(3.1.2)"
         in
            msg
         end
      fun blockFn {transId, cleanUp, next} =
         let
            val () = Assert.assertAtomic' ("Channel.recvEvt.blockFn", NONE)
            val () = debug' "Channel.recvEvt(3.2.1)" (* Atomic 1 *)
            val () = Assert.assertAtomic' ("Channel.recvEvt(3.2.1)", SOME 1)
            (* Queue this thread as a receiver, then continue with whatever
             * thread next () supplies; we are resumed with the message.
             *)
            val msg =
               S.atomicSwitch
               (fn rt =>
                (enqueAndClean (inQ, (transId, rt))
                 ; next ()))
            val () = debug' "Channel.recvEvt(3.2.2)" (* Atomic 1 *)
            val () = Assert.assertAtomic' ("Channel.recvEvt(3.2.2)", SOME 1)
            (* The blocking side runs the clean-up action and leaves the
             * atomic region.
             *)
            val () = cleanUp ()
            val () = S.atomicEnd ()
            val () = debug' "Channel.recvEvt(3.2.3)" (* NonAtomic *)
            val () = Assert.assertNonAtomic' "Channel.recvEvt(3.2.3)"
         in
            msg
         end
      fun pollFn () =
         let
            val () = Assert.assertAtomic' ("Channel.recvEvt.pollFn", NONE)
            val () = debug' "Channel.recvEvt(2)" (* Atomic 1 *)
            val () = Assert.assertAtomic' ("Channel.recvEvt(2)", SOME 1)
         in
            (* 0 means "no live sender"; any other result is the event
             * priority (named p to avoid shadowing the prio ref).
             *)
            case cleanAndChk (prio, outQ) of
               0 => E.blocked blockFn
             | p => E.enabled {prio = p, doitFn = doitFn}
         end
   in
      E.bevt pollFn
   end
| 314 | |
(* recvPoll : non-blocking receive.  Returns SOME msg when a waiting sender
 * was found in outQ, NONE when no sender was waiting.
 *
 * Must be called outside any atomic region.  The success branch leaves the
 * atomic region through S.switch (asserted non-atomic afterwards); the
 * failure branch leaves it explicitly with S.atomicEnd.
 *)
fun recvPoll (CHAN {prio, outQ, ...}) =
   let
      val () = Assert.assertNonAtomic' "Channel.recvPoll"
      val () = debug' "Channel.recvPoll(1)" (* NonAtomic *)
      val () = Assert.assertNonAtomic' "Channel.recvPoll(1)"
      val () = S.atomicBegin ()
      val () = debug' "Channel.recvPoll(2)" (* Atomic 1 *)
      val () = Assert.assertAtomic' ("Channel.recvPoll(2)", SOME 1)
      val msg =
         case cleanAndDeque outQ of
            SOME (stxid, st) =>
               let
                  val () = debug' "Channel.recvPoll(3.1.1)" (* Atomic 1 *)
                  val () = Assert.assertAtomic' ("Channel.recvPoll(3.1.1)", SOME 1)
                  (* Give the sender our thread (rt) so that it can deliver
                   * the message to us.
                   *)
                  val msg =
                     S.switch
                     (fn rt =>
                      (prio := 1
                       ; TransID.force stxid
                       ; S.prepVal (st, rt)))
                  val msg = SOME msg
                  val () = debug' "Channel.recvPoll(3.1.2)" (* NonAtomic *)
                  val () = Assert.assertNonAtomic' "Channel.recvPoll(3.1.2)"
               in
                  msg
               end
          | NONE =>
               let
                  val () = debug' "Channel.recvPoll(3.2.1)" (* Atomic 1 *)
                  val () = Assert.assertAtomic' ("Channel.recvPoll(3.2.1)", SOME 1)
                  val msg = NONE
                  val () = debug' "Channel.recvPoll(3.2.2)" (* Atomic 1 *)
                  val () = Assert.assertAtomic' ("Channel.recvPoll(3.2.2)", SOME 1)
                  (* Nothing to switch to, so leave the atomic region here. *)
                  val () = S.atomicEnd ()
                  val () = debug' "Channel.recvPoll(3.2.3)" (* NonAtomic *)
                  val () = Assert.assertNonAtomic' "Channel.recvPoll(3.2.3)"
               in
                  msg
               end
      val () = debug' "Channel.recvPoll(4)" (* NonAtomic *)
      val () = Assert.assertNonAtomic' "Channel.recvPoll(4)"
   in
      msg
   end
| 359 | end |