(* 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *
 * The implementation of Id-style synchronizing memory cells.
 *)
14 structure SyncVar
: SYNC_VAR_EXTRA
=
(* Local instances of the assertion and debug-output functors; both are
 * instantiated with their flag set to false. *)
structure Assert = LocalAssert (val assert = false)
structure Debug = LocalDebug (val debug = false)
(* Short names for the queue and scheduler structures used below. *)
structure Q = ImpQueue
structure S = Scheduler
(* debug msg: hand `msg` to Debug.sayDebug together with the
 * S.atomicMsg and S.tidMsg prefix entries. *)
fun debug msg =
   let
      val prefixes = [S.atomicMsg, S.tidMsg]
   in
      Debug.sayDebug (prefixes, msg)
   end
(* Replicate the transaction-id datatypes from TransID so that the types
 * and their constructors can be used unqualified here. *)
datatype trans_id = datatype TransID.trans_id
datatype trans_id_state = datatype TransID.trans_id_state
(* The underlying representation of both ivars and mvars is the same. *)
30 CELL
of {prio
: int ref
,
31 readQ
: (trans_id
* 'a S
.thread
) Q
.t
,
32 value
: 'a option ref
}
(* Both kinds of synchronizing variable are plain cells. *)
type 'a ivar = 'a cell
type 'a mvar = 'a cell
(* newCell (): build a cell whose priority counter starts at 0, whose
 * reader queue is freshly created by Q.new, and whose value is NONE. *)
fun newCell () =
   let
      val prio = ref 0
      val readQ = Q.new ()
      val value = ref NONE
   in
      CELL {prio = prio, readQ = readQ, value = value}
   end
(* sameCell : ('a cell * 'a cell) -> bool *)
42 fun sameCell (CELL
{prio
= prio1
, ...}, CELL
{prio
= prio2
, ...}) =
(* Bump a priority value by one, returning the old value. *)
(* bumpPriority p: store (!p + 1) back into `p` and return the value
 * that `p` held before the update. *)
fun bumpPriority p =
   let
      val old = !p
   in
      p := old + 1
      ; old
   end
(* Functions to clean a cell's queue of blocked readers: `cleaner` tests
 * whether an entry's transaction id is in the CANCEL state. *)
(* cleaner (txid, _): true exactly when the state held by the
 * transaction id is CANCEL; the second component is ignored. *)
fun cleaner (TXID (ref CANCEL), _) = true
  | cleaner _ = false
54 Q
.cleanAndDeque (q
, cleaner
)
(* enqueAndClean (q, item): delegate to Q.enqueAndClean, supplying
 * `cleaner` as the predicate argument. *)
fun enqueAndClean (queue, entry) = Q.enqueAndClean (queue, entry, cleaner)
(* When a thread is resumed after being blocked on an iGet or mGet
 * operation, there may be other threads also blocked on the variable.
 * This function is used to propagate the message to all of the threads
 * that are blocked on the variable (or until one of them takes the value,
 * in the mvar case).  It must be called from an atomic region; when the
 * readQ is finally empty, we leave the atomic region.  We must use
 * "cleanAndDeque" to get items from the readQ in the unlikely event that
 * a single thread executes a choice of multiple gets on the same variable.
 *)
68 fun relayMsg (readQ
, msg
) =
69 case (cleanAndDeque readQ
) of
75 ; S
.prepVal (t
, msg
)))
(* Generalized synchronized variables,
 * to factor out the common operations.
 *)
82 fun gPut (name
, CELL
{prio
, readQ
, value
}, x
) =
84 val () = Assert
.assertNonAtomic (fn () => concat
["SyncVar.", name
])
85 val () = debug (fn () => concat
[name
, "(1)"]) (* NonAtomic
*)
86 val () = Assert
.assertNonAtomic (fn () => concat
["SyncVar.", name
, "(1)"])
87 val () = S
.atomicBegin()
88 val () = debug (fn () => concat
[name
, "(2)"]) (* Atomic
1 *)
89 val () = Assert
.assertAtomic (fn () => concat
["SyncVar.", name
, "(2)"], SOME
1)
94 val () = debug (fn () => concat
[name
, "(3.1.1)"]) (* Atomic
1 *)
95 val () = Assert
.assertAtomic (fn () => concat
["SyncVar.", name
, "(3.1.1)"], SOME
1)
96 val () = value
:= SOME x
98 case cleanAndDeque readQ
of
99 NONE
=> S
.atomicEnd ()
100 |
SOME (rtxid
, rt
) =>
104 ; TransID
.force rtxid
105 ; S
.prepVal (rt
, x
)))
106 val () = debug (fn () => concat
[name
, "(3.1.2)"]) (* NonAtomic
*)
107 val () = Assert
.assertNonAtomic (fn () => concat
["SyncVar.", name
, "(3.1.2)"])
113 val () = debug (fn () => concat
[name
, "(3.2.1)"]) (* Atomic
1 *)
114 val () = Assert
.assertAtomic (fn () => concat
["SyncVar.", name
, "(3.2.1)"], SOME
1)
115 val () = S
.atomicEnd ()
116 val () = debug (fn () => concat
[name
, "(3.2.2)"]) (* NonAtomic
*)
117 val () = Assert
.assertNonAtomic (fn () => concat
["SyncVar.", name
, "(3.2.2)"])
121 val () = debug (fn () => concat
[name
, "(4)"]) (* NonAtomic
*)
122 val () = Assert
.assertNonAtomic (fn () => concat
["SyncVar.", name
, "(4)"])
(* Swap the current contents of the cell with a new value;
 * it is guaranteed to be atomic.
 *)
130 fun gSwap (name
, doSwap
, CELL
{prio
, readQ
, value
}) =
132 val () = Assert
.assertNonAtomic (fn () => concat
["SyncVar.", name
, ""])
133 val () = debug (fn () => concat
[name
, "(1)"]) (* NonAtomic
*)
134 val () = Assert
.assertNonAtomic (fn () => concat
["SyncVar.", name
, "(1)"])
135 val () = S
.atomicBegin()
136 val () = debug (fn () => concat
[name
, "(2)"]) (* Atomic
1 *)
137 val () = Assert
.assertAtomic (fn () => concat
["SyncVar.", name
, "(2)"], SOME
1)
142 val () = debug (fn () => concat
[name
, "(3.2.1)"]) (* Atomic
1 *)
143 val () = Assert
.assertAtomic (fn () => concat
["SyncVar.", name
, "(3.2.1)"], SOME
1)
146 (fn rt
=> enqueAndClean (readQ
, (TransID
.mkTxId (), rt
)))
147 val () = debug (fn () => concat
[name
, "(3.2.2)"]) (* Atomic
1 *)
148 val () = Assert
.assertAtomic (fn () => concat
["SyncVar.", name
, "(3.2.2)"], SOME
1)
149 val () = doSwap value
150 val () = relayMsg (readQ
, msg
)
151 val () = debug (fn () => concat
[name
, "(3.2.3)"]) (* NonAtomic
*)
152 val () = Assert
.assertNonAtomic (fn () => concat
["SyncVar.", name
, "(3.2.3)"])
158 val () = debug (fn () => concat
[name
, "(3.2.1)"]) (* Atomic
1 *)
159 val () = Assert
.assertAtomic (fn () => concat
["SyncVar.", name
, "(3.2.1)"], SOME
1)
161 val () = doSwap value
162 val () = S
.atomicEnd ()
163 val () = debug (fn () => concat
[name
, "(3.2.2)"]) (* NonAtomic
*)
164 val () = Assert
.assertNonAtomic (fn () => concat
["SyncVar.", name
, "(3.2.2)"])
168 val () = debug (fn () => concat
[name
, "(4)"]) (* NonAtomic
*)
169 val () = Assert
.assertNonAtomic (fn () => concat
["SyncVar.", name
, "(4)"])
174 fun gSwapEvt (name
, doSwap
, CELL
{prio
, readQ
, value
}) =
178 val () = Assert
.assertAtomic (fn () => concat
["SyncVar.", name
, ".doitFn"], NONE
)
179 val x
= valOf (!value
)
180 val () = debug (fn () => concat
[name
, "(3.2.1)"]) (* Atomic
1 *)
181 val () = Assert
.assertAtomic (fn () => concat
["SyncVar.", name
, "(3.2.1)"], SOME
1)
183 val () = doSwap value
184 val () = S
.atomicEnd ()
185 val () = debug (fn () => concat
[name
, "(3.2.2)"]) (* NonAtomic
*)
186 val () = Assert
.assertNonAtomic (fn () => concat
["SyncVar.", name
, "(3.2.2)"])
190 fun blockFn
{transId
, cleanUp
, next
} =
192 val () = Assert
.assertAtomic (fn () => concat
["SyncVar.", name
, ".blockFn"], NONE
)
193 val () = debug (fn () => concat
[name
, "(3.2.1)"]) (* Atomic
1 *)
194 val () = Assert
.assertAtomic (fn () => concat
["SyncVar.", name
, "(3.2.1)"], SOME
1)
198 (enqueAndClean (readQ
, (transId
, rt
))
200 val () = debug (fn () => concat
[name
, "(3.2.2)"]) (* Atomic
1 *)
201 val () = Assert
.assertAtomic (fn () => concat
["SyncVar.", name
, "(3.2.2)"], SOME
1)
203 val () = doSwap value
204 val () = relayMsg (readQ
, msg
)
205 val () = debug (fn () => concat
[name
, "(3.2.3)"]) (* NonAtomic
*)
206 val () = Assert
.assertNonAtomic (fn () => concat
["SyncVar.", name
, "(3.2.3)"])
212 val () = Assert
.assertAtomic (fn () => concat
["SyncVar.", name
, ".pollFn"], NONE
)
213 val () = debug (fn () => concat
[name
, "(2)"]) (* Atomic
1 *)
214 val () = Assert
.assertAtomic (fn () => concat
["SyncVar.", name
, "(2)"], SOME
1)
217 NONE
=> E
.blocked blockFn
218 | SOME _
=> E
.enabled
{prio
= bumpPriority prio
,
225 fun gSwapPoll (name
, doSwap
, CELL
{prio
, value
, ...}) =
227 val () = Assert
.assertNonAtomic (fn () => concat
["SyncVar.", name
, ""])
228 val () = debug (fn () => concat
[name
, "(1)"]) (* NonAtomic
*)
229 val () = Assert
.assertNonAtomic (fn () => concat
["SyncVar.", name
, "(1)"])
230 val () = S
.atomicBegin()
231 val () = debug (fn () => concat
[name
, "(2)"]) (* Atomic
1 *)
232 val () = Assert
.assertAtomic (fn () => concat
["SyncVar.", name
, "(2)"], SOME
1)
237 val () = debug (fn () => concat
[name
, "(3.2.1)"]) (* Atomic
1 *)
238 val () = Assert
.assertAtomic (fn () => concat
["SyncVar.", name
, "(3.2.1)"], SOME
1)
240 val () = debug (fn () => concat
[name
, "(3.2.2)"]) (* Atomic
1 *)
241 val () = Assert
.assertAtomic (fn () => concat
["SyncVar.", name
, "(3.2.2)"], SOME
1)
242 val () = S
.atomicEnd ()
243 val () = debug (fn () => concat
[name
, "(3.2.3)"]) (* NonAtomic
*)
244 val () = Assert
.assertNonAtomic (fn () => concat
["SyncVar.", name
, "(3.2.3)"])
250 val () = debug (fn () => concat
[name
, "(3.2.1)"]) (* Atomic
1 *)
251 val () = Assert
.assertAtomic (fn () => concat
["SyncVar.", name
, "(3.2.1)"], SOME
1)
253 val () = doSwap value
254 val () = S
.atomicEnd ()
255 val () = debug (fn () => concat
[name
, "(3.2.2)"]) (* NonAtomic
*)
256 val () = Assert
.assertNonAtomic (fn () => concat
["SyncVar.", name
, "(3.2.2)"])
260 val () = debug (fn () => concat
[name
, "(4)"]) (* NonAtomic
*)
261 val () = Assert
.assertNonAtomic (fn () => concat
["SyncVar.", name
, "(4)"])
(* Identity test on ivars: an ivar is a cell, so sameCell is reused. *)
val sameIVar = sameCell
(* iPut (cell, x): gPut on the cell, labelled "iPut". *)
fun iPut (iv, v) = gPut ("iPut", iv, v)
(* Swap action for the ivar get operations: ignores its argument and
 * does nothing. *)
local fun doGetSwap _ = ()
(* iGet cell: gSwap with the no-op swap action, labelled "iGet". *)
fun iGet iv = gSwap ("iGet", doGetSwap, iv)
(* iGetEvt cell: gSwapEvt with the no-op swap action, labelled "iGetEvt". *)
fun iGetEvt iv = gSwapEvt ("iGetEvt", doGetSwap, iv)
(* iGetPoll cell: gSwapPoll with the no-op swap action, labelled "iGetPoll". *)
fun iGetPoll iv = gSwapPoll ("iGetPoll", doGetSwap, iv)
(* mVarInit x: build a cell that already holds SOME x, with priority
 * counter 0 and a reader queue freshly created by Q.new. *)
fun mVarInit x =
   let
      val prio = ref 0
      val readQ = Q.new ()
      val value = ref (SOME x)
   in
      CELL {prio = prio, readQ = readQ, value = value}
   end
(* Identity test on mvars: an mvar is a cell, so sameCell is reused. *)
val sameMVar = sameCell
(* mPut (cell, x): gPut on the cell, labelled "mPut". *)
fun mPut (mv, v) = gPut ("mPut", mv, v)
(* Swap action for the take operations: overwrite the given value
 * reference with NONE. *)
local fun doTakeSwap slot = slot := NONE
(* mTake cell: gSwap with the emptying swap action, labelled "mTake". *)
fun mTake mv = gSwap ("mTake", doTakeSwap, mv)
(* mTakeEvt cell: gSwapEvt with the emptying swap action, labelled "mTakeEvt". *)
fun mTakeEvt mv = gSwapEvt ("mTakeEvt", doTakeSwap, mv)
(* mTakePoll cell: gSwapPoll with the emptying swap action, labelled "mTakePoll". *)
fun mTakePoll mv = gSwapPoll ("mTakePoll", doTakeSwap, mv)
(* Swap action for the mvar get operations: ignores its argument and
 * does nothing. *)
local fun doGetSwap _ = ()
(* mGet cell: gSwap with the no-op swap action, labelled "mGet". *)
fun mGet mv = gSwap ("mGet", doGetSwap, mv)
(* mGetEvt cell: gSwapEvt with the no-op swap action, labelled "mGetEvt". *)
fun mGetEvt mv = gSwapEvt ("mGetEvt", doGetSwap, mv)
(* mGetPoll cell: gSwapPoll with the no-op swap action, labelled "mGetPoll". *)
fun mGetPoll mv = gSwapPoll ("mGetPoll", doGetSwap, mv)
(* Swap action for the swap operations: given the replacement value
 * first, store SOME of it into the value reference passed second. *)
local fun doSwapSwap replacement slot = slot := SOME replacement
(* mSwap (cell, x): gSwap whose swap action stores SOME x, labelled "mSwap". *)
fun mSwap (mv, v) = gSwap ("mSwap", doSwapSwap v, mv)
(* mSwapEvt (cell, x): gSwapEvt whose swap action stores SOME x.
 * The label is "mSwapEvt" so that debug and assertion messages from the
 * event form can be told apart from those of mSwap, matching the other
 * *Evt wrappers, which each pass their own name. *)
fun mSwapEvt (cell, x) = gSwapEvt ("mSwapEvt", doSwapSwap x, cell)