2 * 2004 Matthew
Fluet (mfluet@acm
.org
)
3 * Ported to MLton threads
.
8 * COPYRIGHT (c
) 1994 AT
&T Bell Laboratories
.
10 * Asynchronous
multicast (one
-to
-many
) channels
. This implementation
11 * is based on a condition variable implementation
of multicast channels
.
12 * See Chapter
5 of "Concurrent Programming in ML" for details
.
15 structure Multicast
: MULTICAST
=
18 structure SV
= SyncVar
20 type 'a event
= 'a CML
.event
25 datatype 'a mc_state
= MCState
of ('a
* 'a mc_state SV
.ivar
)
27 Port
of (('a
* 'a mc_state SV
.ivar
) CML
.chan
* 'a mc_state SV
.ivar SV
.mvar
)
29 MChan
of ('a request CML
.chan
* 'a port CML
.chan
)
33 val outCh
= CML
.channel()
34 val stateVar
= SV
.mVarInit cv
37 val (MCState(v
, nextCV
)) = SV
.iGet cv
39 CML
.send (outCh
, (v
, nextCV
))
42 val _
= CML
.spawn (fn () => tee cv
)
49 val reqCh
= CML
.channel()
50 and replyCh
= CML
.channel()
52 case (CML
.recv reqCh
) of
54 (CML
.send (replyCh
, mkPort cv
)
58 val nextCV
= SV
.iVar()
60 SV
.iPut (cv
, MCState(m
, nextCV
))
63 val _
= CML
.spawn (fn () => server (SV
.iVar()))
68 fun multicast (MChan(ch
, _
), m
) = CML
.send (ch
, Message m
)
70 fun port (MChan(reqCh
, replyCh
)) =
71 (CML
.send (reqCh
, NewPort
)
74 fun copy (Port(_
, stateV
)) = mkPort(SV
.mGet stateV
)
76 fun recvMsg
stateV (v
, nextCV
) =
77 let val _
= SV
.mSwap (stateV
, nextCV
)
81 fun recv (Port(ch
, stateV
)) = recvMsg
stateV (CML
.recv ch
)
82 fun recvEvt (Port(ch
, stateV
)) = CML
.wrap(CML
.recvEvt ch
, recvMsg stateV
)