Only lazily compile where profitable
[bpt/guile.git] / module / ice-9 / futures.scm
CommitLineData
0d4e6ca3
LC
1;;; -*- mode: scheme; coding: utf-8; -*-
2;;;
8a177d31 3;;; Copyright (C) 2010, 2011, 2012, 2013 Free Software Foundation, Inc.
0d4e6ca3
LC
4;;;
5;;; This library is free software; you can redistribute it and/or
6;;; modify it under the terms of the GNU Lesser General Public
7;;; License as published by the Free Software Foundation; either
8;;; version 3 of the License, or (at your option) any later version.
9;;;
10;;; This library is distributed in the hope that it will be useful,
11;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
12;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13;;; Lesser General Public License for more details.
14;;;
15;;; You should have received a copy of the GNU Lesser General Public
16;;; License along with this library; if not, write to the Free Software
17;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
19(define-module (ice-9 futures)
20 #:use-module (srfi srfi-1)
21 #:use-module (srfi srfi-9)
ab975cf5 22 #:use-module (srfi srfi-9 gnu)
3e529bf0 23 #:use-module (srfi srfi-11)
90b2c69c 24 #:use-module (ice-9 q)
3e529bf0 25 #:use-module (ice-9 match)
55e26a49 26 #:use-module (ice-9 control)
0d4e6ca3
LC
27 #:export (future make-future future? touch))
28
29;;; Author: Ludovic Courtès <ludo@gnu.org>
30;;;
31;;; Commentary:
32;;;
33;;; This module provides an implementation of futures, a mechanism for
34;;; fine-grain parallelism. Futures were first described by Henry Baker
35;;; in ``The Incremental Garbage Collection of Processes'', 1977, and
36;;; then implemented in MultiLisp (an implicit variant thereof, i.e.,
37;;; without `touch'.)
38;;;
39;;; This modules uses a fixed thread pool, normally one per CPU core.
40;;; Futures are off-loaded to these threads, when they are idle.
41;;;
42;;; Code:
43
44\f
45;;;
46;;; Futures.
47;;;
48
49(define-record-type <future>
f2fb5e53 50 (%make-future thunk state mutex completion)
0d4e6ca3 51 future?
3e529bf0 52 (thunk future-thunk set-future-thunk!)
f2fb5e53
LC
53 (state future-state set-future-state!) ; done | started | queued
54 (result future-result set-future-result!)
55 (mutex future-mutex)
56 (completion future-completion)) ; completion cond. var.
0d4e6ca3 57
ab975cf5
LC
58(set-record-type-printer!
59 <future>
60 (lambda (future port)
61 (simple-format port "#<future ~a ~a ~s>"
62 (number->string (object-address future) 16)
63 (future-state future)
64 (future-thunk future))))
65
0d4e6ca3
LC
66(define (make-future thunk)
67 "Return a new future for THUNK. Execution may start at any point
68concurrently, or it can start at the time when the returned future is
69touched."
f4e45e91 70 (create-workers!)
f2fb5e53
LC
71 (let ((future (%make-future thunk 'queued
72 (make-mutex) (make-condition-variable))))
0d4e6ca3
LC
73 (register-future! future)
74 future))
75
76\f
77;;;
78;;; Future queues.
79;;;
80
3e529bf0
LC
81;; Global queue of pending futures.
82;; TODO: Use per-worker queues to reduce contention.
90b2c69c 83(define %futures (make-q))
3e529bf0
LC
84
85;; Lock for %FUTURES and %FUTURES-WAITING.
0d4e6ca3
LC
86(define %futures-mutex (make-mutex))
87(define %futures-available (make-condition-variable))
88
3e529bf0
LC
89;; A mapping of nested futures to futures waiting for them to complete.
90(define %futures-waiting '())
91
8a177d31
LC
92;; Nesting level of futures. Incremented each time a future is touched
93;; from within a future.
94(define %nesting-level (make-parameter 0))
95
96;; Maximum nesting level. The point is to avoid stack overflows when
97;; nested futures are executed on the same stack. See
98;; <http://bugs.gnu.org/13188>.
99(define %max-nesting-level 200)
3e529bf0 100
f2fb5e53
LC
101(define-syntax-rule (with-mutex m e0 e1 ...)
102 ;; Copied from (ice-9 threads) to avoid circular dependency.
103 (let ((x m))
104 (dynamic-wind
105 (lambda () (lock-mutex x))
106 (lambda () (begin e0 e1 ...))
107 (lambda () (unlock-mutex x)))))
108
3e529bf0
LC
109(define %future-prompt
110 ;; The prompt futures abort to when they want to wait for another
111 ;; future.
112 (make-prompt-tag))
113
114\f
0d4e6ca3
LC
115(define (register-future! future)
116 ;; Register FUTURE as being processable.
117 (lock-mutex %futures-mutex)
90b2c69c 118 (enq! %futures future)
0d4e6ca3
LC
119 (signal-condition-variable %futures-available)
120 (unlock-mutex %futures-mutex))
121
0d4e6ca3 122(define (process-future! future)
3e529bf0
LC
123 "Process FUTURE. When FUTURE completes, return #t and update its
124result; otherwise, when FUTURE touches a nested future that has not
125completed yet, then suspend it and return #f. Suspending a future
126consists in capturing its continuation, marking it as `queued', and
127adding it to the waiter queue."
128 (let/ec return
129 (let* ((suspend
130 (lambda (cont future-to-wait)
131 ;; FUTURE wishes to wait for the completion of FUTURE-TO-WAIT.
132 ;; At this point, FUTURE is unlocked and in `started' state,
133 ;; and FUTURE-TO-WAIT is unlocked.
134 (with-mutex %futures-mutex
135 (with-mutex (future-mutex future)
136 (set-future-thunk! future cont)
137 (set-future-state! future 'queued))
138
139 (with-mutex (future-mutex future-to-wait)
140 ;; If FUTURE-TO-WAIT completed in the meantime, then
141 ;; reschedule FUTURE directly; otherwise, add it to the
142 ;; waiter queue.
143 (if (eq? 'done (future-state future-to-wait))
144 (begin
145 (enq! %futures future)
146 (signal-condition-variable %futures-available))
147 (set! %futures-waiting
148 (alist-cons future-to-wait future
149 %futures-waiting))))
150
151 (return #f))))
152 (thunk (lambda ()
153 (call-with-prompt %future-prompt
154 (lambda ()
8a177d31
LC
155 (parameterize ((%nesting-level
156 (1+ (%nesting-level))))
3e529bf0
LC
157 ((future-thunk future))))
158 suspend))))
159 (set-future-result! future
160 (catch #t
161 (lambda ()
162 (call-with-values thunk
163 (lambda results
164 (lambda ()
165 (apply values results)))))
166 (lambda args
6c17f7bd 167 (lambda ()
3e529bf0
LC
168 (apply throw args)))))
169 #t)))
170
171(define (process-one-future)
172 "Attempt to pick one future from the queue and process it."
173 ;; %FUTURES-MUTEX must be locked on entry, and is locked on exit.
174 (or (q-empty? %futures)
175 (let ((future (deq! %futures)))
176 (lock-mutex (future-mutex future))
177 (case (future-state future)
178 ((done started)
179 ;; Nothing to do.
180 (unlock-mutex (future-mutex future)))
181 (else
182 ;; Do the actual work.
183
184 ;; We want to release %FUTURES-MUTEX so that other workers can
185 ;; progress. However, to avoid deadlocks, we have to unlock
186 ;; FUTURE as well, to preserve lock ordering.
187 (unlock-mutex (future-mutex future))
188 (unlock-mutex %futures-mutex)
189
190 (lock-mutex (future-mutex future))
191 (if (eq? (future-state future) 'queued) ; lost the race?
192 (begin ; no, so let's process it
193 (set-future-state! future 'started)
194 (unlock-mutex (future-mutex future))
195
196 (let ((done? (process-future! future)))
197 (when done?
198 (with-mutex %futures-mutex
199 (with-mutex (future-mutex future)
200 (set-future-state! future 'done)
201 (notify-completion future))))))
202 (unlock-mutex (future-mutex future))) ; yes
203
204 (lock-mutex %futures-mutex))))))
0d4e6ca3
LC
205
206(define (process-futures)
3e529bf0 207 "Continuously process futures from the queue."
0d4e6ca3
LC
208 (lock-mutex %futures-mutex)
209 (let loop ()
134c95f1
LC
210 (when (q-empty? %futures)
211 (wait-condition-variable %futures-available
212 %futures-mutex))
213
3e529bf0 214 (process-one-future)
90b2c69c 215 (loop)))
0d4e6ca3 216
3e529bf0
LC
217(define (notify-completion future)
218 "Notify futures and callers waiting that FUTURE completed."
219 ;; FUTURE and %FUTURES-MUTEX are locked.
220 (broadcast-condition-variable (future-completion future))
221 (let-values (((waiting remaining)
222 (partition (match-lambda ; TODO: optimize
223 ((waitee . _)
224 (eq? waitee future)))
225 %futures-waiting)))
226 (set! %futures-waiting remaining)
227 (for-each (match-lambda
228 ((_ . waiter)
229 (enq! %futures waiter)))
230 waiting)))
231
0d4e6ca3
LC
232(define (touch future)
233 "Return the result of FUTURE, computing it if not already done."
3e529bf0
LC
234 (define (work)
235 ;; Do some work while waiting for FUTURE to complete.
236 (lock-mutex %futures-mutex)
237 (if (q-empty? %futures)
238 (begin
239 (unlock-mutex %futures-mutex)
240 (with-mutex (future-mutex future)
241 (unless (eq? 'done (future-state future))
242 (wait-condition-variable (future-completion future)
243 (future-mutex future)))))
244 (begin
245 (process-one-future)
246 (unlock-mutex %futures-mutex))))
f2fb5e53 247
3e529bf0
LC
248 (let loop ()
249 (lock-mutex (future-mutex future))
250 (case (future-state future)
251 ((done)
252 (unlock-mutex (future-mutex future)))
253 ((started)
254 (unlock-mutex (future-mutex future))
8a177d31 255 (if (> (%nesting-level) 0)
3e529bf0
LC
256 (abort-to-prompt %future-prompt future)
257 (begin
258 (work)
259 (loop))))
8a177d31 260 (else ; queued
3e529bf0 261 (unlock-mutex (future-mutex future))
8a177d31
LC
262 (if (> (%nesting-level) %max-nesting-level)
263 (abort-to-prompt %future-prompt future)
264 (work))
3e529bf0 265 (loop))))
0d4e6ca3
LC
266 ((future-result future)))
267
268\f
269;;;
270;;; Workers.
271;;;
272
273(define %worker-count
274 (if (provided? 'threads)
51fc066a 275 (- (current-processor-count) 1)
0d4e6ca3
LC
276 0))
277
f4e45e91
AW
278;; A dock of workers that stay here forever.
279
280;; TODO
281;; 1. Allow the pool to be shrunk, as in libgomp (though that we'd
282;; need semaphores, which aren't yet in libguile!).
283;; 2. Provide a `worker-count' fluid.
284(define %workers '())
285
286(define (%create-workers!)
be05b336
MW
287 (with-mutex
288 %futures-mutex
289 ;; Setting 'create-workers!' to a no-op is an optimization, but it is
290 ;; still possible for '%create-workers!' to be called more than once
291 ;; from different threads. Therefore, to avoid creating %workers more
292 ;; than once (and thus creating too many threads), we check to make
293 ;; sure %workers is empty within the critical section.
294 (when (null? %workers)
295 (set! %workers
296 (unfold (lambda (i) (>= i %worker-count))
297 (lambda (i) (call-with-new-thread process-futures))
298 1+
299 0))
300 (set! create-workers! (lambda () #t)))))
f4e45e91
AW
301
302(define create-workers!
303 (lambda () (%create-workers!)))
0d4e6ca3
LC
304
305\f
306;;;
307;;; Syntax.
308;;;
309
0c65f52c
AW
310(define-syntax-rule (future body)
311 "Return a new future for BODY."
312 (make-future (lambda () body)))
3e529bf0
LC
313
314;;; Local Variables:
315;;; eval: (put 'with-mutex 'scheme-indent-function 1)
316;;; End: