futures: Keep futures unlocked while they are processing.
author Ludovic Courtès <ludo@gnu.org>
Wed, 7 Nov 2012 14:16:03 +0000 (15:16 +0100)
committer Ludovic Courtès <ludo@gnu.org>
Wed, 21 Nov 2012 22:33:30 +0000 (23:33 +0100)
* module/ice-9/futures.scm (<future>)[completion]: New field.
  [done?]: Rename to...
  [state]: ... this.  Change `set-future-done?!' to
  `set-future-state!', and `future-done?' to `future-state'.
  (make-future): Initialize the `state' field to 'queued and the
  `completion' field to a new condition variable.
  (with-mutex): New macro.
  (process-future!): Remove `set-future-done?!' call.
  (process-futures): Check `future-state'.  Unlock FUTURE's mutex before
  processing it.  Broadcast FUTURE's `completion' cond. var. when done.
  (touch): Likewise.

module/ice-9/futures.scm

index 7fbccf6..f574410 100644 (file)
 ;;;
 
 (define-record-type <future>
-  (%make-future thunk done? mutex)
+  (%make-future thunk state mutex completion)
   future?
-  (thunk     future-thunk)
-  (done?     future-done?  set-future-done?!)
-  (result    future-result set-future-result!)
-  (mutex     future-mutex))
+  (thunk        future-thunk)
+  (state        future-state  set-future-state!)  ; done | started | queued
+  (result       future-result set-future-result!)
+  (mutex        future-mutex)
+  (completion   future-completion))               ; completion cond. var.
 
 (define (make-future thunk)
   "Return a new future for THUNK.  Execution may start at any point
 concurrently, or it can start at the time when the returned future is
 touched."
   (create-workers!)
-  (let ((future (%make-future thunk #f (make-mutex))))
+  (let ((future (%make-future thunk 'queued
+                              (make-mutex) (make-condition-variable))))
     (register-future! future)
     future))
 
@@ -69,6 +71,14 @@ touched."
 (define %futures-mutex (make-mutex))
 (define %futures-available (make-condition-variable))
 
+(define-syntax-rule (with-mutex m e0 e1 ...)
+  ;; Copied from (ice-9 threads) to avoid circular dependency.
+  (let ((x m))
+    (dynamic-wind
+      (lambda () (lock-mutex x))
+      (lambda () (begin e0 e1 ...))
+      (lambda () (unlock-mutex x)))))
+
 (define (register-future! future)
   ;; Register FUTURE as being processable.
   (lock-mutex %futures-mutex)
@@ -77,7 +87,7 @@ touched."
   (unlock-mutex %futures-mutex))
 
 (define (process-future! future)
-  ;; Process FUTURE, assuming its mutex is already taken.
+  ;; Process FUTURE, and update its result.
   (set-future-result! future
                       (catch #t
                         (lambda ()
@@ -87,8 +97,7 @@ touched."
                                 (apply values results)))))
                         (lambda args
                           (lambda ()
-                            (apply throw args)))))
-  (set-future-done?! future #t))
+                            (apply throw args))))))
 
 (define (process-futures)
   ;; Wait for futures to be available and process them.
@@ -101,42 +110,74 @@ touched."
     (or (q-empty? %futures)
         (let ((future (deq! %futures)))
           (lock-mutex (future-mutex future))
-          (or (and (future-done? future)
-                   (unlock-mutex (future-mutex future)))
-              (begin
-                ;; Do the actual work.
-
-                ;; We want to release %FUTURES-MUTEX so that other workers
-                ;; can progress.  However, to avoid deadlocks, we have to
-                ;; unlock FUTURE as well, to preserve lock ordering.
-                (unlock-mutex (future-mutex future))
-                (unlock-mutex %futures-mutex)
-
-                (lock-mutex (future-mutex future))
-                (or (future-done? future)            ; lost the race?
-                    (process-future! future))
-                (unlock-mutex (future-mutex future))
-
-                (lock-mutex %futures-mutex)))))
+          (case (future-state future)
+            ((done started)
+             ;; Nothing to do.
+             (unlock-mutex (future-mutex future)))
+            (else
+             ;; Do the actual work.
+
+             ;; We want to release %FUTURES-MUTEX so that other workers can
+             ;; progress.  However, to avoid deadlocks, we have to unlock
+             ;; FUTURE as well, to preserve lock ordering.
+             (unlock-mutex (future-mutex future))
+             (unlock-mutex %futures-mutex)
+
+             (lock-mutex (future-mutex future))
+             (if (eq? (future-state future) 'queued) ; lost the race?
+                 (begin                              ; no, so let's process it
+                   (set-future-state! future 'started)
+                   (unlock-mutex (future-mutex future))
+
+                   (process-future! future)
+
+                   (with-mutex (future-mutex future)
+                     (set-future-state! future 'done))
+
+                   (broadcast-condition-variable (future-completion future)))
+                 (unlock-mutex (future-mutex future))) ; yes
+
+             (lock-mutex %futures-mutex)))))
+
+    ;; Look for more work.
     (loop)))
 
 (define (touch future)
   "Return the result of FUTURE, computing it if not already done."
   (lock-mutex (future-mutex future))
-  (or (future-done? future)
-      (begin
-        ;; Do the actual work.  Unlock FUTURE first to preserve lock
-        ;; ordering.
-        (unlock-mutex (future-mutex future))
-
-        (lock-mutex %futures-mutex)
-        (q-remove! %futures future)
-        (unlock-mutex %futures-mutex)
-
-        (lock-mutex (future-mutex future))
-        (or (future-done? future)            ; lost the race?
-            (process-future! future))))
-  (unlock-mutex (future-mutex future))
+  (case (future-state future)
+    ((done)
+     (unlock-mutex (future-mutex future)))
+    ((started)
+     ;; Wait for completion.
+     (wait-condition-variable (future-completion future)
+                              (future-mutex future))
+     (unlock-mutex (future-mutex future)))
+    ((queued)
+     (begin
+       ;; Do the actual work.  Unlock FUTURE first to preserve lock
+       ;; ordering.
+       (unlock-mutex (future-mutex future))
+
+       (lock-mutex %futures-mutex)
+       (q-remove! %futures future)
+       (unlock-mutex %futures-mutex)
+
+       (lock-mutex (future-mutex future))
+       (if (eq? (future-state future) 'queued) ; lost the race?
+           (begin                              ; no, so let's process it
+             (set-future-state! future 'started)
+             (unlock-mutex (future-mutex future))
+
+             (process-future! future)
+
+             (with-mutex (future-mutex future)
+               (set-future-state! future 'done))
+
+             (broadcast-condition-variable (future-completion future)))
+           (begin                              ; yes, so try again
+             (unlock-mutex (future-mutex future))
+             (touch future))))))
   ((future-result future)))
 
 \f