Commit | Line | Data |
---|---|---|
7f918cf1 CE |
1 | (* scheduler.sml |
2 | * 2004 Matthew Fluet (mfluet@acm.org) | |
3 | * Ported to MLton threads. | |
4 | *) | |
5 | ||
6 | (* scheduler.sml | |
7 | * | |
8 | * COPYRIGHT (c) 1995 AT&T Bell Laboratories. | |
9 | * COPYRIGHT (c) 1989-1991 John H. Reppy | |
10 | * | |
11 | * This module implements the scheduling queues and preemption | |
12 | * mechanisms. | |
13 | *) | |
14 | ||
15 | structure Scheduler : SCHEDULER = | |
16 | struct | |
17 | structure Assert = LocalAssert(val assert = false) | |
18 | structure GlobalDebug = Debug | |
19 | structure Debug = LocalDebug(val debug = false) | |
20 | ||
21 | open Critical | |
22 | ||
23 | structure Q = ImpQueue | |
24 | structure T = MLton.Thread | |
25 | structure TID = ThreadID | |
26 | structure SH = SchedulerHooks | |
27 | ||
28 | type thread_id = ThreadID.thread_id | |
29 | datatype thread = datatype RepTypes.thread | |
30 | datatype rdy_thread = datatype RepTypes.rdy_thread | |
31 | ||
32 | fun prep (THRD (tid, t)) = RTHRD (tid, T.prepare (t, ())) | |
33 | fun prepVal (THRD (tid, t), v) = RTHRD (tid, T.prepare (t, v)) | |
34 | fun prepFn (THRD (tid, t), f) = RTHRD (tid, T.prepare (T.prepend (t, f), ())) | |
35 | ||
36 | (* the dummy thread Id; this is used when an ID is needed to get | |
37 | * the types right | |
38 | *) | |
39 | val dummyTid = TID.bogus "dummy" | |
40 | (* the error thread. This thread is used to trap attempts to run CML | |
41 | * without proper initialization (i.e., via RunCML). This thread is | |
42 | * enqueued by reset. | |
43 | *) | |
44 | val errorTid = TID.bogus "error" | |
45 | fun errorThrd () : unit thread = | |
46 | THRD (errorTid, T.new (fn () => | |
47 | (GlobalDebug.sayDebug | |
48 | ([fn () => "CML"], fn () => "**** Use RunCML.doit to run CML ****") | |
49 | ; raise Fail "CML not initialized"))) | |
50 | ||
51 | local | |
52 | val curTid : thread_id ref = ref dummyTid | |
53 | in | |
54 | fun getThreadId (THRD (tid, _)) = tid | |
55 | fun getCurThreadId () = | |
56 | let | |
57 | val tid = !curTid | |
58 | in | |
59 | tid | |
60 | end | |
61 | fun setCurThreadId tid = | |
62 | let | |
63 | val () = Assert.assertAtomic' ("Scheduler.setCurThreadId", NONE) | |
64 | in | |
65 | curTid := tid | |
66 | end | |
67 | end | |
68 | fun tidMsg () = TID.tidToString (getCurThreadId ()) | |
69 | fun debug msg = Debug.sayDebug ([atomicMsg, tidMsg], msg) | |
70 | fun debug' msg = debug (fn () => msg) | |
71 | ||
72 | (* The thread ready queues: | |
73 | * rdyQ1 is the primary queue and rdyQ2 is the secondary queue. | |
74 | *) | |
75 | val rdyQ1 : rdy_thread Q.t = Q.new () | |
76 | and rdyQ2 : rdy_thread Q.t = Q.new () | |
77 | ||
78 | (* enqueue a thread in the primary queue *) | |
79 | fun enque1 thrd = | |
80 | (Assert.assertAtomic' ("Scheduler.enque1", NONE) | |
81 | ; Q.enque (rdyQ1, thrd)) | |
82 | (* enqueue a thread in the secondary queue *) | |
83 | fun enque2 thrd = | |
84 | (Assert.assertAtomic' ("Scheduler.enque2", NONE) | |
85 | ; Q.enque (rdyQ2, thrd)) | |
86 | (* dequeue a thread from the primary queue *) | |
87 | fun deque1 () = | |
88 | (Assert.assertAtomic' ("Scheduler.deque1", NONE) | |
89 | ; case Q.deque rdyQ1 of | |
90 | NONE => deque2 () | |
91 | | SOME thrd => SOME thrd) | |
92 | (* dequeue a thread from the secondary queue *) | |
93 | and deque2 () = | |
94 | (Assert.assertAtomic' ("Scheduler.deque2", NONE) | |
95 | ; case Q.deque rdyQ2 of | |
96 | NONE => NONE | |
97 | | SOME thrd => SOME thrd) | |
98 | (* promote a thread from the secondary queue to the primary queue *) | |
99 | fun promote () = | |
100 | (Assert.assertAtomic' ("Scheduler.promote", NONE) | |
101 | ; case deque2 () of | |
102 | NONE => () | |
103 | | SOME thrd => enque1 thrd) | |
104 | ||
105 | fun next () = | |
106 | let | |
107 | val () = Assert.assertAtomic' ("Scheduler.next", NONE) | |
108 | val thrd = | |
109 | case deque1 () of | |
110 | NONE => !SH.pauseHook () | |
111 | | SOME thrd => thrd | |
112 | in | |
113 | thrd | |
114 | end | |
115 | fun ready thrd = | |
116 | let | |
117 | val () = Assert.assertAtomic' ("Scheduler.ready", NONE) | |
118 | val () = enque1 thrd | |
119 | in | |
120 | () | |
121 | end | |
122 | local | |
123 | fun atomicSwitchAux msg f = | |
124 | (Assert.assertAtomic (fn () => "Scheduler." ^ msg, NONE) | |
125 | ; T.atomicSwitch (fn t => | |
126 | let | |
127 | val tid = getCurThreadId () | |
128 | val () = TID.mark tid | |
129 | val RTHRD (tid',t') = f (THRD (tid, t)) | |
130 | val () = setCurThreadId tid' | |
131 | in | |
132 | t' | |
133 | end)) | |
134 | in | |
135 | fun atomicSwitch (f: 'a thread -> rdy_thread) = | |
136 | atomicSwitchAux "atomicSwitch" f | |
137 | fun switch (f: 'a thread -> rdy_thread) = | |
138 | (atomicBegin (); atomicSwitch f) | |
139 | fun atomicSwitchToNext (f: 'a thread -> unit) = | |
140 | atomicSwitchAux "atomicSwitchToNext" (fn thrd => (f thrd; next ())) | |
141 | fun switchToNext (f: 'a thread -> unit) = | |
142 | (atomicBegin (); atomicSwitchToNext f) | |
143 | fun atomicReadyAndSwitch (f: unit -> rdy_thread) = | |
144 | atomicSwitchAux "atomicReadyAndSwitch" (fn thrd => (ready (prep thrd); f ())) | |
145 | fun readyAndSwitch (f: unit -> rdy_thread) = | |
146 | (atomicBegin (); atomicReadyAndSwitch f) | |
147 | fun atomicReadyAndSwitchToNext (f: unit -> unit) = | |
148 | atomicSwitchAux "atomicReadyAndSwitchToNext" (fn thrd => (ready (prep thrd); f (); next ())) | |
149 | fun readyAndSwitchToNext (f: unit -> unit) = | |
150 | (atomicBegin (); atomicReadyAndSwitchToNext f) | |
151 | end | |
152 | ||
153 | fun new (f : thread_id -> ('a -> unit)) : 'a thread = | |
154 | let | |
155 | val () = Assert.assertAtomic' ("Scheduler.new", NONE) | |
156 | val tid = TID.new () | |
157 | val t = T.new (f tid) | |
158 | in | |
159 | THRD (tid, t) | |
160 | end | |
161 | ||
162 | fun prepend (thrd : 'a thread, f : 'b -> 'a) : 'b thread = | |
163 | let | |
164 | val () = Assert.assertAtomic' ("Scheduler.prepend", NONE) | |
165 | val THRD (tid, t) = thrd | |
166 | val t = T.prepend (t, f) | |
167 | in | |
168 | THRD (tid, t) | |
169 | end | |
170 | ||
171 | fun unwrap (f : rdy_thread -> rdy_thread) (t: T.Runnable.t) : T.Runnable.t = | |
172 | let | |
173 | val () = Assert.assertAtomic' ("Scheduler.unwrap", NONE) | |
174 | val tid = getCurThreadId () | |
175 | val RTHRD (tid', t') = f (RTHRD (tid, t)) | |
176 | val () = setCurThreadId tid' | |
177 | in | |
178 | t' | |
179 | end | |
180 | ||
181 | ||
182 | (* reset various pieces of state *) | |
183 | fun reset running = | |
184 | (atomicBegin () | |
185 | ; setCurThreadId dummyTid | |
186 | ; Q.reset rdyQ1; Q.reset rdyQ2 | |
187 | ; if not running then ready (prep (errorThrd ())) else () | |
188 | ; atomicEnd ()) | |
189 | (* what to do at a preemption (with the current thread) *) | |
190 | fun preempt (thrd as RTHRD (tid, _)) = | |
191 | let | |
192 | val () = Assert.assertAtomic' ("Scheduler.preempt", NONE) | |
193 | val () = debug' "Scheduler.preempt" (* Atomic 1 *) | |
194 | val () = Assert.assertAtomic' ("Scheduler.preempt", SOME 1) | |
195 | val () = | |
196 | if TID.isMarked tid | |
197 | then (TID.unmark tid | |
198 | ; promote () | |
199 | ; enque1 thrd) | |
200 | else enque2 thrd | |
201 | in | |
202 | () | |
203 | end | |
204 | ||
205 | val _ = reset false | |
206 | end |