2 * 2004 Matthew
Fluet (mfluet@acm
.org
)
3 * Ported to MLton threads
.
8 * COPYRIGHT (c
) 1995 AT
&T Bell Laboratories
.
9 * COPYRIGHT (c
) 1989-1991 John H
. Reppy
11 * This module implements the scheduling queues
and preemption
15 structure Scheduler
: SCHEDULER
=
(* Local instances of the assertion and debug helpers, both created with
 * their flag set to false.  The outer Debug structure is bound to
 * GlobalDebug first, because the next declaration shadows Debug.
 *)
structure Assert = LocalAssert(val assert = false)
structure GlobalDebug = Debug
structure Debug = LocalDebug(val debug = false)

(* Short aliases for structures used throughout this module. *)
structure Q = ImpQueue
structure T = MLton.Thread
structure TID = ThreadID
structure SH = SchedulerHooks

(* Re-export the thread id type and the thread representation datatypes. *)
type thread_id = ThreadID.thread_id
datatype thread = datatype RepTypes.thread
datatype rdy_thread = datatype RepTypes.rdy_thread
(* prep thrd
 * Builds a ready thread (RTHRD) from a thread (THRD) by preparing the
 * underlying MLton thread with the unit value.  The thread id is carried
 * over unchanged.
 *)
fun prep (THRD (id, thr)) =
   let
      val runnable = T.prepare (thr, ())
   in
      RTHRD (id, runnable)
   end
(* prepVal (thrd, v)
 * Like prep, but the underlying MLton thread is prepared with the
 * caller-supplied value v instead of unit.  The thread id is carried over
 * unchanged.
 *)
fun prepVal (THRD (id, thr), v) =
   let
      val runnable = T.prepare (thr, v)
   in
      RTHRD (id, runnable)
   end
(* prepFn (thrd, f)
 * Prepends f to the underlying MLton thread with T.prepend, then prepares
 * the resulting thread with the unit value.  The thread id is carried over
 * unchanged.
 *)
fun prepFn (THRD (id, thr), f) =
   let
      val extended = T.prepend (thr, f)
   in
      RTHRD (id, T.prepare (extended, ()))
   end
(* The dummy thread id: a bogus id labelled "dummy", used when a thread id
 * is needed.  It is the initial value of curTid and the id that the state
 * reset code passes to setCurThreadId.
 *)
val dummyTid = TID.bogus "dummy"
(* Thread id of the error thread: a bogus id labelled "error".  The error
 * thread is used to trap attempts to run CML without proper
 * initialization (i.e., via RunCML); errorThrd builds that thread with
 * this id.
 *)
val errorTid = TID.bogus "error"
45 fun errorThrd () : unit thread
=
46 THRD (errorTid
, T
.new (fn () =>
48 ([fn () => "CML"], fn () => "**** Use RunCML.doit to run CML ****")
49 ; raise Fail
"CML not initialized")))
(* Mutable cell holding a thread id; it starts out as dummyTid.
 * NOTE(review): presumably read and written through getCurThreadId and
 * setCurThreadId -- their bodies are not visible here, confirm.
 *)
val curTid : thread_id ref = ref dummyTid
(* getThreadId thrd
 * Returns the thread id stored in a THRD value; the underlying thread
 * component is ignored.
 *)
fun getThreadId thrd =
   case thrd of
      THRD (id, _) => id
55 fun getCurThreadId () =
61 fun setCurThreadId tid
=
63 val () = Assert
.assertAtomic
' ("Scheduler.setCurThreadId", NONE
)
(* tidMsg ()
 * Returns the string form (TID.tidToString) of the id returned by
 * getCurThreadId.  Used as one of the message prefixes in debug.
 *)
fun tidMsg () =
   let
      val current = getCurThreadId ()
   in
      TID.tidToString current
   end
(* debug msg
 * Forwards the message thunk msg to Debug.sayDebug together with the two
 * prefix thunks atomicMsg and tidMsg.
 *)
fun debug msg =
   let
      val prefixes = [atomicMsg, tidMsg]
   in
      Debug.sayDebug (prefixes, msg)
   end
(* debug' msg
 * Convenience form of debug for an already-computed message: wraps msg in
 * a thunk and passes it to debug.
 *)
fun debug' msg =
   let
      fun thunk () = msg
   in
      debug thunk
   end
(* The thread ready queues:
 * rdyQ1 is the primary queue and rdyQ2 is the secondary queue.
 * Each starts out as a fresh queue of ready threads.
 *)
val rdyQ1 : rdy_thread Q.t = Q.new ()
val rdyQ2 : rdy_thread Q.t = Q.new ()
78 (* enqueue a thread
in the primary queue
*)
80 (Assert
.assertAtomic
' ("Scheduler.enque1", NONE
)
81 ; Q
.enque (rdyQ1
, thrd
))
82 (* enqueue a thread
in the secondary queue
*)
84 (Assert
.assertAtomic
' ("Scheduler.enque2", NONE
)
85 ; Q
.enque (rdyQ2
, thrd
))
86 (* dequeue a thread from the primary queue
*)
88 (Assert
.assertAtomic
' ("Scheduler.deque1", NONE
)
89 ; case Q
.deque rdyQ1
of
91 | SOME thrd
=> SOME thrd
)
92 (* dequeue a thread from the secondary queue
*)
94 (Assert
.assertAtomic
' ("Scheduler.deque2", NONE
)
95 ; case Q
.deque rdyQ2
of
97 | SOME thrd
=> SOME thrd
)
98 (* promote a thread from the secondary queue to the primary queue
*)
100 (Assert
.assertAtomic
' ("Scheduler.promote", NONE
)
103 | SOME thrd
=> enque1 thrd
)
107 val () = Assert
.assertAtomic
' ("Scheduler.next", NONE
)
110 NONE
=> !SH
.pauseHook ()
117 val () = Assert
.assertAtomic
' ("Scheduler.ready", NONE
)
123 fun atomicSwitchAux msg f
=
124 (Assert
.assertAtomic (fn () => "Scheduler." ^ msg
, NONE
)
125 ; T
.atomicSwitch (fn t
=>
127 val tid
= getCurThreadId ()
128 val () = TID
.mark tid
129 val RTHRD (tid
',t
') = f (THRD (tid
, t
))
130 val () = setCurThreadId tid
'
(* atomicSwitch f
 * Thin wrapper over atomicSwitchAux, passing "atomicSwitch" as the name
 * used in its assertion message.  f receives the current thread and
 * returns the ready thread to switch to.
 *)
fun atomicSwitch (f: 'a thread -> rdy_thread) =
   let
      val caller = "atomicSwitch"
   in
      atomicSwitchAux caller f
   end
(* switch f
 * Calls atomicBegin () and then performs atomicSwitch f.
 *)
fun switch (f: 'a thread -> rdy_thread) =
   let
      val _ = atomicBegin ()
   in
      atomicSwitch f
   end
(* atomicSwitchToNext f
 * Runs f on the current thread for its side effect only, then switches to
 * whatever ready thread next () returns.  Goes through atomicSwitchAux
 * with "atomicSwitchToNext" as the assertion-message name.
 *)
fun atomicSwitchToNext (f: 'a thread -> unit) =
   let
      fun runThenNext thrd =
         let
            val () = f thrd
         in
            next ()
         end
   in
      atomicSwitchAux "atomicSwitchToNext" runThenNext
   end
(* switchToNext f
 * Calls atomicBegin () and then performs atomicSwitchToNext f.
 *)
fun switchToNext (f: 'a thread -> unit) =
   let
      val _ = atomicBegin ()
   in
      atomicSwitchToNext f
   end
(* atomicReadyAndSwitch f
 * Passes the current thread, prepared with prep, to ready, and then
 * switches to the ready thread returned by f ().  Goes through
 * atomicSwitchAux with "atomicReadyAndSwitch" as the assertion-message
 * name.
 *)
fun atomicReadyAndSwitch (f: unit -> rdy_thread) =
   let
      fun requeueThenPick thrd =
         let
            val _ = ready (prep thrd)
         in
            f ()
         end
   in
      atomicSwitchAux "atomicReadyAndSwitch" requeueThenPick
   end
(* readyAndSwitch f
 * Calls atomicBegin () and then performs atomicReadyAndSwitch f.
 *)
fun readyAndSwitch (f: unit -> rdy_thread) =
   let
      val _ = atomicBegin ()
   in
      atomicReadyAndSwitch f
   end
(* atomicReadyAndSwitchToNext f
 * Passes the current thread, prepared with prep, to ready, runs f () for
 * its side effect, and then switches to the ready thread returned by
 * next ().  Goes through atomicSwitchAux with "atomicReadyAndSwitchToNext"
 * as the assertion-message name.
 *)
fun atomicReadyAndSwitchToNext (f: unit -> unit) =
   let
      fun requeueRunThenNext thrd =
         let
            val _ = ready (prep thrd)
            val () = f ()
         in
            next ()
         end
   in
      atomicSwitchAux "atomicReadyAndSwitchToNext" requeueRunThenNext
   end
(* readyAndSwitchToNext f
 * Calls atomicBegin () and then performs atomicReadyAndSwitchToNext f.
 *)
fun readyAndSwitchToNext (f: unit -> unit) =
   let
      val _ = atomicBegin ()
   in
      atomicReadyAndSwitchToNext f
   end
153 fun new (f
: thread_id
-> ('a
-> unit
)) : 'a thread
=
155 val () = Assert
.assertAtomic
' ("Scheduler.new", NONE
)
157 val t
= T
.new (f tid
)
162 fun prepend (thrd
: 'a thread
, f
: 'b
-> 'a
) : 'b thread
=
164 val () = Assert
.assertAtomic
' ("Scheduler.prepend", NONE
)
165 val THRD (tid
, t
) = thrd
166 val t
= T
.prepend (t
, f
)
171 fun unwrap (f
: rdy_thread
-> rdy_thread
) (t
: T
.Runnable
.t
) : T
.Runnable
.t
=
173 val () = Assert
.assertAtomic
' ("Scheduler.unwrap", NONE
)
174 val tid
= getCurThreadId ()
175 val RTHRD (tid
', t
') = f (RTHRD (tid
, t
))
176 val () = setCurThreadId tid
'
182 (* reset various pieces
of state
*)
185 ; setCurThreadId dummyTid
186 ; Q
.reset rdyQ1
; Q
.reset rdyQ2
187 ; if not running
then ready (prep (errorThrd ())) else ()
189 (* what to
do at a
preemption (with the current thread
) *)
190 fun preempt (thrd
as RTHRD (tid
, _
)) =
192 val () = Assert
.assertAtomic
' ("Scheduler.preempt", NONE
)
193 val () = debug
' "Scheduler.preempt" (* Atomic
1 *)
194 val () = Assert
.assertAtomic
' ("Scheduler.preempt", SOME
1)