(* 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *
 * Events for synchronizing on timeouts.
 *)
14 structure TimeOut
: TIME_OUT_EXTRA
=
(* Module-local assertion helpers: the LocalAssert functor applied with
 * assert = false.
 * NOTE(review): presumably the flag switches the checks off -- confirm
 * against LocalAssert.
 *)
structure Assert =
   LocalAssert (val assert = false)
(* Module-local debug helpers: the LocalDebug functor applied with
 * debug = false.
 * NOTE(review): presumably the flag silences debug output -- confirm
 * against LocalDebug.
 *)
structure Debug =
   LocalDebug (val debug = false)
(* Short alias for the Scheduler structure, used throughout this module. *)
structure S = Scheduler
(* Hand a debug message to Debug.sayDebug together with the S.atomicMsg
 * and S.tidMsg tags.  The msg argument is a thunk that produces the
 * message (debug' below wraps a plain value into such a thunk).
 *)
fun debug msg =
   let
      val tags = [S.atomicMsg, S.tidMsg]
   in
      Debug.sayDebug (tags, msg)
   end
(* Convenience form of debug for a message that is already a value:
 * the value is wrapped in a thunk and passed on to debug.
 *)
fun debug' msg =
   let
      fun delayed () = msg
   in
      debug delayed
   end
(* Replicate TransID.trans_id here so that its constructors (such as
 * TXID) can be used unqualified in this module.
 *)
datatype trans_id =
   datatype TransID.trans_id
(* Replicate TransID.trans_id_state here so that its constructors can be
 * used unqualified in this module.
 *)
datatype trans_id_state =
   datatype TransID.trans_id_state
(* This holds an approximation of the current time of day.  It is
 * cleared at each pre-emption, and initialized on demand (by getTime).
 *)
(* Returns an approximation of the current time of day
 * (this is at least as accurate as the time quantum).
 *)
38 NONE
=> let val t
= Time
.now()
(* Drop the cached time of day by setting clock back to NONE. *)
fun preemptTime () =
   let
      val () = clock := NONE
   in
      ()
   end
(* The queue of threads waiting for timeouts.
 * It is sorted in increasing order of time value.
 *
 * TQ is the FunPriorityQueue functor applied to a Key structure that
 * opens Time and uses Time's time type as the key type t.
 *)
structure TQ =
   FunPriorityQueue
   (structure Key =
       struct
          open Time
          type t = time
       end)
(* Value stored with each timeout-queue entry, in order: the transaction
 * id, a unit -> unit clean-up function, and an S.rdy_thread.
 *)
type item =
   trans_id
   * (unit -> unit)
   * S.rdy_thread
(* Mutable cell holding the current timeout queue; it starts out as the
 * queue returned by TQ.new ().
 *)
val timeQ : item TQ.t ref =
   ref (TQ.new ())
51 fun cleaner (readied
: unit
-> unit
) elt
=
54 val (TXID txst
, cleanUp
: unit
-> unit
, t
) = TQ
.Elt
.value elt
58 | _
=> if Time
.<=(TQ
.Elt
.key elt
, now
)
(* Register a waiter on the timeout queue.
 *
 *   time    - key under which the entry is enqueued
 *   txid    - transaction id stored with the entry
 *   cleanUp - clean-up function stored with the entry
 *   t       - thread value stored with the entry
 *
 * Checks the atomic-region assertion first, then replaces the contents
 * of timeQ with the result of TQ.enqueAndClean.  The cleaner passed along
 * is given a readied callback that does nothing.
 *)
fun timeWait (time, txid, cleanUp, t) =
   let
      val () = Assert.assertAtomic' ("TimeOut.timeWait", NONE)
      fun ignoreReadied () = ()
      val entry = (txid, cleanUp, t)
      val updated =
         TQ.enqueAndClean (!timeQ, time, entry, cleaner ignoreReadied)
   in
      timeQ := updated
   end
(** NOTE: unlike for most base events, the block functions of time-out
 ** events do not have to exit the atomic region or execute the clean-up
 ** operation.  This is done when they are removed from the waiting queue.
 **)
76 fun blockFn
{transId
, cleanUp
, next
} =
78 val () = Assert
.assertAtomic
' ("TimeOut.timeOutEvt.blockFn", NONE
)
79 val () = debug
' "timeOutEvt(3.2.1)" (* Atomic
1 *)
80 val () = Assert
.assertAtomic
' ("TimeOut.timeOutEvt(3.2.1)", SOME
1)
84 (timeWait (Time
.+(time
, getTime ()), transId
, cleanUp
, S
.prep t
)
86 val () = debug
' "timeOutEvt(3.2.3)" (* NonAtomic
*)
87 val () = Assert
.assertNonAtomic
' "TimeOut.timeOutEvt(3.2.3)"
93 val () = Assert
.assertAtomic
' ("TimeOut.timeOutEvt.pollFn", NONE
)
94 val () = debug
' "timeOutEvt(2)" (* Atomic
1 *)
95 val () = Assert
.assertAtomic
' ("TimeOut.timeOutEvt(2)", SOME
1)
97 if Time
.<=(time
, Time
.zeroTime
)
98 then E
.enabled
{prio
= ~
1, doitFn
= S
.atomicEnd
}
99 else E
.blocked blockFn
107 fun blockFn
{transId
, cleanUp
, next
} =
109 val () = Assert
.assertAtomic
' ("TimeOut.atTimeEvt.blockFn", NONE
)
110 val () = debug
' "atTimeEvt(3.2.1)" (* Atomic
1 *)
111 val () = Assert
.assertAtomic
' ("TimeOut.atTimeEvt(3.2.1)", SOME
1)
115 (timeWait (time
, transId
, cleanUp
, S
.prep t
)
117 val () = debug
' "atTimeEvt(3.2.3)" (* NonAtomic
*)
118 val () = Assert
.assertNonAtomic
' "TimeOut.atTimeEvt(3.2.3)"
124 val () = Assert
.assertAtomic
' ("TimeOut.atTimeEvt.pollFn", NONE
)
125 val () = debug
' "atTimeEvt(2)" (* Atomic
1 *)
126 val () = Assert
.assertAtomic
' ("TimeOut.atTimeEvt(2)", SOME
1)
128 if Time
.<=(time
, getTime())
129 then E
.enabled
{prio
= ~
1, doitFn
= S
.atomicEnd
}
130 else E
.blocked blockFn
(* Reset various pieces of state: the contents of timeQ are replaced
 * with a fresh queue obtained from TQ.new ().
 *)
fun reset () =
   let
      val fresh = TQ.new ()
   in
      timeQ := fresh
   end
(* What to do at a preemption. *)
140 fun preempt () : Time
.time option option
=
142 val () = Assert
.assertAtomic
' ("TimeOut.preempt", NONE
)
143 val () = debug
' "TimeOut.preempt" (* Atomic
1 *)
144 val () = Assert
.assertAtomic
' ("TimeOut.preempt", SOME
1)
145 val () = preemptTime ()
151 val readied
= ref
false
152 val timeQ
' = TQ
.clean (timeQ
', cleaner (fn () => readied
:= true))
153 val () = timeQ
:= timeQ
'
157 else case TQ
.peek timeQ
' of
159 | SOME elt
=> SOME(SOME(Time
.-(TQ
.Elt
.key elt
, getTime ())))