Commit | Line | Data |
---|---|---|
0d4e6ca3 LC |
1 | ;;; -*- mode: scheme; coding: utf-8; -*- |
2 | ;;; | |
8a177d31 | 3 | ;;; Copyright (C) 2010, 2011, 2012, 2013 Free Software Foundation, Inc. |
0d4e6ca3 LC |
4 | ;;; |
5 | ;;; This library is free software; you can redistribute it and/or | |
6 | ;;; modify it under the terms of the GNU Lesser General Public | |
7 | ;;; License as published by the Free Software Foundation; either | |
8 | ;;; version 3 of the License, or (at your option) any later version. | |
9 | ;;; | |
10 | ;;; This library is distributed in the hope that it will be useful, | |
11 | ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of | |
12 | ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
13 | ;;; Lesser General Public License for more details. | |
14 | ;;; | |
15 | ;;; You should have received a copy of the GNU Lesser General Public | |
16 | ;;; License along with this library; if not, write to the Free Software | |
17 | ;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | |
18 | ||
19 | (define-module (ice-9 futures) | |
20 | #:use-module (srfi srfi-1) | |
21 | #:use-module (srfi srfi-9) | |
ab975cf5 | 22 | #:use-module (srfi srfi-9 gnu) |
3e529bf0 | 23 | #:use-module (srfi srfi-11) |
90b2c69c | 24 | #:use-module (ice-9 q) |
3e529bf0 | 25 | #:use-module (ice-9 match) |
55e26a49 | 26 | #:use-module (ice-9 control) |
0d4e6ca3 LC |
27 | #:export (future make-future future? touch)) |
28 | ||
29 | ;;; Author: Ludovic Courtès <ludo@gnu.org> | |
30 | ;;; | |
31 | ;;; Commentary: | |
32 | ;;; | |
33 | ;;; This module provides an implementation of futures, a mechanism for | |
34 | ;;; fine-grain parallelism. Futures were first described by Henry Baker | |
35 | ;;; in ``The Incremental Garbage Collection of Processes'', 1977, and | |
36 | ;;; then implemented in MultiLisp (an implicit variant thereof, i.e., | |
37 | ;;; without `touch'.) | |
38 | ;;; | |
39 | ;;; This module uses a fixed thread pool, normally one per CPU core. | |
40 | ;;; Futures are off-loaded to these threads, when they are idle. | |
41 | ;;; | |
42 | ;;; Code: | |
43 | ||
44 | \f | |
45 | ;;; | |
46 | ;;; Futures. | |
47 | ;;; | |
48 | ||
;; A future: a thunk plus the bookkeeping needed to run it at most once
;; and hand its outcome to whoever touches it.  RESULT is not a
;; constructor argument; it is filled in later with `set-future-result!'.
(define-record-type <future>
  (%make-future thunk state mutex completion)
  future?
  (thunk      future-thunk      set-future-thunk!)   ; procedure to run, or continuation to resume
  (state      future-state      set-future-state!)   ; done | started | queued
  (result     future-result     set-future-result!)  ; thunk returning the values or re-throwing
  (mutex      future-mutex)                          ; held while STATE is read or updated
  (completion future-completion))                    ; condition variable broadcast on completion
0d4e6ca3 | 57 | |
ab975cf5 LC |
;; Print futures as #<future ADDRESS STATE THUNK>, with ADDRESS in
;; hexadecimal.
(set-record-type-printer!
 <future>
 (lambda (f out)
   (let ((address (number->string (object-address f) 16)))
     (simple-format out "#<future ~a ~a ~s>"
                    address
                    (future-state f)
                    (future-thunk f)))))
65 | ||
0d4e6ca3 LC |
(define (make-future thunk)
  "Create and return a future that will compute the result of calling
THUNK.  The computation may begin concurrently at any moment after this
call, or only once the returned future is touched."
  ;; Make sure the worker threads exist before queueing any work.
  (create-workers!)
  (let* ((lock   (make-mutex))
         (done   (make-condition-variable))
         (future (%make-future thunk 'queued lock done)))
    (register-future! future)
    future))
75 | ||
76 | \f | |
77 | ;;; | |
78 | ;;; Future queues. | |
79 | ;;; | |
80 | ||
3e529bf0 LC |
;; The single, global queue of futures that are ready to be processed.
;; TODO: Use per-worker queues to reduce contention.
(define %futures (make-q))

;; Mutex guarding both %FUTURES and %FUTURES-WAITING.
(define %futures-mutex (make-mutex))

;; Signalled when a future is added to %FUTURES.
(define %futures-available (make-condition-variable))

;; Association list of (WAITEE . WAITER) pairs: WAITER is a suspended
;; future that can only resume once the nested future WAITEE completes.
(define %futures-waiting '())

;; Current nesting depth of futures.  It is bumped by one each time a
;; future is touched from within another future.
(define %nesting-level (make-parameter 0))

;; Upper bound on the nesting depth.  Nested futures run on the same
;; stack, so this cap exists to prevent stack overflows.  See
;; <http://bugs.gnu.org/13188>.
(define %max-nesting-level 200)
3e529bf0 | 100 | |
f2fb5e53 LC |
(define-syntax-rule (with-mutex m e0 e1 ...)
  ;; Evaluate E0 E1 ... with mutex M locked, unlocking it again however
  ;; the body is exited.  This duplicates `with-mutex' from (ice-9
  ;; threads); it is copied rather than imported to avoid a circular
  ;; module dependency.
  (let ((mutex m))
    (dynamic-wind
      (lambda () (lock-mutex mutex))
      (lambda () e0 e1 ...)
      (lambda () (unlock-mutex mutex)))))
108 | ||
3e529bf0 LC |
;; Prompt tag that a running future aborts to when it needs to wait for
;; another future to complete.
(define %future-prompt (make-prompt-tag))
113 | ||
114 | \f | |
0d4e6ca3 LC |
(define (register-future! future)
  ;; Register FUTURE as being processable: append it to the global queue
  ;; and wake up one thread waiting on %FUTURES-AVAILABLE.
  ;;
  ;; `with-mutex' is used instead of a bare lock/unlock pair so that
  ;; %FUTURES-MUTEX is released even if this critical section is left
  ;; through a non-local exit, matching how the rest of this file takes
  ;; the lock.
  (with-mutex %futures-mutex
    (enq! %futures future)
    (signal-condition-variable %futures-available)))
121 | ||
(define (process-future! future)
  "Run FUTURE's thunk.  If it runs to completion, record its outcome in
FUTURE and return #t.  If instead it touches a nested future that has
not completed yet, suspend FUTURE and return #f.  Suspending means
saving FUTURE's continuation as its new thunk, marking it `queued', and
putting it on the waiter list (or straight back on the queue when the
nested future has completed in the meantime)."
  (let/ec return
    (let ()
      (define (suspend resume waitee)
        ;; Prompt handler: FUTURE wants to wait until WAITEE completes.
        ;; On entry FUTURE is unlocked and `started', and WAITEE is
        ;; unlocked.
        (with-mutex %futures-mutex
          (with-mutex (future-mutex future)
            (set-future-thunk! future resume)
            (set-future-state! future 'queued))

          (with-mutex (future-mutex waitee)
            (cond ((eq? 'done (future-state waitee))
                   ;; WAITEE finished while we were suspending, so
                   ;; FUTURE can be rescheduled immediately.
                   (enq! %futures future)
                   (signal-condition-variable %futures-available))
                  (else
                   ;; Otherwise remember that FUTURE waits for WAITEE.
                   (set! %futures-waiting
                         (alist-cons waitee future
                                     %futures-waiting)))))

          ;; Escape from `process-future!' without storing a result.
          (return #f)))

      (define (run)
        ;; Call FUTURE's thunk one nesting level deeper, inside the
        ;; prompt that `touch' aborts to.
        (call-with-prompt %future-prompt
          (lambda ()
            (parameterize ((%nesting-level (+ 1 (%nesting-level))))
              ((future-thunk future))))
          suspend))

      (define outcome
        ;; A thunk that reproduces what RUN did: it either returns the
        ;; same values or re-throws the same exception.
        (catch #t
          (lambda ()
            (call-with-values run
              (lambda vals
                (lambda ()
                  (apply values vals)))))
          (lambda key+args
            (lambda ()
              (apply throw key+args)))))

      (set-future-result! future outcome)
      #t)))
170 | ||
(define (process-one-future)
  "If the queue is not empty, take one future off it and process it,
unless it has already been started or completed."
  ;; Locking contract: the caller holds %FUTURES-MUTEX on entry, and it
  ;; is held again when this procedure returns.
  (if (q-empty? %futures)
      #t
      (let* ((future (deq! %futures))
             (lock   (future-mutex future)))
        (lock-mutex lock)
        (if (memq (future-state future) '(done started))
            ;; Already taken care of: nothing to do.
            (unlock-mutex lock)
            (begin
              ;; There is work to do.  Release %FUTURES-MUTEX so other
              ;; workers can make progress; FUTURE is unlocked first so
              ;; that the lock ordering is preserved and deadlocks are
              ;; avoided.
              (unlock-mutex lock)
              (unlock-mutex %futures-mutex)

              (lock-mutex lock)
              (cond ((eq? (future-state future) 'queued)
                     ;; Nobody claimed it while it was unlocked: run it.
                     (set-future-state! future 'started)
                     (unlock-mutex lock)

                     (when (process-future! future)
                       (with-mutex %futures-mutex
                         (with-mutex lock
                           (set-future-state! future 'done)
                           (notify-completion future)))))
                    (else
                     ;; Lost the race: someone else got to it first.
                     (unlock-mutex lock)))

              ;; Restore the caller's lock before returning.
              (lock-mutex %futures-mutex))))))
0d4e6ca3 LC |
205 | |
(define (process-futures)
  "Process futures from the queue, forever.  This procedure never
returns."
  (define (serve)
    ;; %FUTURES-MUTEX is held at the start of every iteration.
    (if (q-empty? %futures)
        ;; Nothing queued: sleep until a future becomes available.
        (wait-condition-variable %futures-available
                                 %futures-mutex))

    ;; `process-one-future' copes with the queue being empty, so waking
    ;; up without anything to do is harmless.
    (process-one-future)
    (serve))

  (lock-mutex %futures-mutex)
  (serve))
0d4e6ca3 | 216 | |
3e529bf0 LC |
(define (notify-completion future)
  "Notify futures and callers waiting that FUTURE completed.  Callers
blocked on FUTURE's completion condition variable are woken up, and
every suspended future that was waiting for FUTURE is moved from the
waiter list back onto the queue of runnable futures."
  ;; The caller must hold both FUTURE's mutex and %FUTURES-MUTEX.
  (broadcast-condition-variable (future-completion future))
  (let-values (((waiting remaining)
                (partition (match-lambda          ; TODO: optimize
                             ((waitee . _)
                              (eq? waitee future)))
                           %futures-waiting)))
    (set! %futures-waiting remaining)
    (for-each (match-lambda
                ((_ . waiter)
                 (enq! %futures waiter)
                 ;; Wake a thread sleeping on %FUTURES-AVAILABLE so the
                 ;; re-queued waiter does not sit in the queue while
                 ;; workers are idle.  An extra wake-up is harmless:
                 ;; `process-one-future' handles an empty queue.
                 (signal-condition-variable %futures-available)))
              waiting)))
231 | ||
0d4e6ca3 LC |
(define (touch future)
  "Return the values computed by FUTURE.  If FUTURE has not completed
yet, wait for it, processing queued futures in the meantime.  If
FUTURE's thunk threw an exception, that exception is re-thrown here."
  (define lock (future-mutex future))

  (define (current-state)
    ;; Read FUTURE's state while holding its mutex.
    (lock-mutex lock)
    (let ((state (future-state future)))
      (unlock-mutex lock)
      state))

  (define (suspend)
    ;; Hand control back to `process-future!', which suspends the
    ;; future we are running in until FUTURE completes.
    (abort-to-prompt %future-prompt future))

  (define (help-out)
    ;; Do something useful while FUTURE is pending: process one queued
    ;; future, or, if the queue is empty, block until FUTURE completes.
    (lock-mutex %futures-mutex)
    (cond ((q-empty? %futures)
           (unlock-mutex %futures-mutex)
           (with-mutex lock
             (unless (eq? 'done (future-state future))
               (wait-condition-variable (future-completion future)
                                        lock))))
          (else
           (process-one-future)
           (unlock-mutex %futures-mutex))))

  (let retry ()
    (let ((state (current-state)))
      (cond ((eq? state 'done)
             #t)
            ((eq? state 'started)
             ;; Someone else is running FUTURE.  From within a future,
             ;; suspend ourselves; otherwise help out and check again.
             (if (> (%nesting-level) 0)
                 (suspend)
                 (begin
                   (help-out)
                   (retry))))
            (else                                 ; queued
             ;; Only suspend when nesting is too deep to keep running
             ;; futures on this stack.
             (if (> (%nesting-level) %max-nesting-level)
                 (suspend)
                 (help-out))
             (retry)))))

  ;; The stored result is a thunk that returns the values or re-throws.
  ((future-result future)))
267 | ||
268 | \f | |
269 | ;;; | |
270 | ;;; Workers. | |
271 | ;;; | |
272 | ||
;; Number of worker threads to create: one fewer than the processor
;; count when threads are supported, and none otherwise.
(define %worker-count
  (if (provided? 'threads)
      (1- (current-processor-count))
      0))
277 | ||
f4e45e91 AW |
;; The list of worker threads.  Once created, workers stay around
;; forever.

;; TODO
;; 1. Allow the pool to be shrunk, as in libgomp (though for that we'd
;;    need semaphores, which aren't yet in libguile!).
;; 2. Provide a `worker-count' fluid.
(define %workers '())
285 | ||
(define (%create-workers!)
  ;; Start the worker threads, then turn `create-workers!' into a no-op.
  (with-mutex %futures-mutex
    ;; Rebinding `create-workers!' to a no-op is only an optimization:
    ;; several threads may still reach this procedure before the
    ;; rebinding takes effect.  To avoid starting the pool more than
    ;; once (and thus too many threads), %WORKERS is checked for
    ;; emptiness inside the critical section.
    (when (null? %workers)
      (set! %workers
            (let spawn ((i 0)
                        (threads '()))
              (if (>= i %worker-count)
                  (reverse threads)
                  (spawn (1+ i)
                         (cons (call-with-new-thread process-futures)
                               threads)))))
      (set! create-workers! (lambda () #t)))))
f4e45e91 AW |
301 | |
;; Entry point used by `make-future' to make sure the workers exist.
;; `%create-workers!' rebinds this variable to a no-op once the pool has
;; been started.
(define (create-workers!)
  (%create-workers!))
0d4e6ca3 LC |
304 | |
305 | \f | |
306 | ;;; | |
307 | ;;; Syntax. | |
308 | ;;; | |
309 | ||
0c65f52c AW |
(define-syntax future
  (syntax-rules ()
    "Return a new future for BODY."
    ((_ body)
     ;; Wrap BODY in a thunk so that it is evaluated by the future
     ;; rather than at the point of creation.
     (make-future (lambda () body)))))
3e529bf0 LC |
313 | |
314 | ;;; Local Variables: | |
315 | ;;; eval: (put 'with-mutex 'scheme-indent-function 1) | |
316 | ;;; End: |