;;;
(define-record-type <future>
- (%make-future thunk done? mutex)
+ (%make-future thunk state mutex completion)
future?
- (thunk future-thunk)
- (done? future-done? set-future-done?!)
- (result future-result set-future-result!)
- (mutex future-mutex))
+ (thunk future-thunk)
+ (state future-state set-future-state!) ; done | started | queued
+ (result future-result set-future-result!)
+ (mutex future-mutex)
+ (completion future-completion)) ; completion cond. var.
(define (make-future thunk)
"Return a new future for THUNK. Execution may start at any point
concurrently, or it can start at the time when the returned future is
touched."
(create-workers!)
- (let ((future (%make-future thunk #f (make-mutex))))
+ (let ((future (%make-future thunk 'queued
+ (make-mutex) (make-condition-variable))))
(register-future! future)
future))
(define %futures-mutex (make-mutex))
(define %futures-available (make-condition-variable))
+(define-syntax-rule (with-mutex m e0 e1 ...)
+ ;; Copied from (ice-9 threads) to avoid circular dependency.
+ (let ((x m))
+ (dynamic-wind
+ (lambda () (lock-mutex x))
+ (lambda () (begin e0 e1 ...))
+ (lambda () (unlock-mutex x)))))
+
(define (register-future! future)
;; Register FUTURE as being processable.
(lock-mutex %futures-mutex)
(unlock-mutex %futures-mutex))
(define (process-future! future)
- ;; Process FUTURE, assuming its mutex is already taken.
+ ;; Process FUTURE, and update its result.
(set-future-result! future
(catch #t
(lambda ()
(apply values results)))))
(lambda args
(lambda ()
- (apply throw args)))))
- (set-future-done?! future #t))
+ (apply throw args))))))
(define (process-futures)
;; Wait for futures to be available and process them.
(or (q-empty? %futures)
(let ((future (deq! %futures)))
(lock-mutex (future-mutex future))
- (or (and (future-done? future)
- (unlock-mutex (future-mutex future)))
- (begin
- ;; Do the actual work.
-
- ;; We want to release %FUTURES-MUTEX so that other workers
- ;; can progress. However, to avoid deadlocks, we have to
- ;; unlock FUTURE as well, to preserve lock ordering.
- (unlock-mutex (future-mutex future))
- (unlock-mutex %futures-mutex)
-
- (lock-mutex (future-mutex future))
- (or (future-done? future) ; lost the race?
- (process-future! future))
- (unlock-mutex (future-mutex future))
-
- (lock-mutex %futures-mutex)))))
+ (case (future-state future)
+ ((done started)
+ ;; Nothing to do.
+ (unlock-mutex (future-mutex future)))
+ (else
+ ;; Do the actual work.
+
+ ;; We want to release %FUTURES-MUTEX so that other workers can
+ ;; progress. However, to avoid deadlocks, we have to unlock
+ ;; FUTURE as well, to preserve lock ordering.
+ (unlock-mutex (future-mutex future))
+ (unlock-mutex %futures-mutex)
+
+ (lock-mutex (future-mutex future))
+ (if (eq? (future-state future) 'queued) ; lost the race?
+ (begin ; no, so let's process it
+ (set-future-state! future 'started)
+ (unlock-mutex (future-mutex future))
+
+ (process-future! future)
+
+ (with-mutex (future-mutex future)
+ (set-future-state! future 'done))
+
+ (broadcast-condition-variable (future-completion future)))
+ (unlock-mutex (future-mutex future))) ; yes
+
+ (lock-mutex %futures-mutex)))))
+
+ ;; Look for more work.
(loop)))
(define (touch future)
"Return the result of FUTURE, computing it if not already done."
(lock-mutex (future-mutex future))
- (or (future-done? future)
- (begin
- ;; Do the actual work. Unlock FUTURE first to preserve lock
- ;; ordering.
- (unlock-mutex (future-mutex future))
-
- (lock-mutex %futures-mutex)
- (q-remove! %futures future)
- (unlock-mutex %futures-mutex)
-
- (lock-mutex (future-mutex future))
- (or (future-done? future) ; lost the race?
- (process-future! future))))
- (unlock-mutex (future-mutex future))
+ (case (future-state future)
+ ((done)
+ (unlock-mutex (future-mutex future)))
+ ((started)
+ ;; Wait for completion.
+ (wait-condition-variable (future-completion future)
+ (future-mutex future))
+ (unlock-mutex (future-mutex future)))
+ ((queued)
+ (begin
+ ;; Do the actual work. Unlock FUTURE first to preserve lock
+ ;; ordering.
+ (unlock-mutex (future-mutex future))
+
+ (lock-mutex %futures-mutex)
+ (q-remove! %futures future)
+ (unlock-mutex %futures-mutex)
+
+ (lock-mutex (future-mutex future))
+ (if (eq? (future-state future) 'queued) ; lost the race?
+ (begin ; no, so let's process it
+ (set-future-state! future 'started)
+ (unlock-mutex (future-mutex future))
+
+ (process-future! future)
+
+ (with-mutex (future-mutex future)
+ (set-future-state! future 'done))
+
+ (broadcast-condition-variable (future-completion future)))
+ (begin ; yes, so try again
+ (unlock-mutex (future-mutex future))
+ (touch future))))))
((future-result future)))
\f