* threads.scm (par-map, par-for-each): Reimplemented using
authorMikael Djurfeldt <djurfeldt@nada.kth.se>
Sun, 15 Dec 2002 14:36:19 +0000 (14:36 +0000)
committerMikael Djurfeldt <djurfeldt@nada.kth.se>
Sun, 15 Dec 2002 14:36:19 +0000 (14:36 +0000)
join-thread.
(parallel): Reimplemented using futures.
(n-par-map, n-par-for-each): New procedures.

ice-9/ChangeLog
ice-9/threads.scm

index 57d7d3c..ea99293 100644 (file)
@@ -1,3 +1,10 @@
+2002-12-15  Mikael Djurfeldt  <djurfeldt@nada.kth.se>
+
+       * threads.scm (par-map, par-for-each): Reimplemented using
+       join-thread.
+       (parallel): Reimplemented using futures.
+       (n-par-map, n-par-for-each): New procedures.
+
 2002-12-12  Marius Vollmer  <mvo@zagadka.ping.de>
 
        * optargs.scm (improper-list-copy): New.
index f5d178a..86bdae5 100644 (file)
 ;;; Code:
 
 (define-module (ice-9 threads)
-  :export (par-map
+  :export (future-ref
+          par-map
           par-for-each
-          %thread-handler)
-  :export-syntax (make-thread
-                 begin-thread
+          n-par-map
+          n-par-for-each)
+  :export-syntax (begin-thread
+                 future
                  parallel
                  letpar
+                 make-thread
                  with-mutex
                  monitor))
 
 \f
 
-(define (par-map proc . arglists)
-  (let* ((m (make-mutex))
-        (c (make-condition-variable))
-        (n (length (car arglists)))
-        (counter (- n 1))
-        (res (make-list n))
-        (ls res))
-    (lock-mutex m)
-    (apply for-each
-          (lambda args
-            (let ((res ls))
-              (set! ls (cdr ls))
-              (call-with-new-thread
-               (lambda ()
-                 (set-car! res (apply proc args))
-                 ;; synchronize
-                 (lock-mutex m)
-                 (if (zero? counter)
-                     (signal-condition-variable c)
-                     (set! counter (- counter 1)))
-                 (unlock-mutex m))
-               %thread-handler)))
-          arglists)
-    (wait-condition-variable c m)
-    res))
-
-(define (par-for-each proc . arglists)
+(define future-ref join-thread)
+
+(define ((par-mapper mapper)  proc . arglists)
+  (mapper join-thread
+         (apply map
+                (lambda args
+                  (call-with-new-thread (lambda ()
+                                          (apply proc args))
+                                        %thread-handler))
+                arglists)))
+
+(define par-map (par-mapper map))
+(define par-for-each (par-mapper for-each))
+
+(define (n-par-map n proc . arglists)
   (let* ((m (make-mutex))
-        (c (make-condition-variable))
-        (counter (- (length (car arglists)) 1)))
-    (lock-mutex m)
-    (apply for-each
-          (lambda args
-            (call-with-new-thread
-             (lambda ()
-               (apply proc args)
-               ;; synchronize
-               (lock-mutex m)
-               (if (zero? counter)
-                   (signal-condition-variable c)
-                   (set! counter (- counter 1)))
-               (unlock-mutex m))
-             %thread-handler))
-          arglists)
-    (wait-condition-variable c m)))
-
-(define (%thread-handler tag . args)
+        (threads '())
+        (results (make-list (length (car arglists))))
+        (result results))
+    (do ((i 0 (+ 1 i)))
+       ((= i n)
+        (for-each join-thread threads)
+        results)
+      (set! threads
+           (cons (call-with-new-thread
+                  (lambda ()
+                    (let loop ()
+                      (lock-mutex m)
+                      (if (null? result)
+                          (unlock-mutex m)
+                          (let ((args (map car arglists))
+                                (my-result result))
+                            (set! arglists (map cdr arglists))
+                            (set! result (cdr result))
+                            (unlock-mutex m)
+                            (set-car! my-result (apply proc args))
+                            (loop)))))
+                  %thread-handler)
+                 threads)))))
+
+(define (n-par-for-each n proc . arglists)
+  (let ((m (make-mutex))
+       (threads '()))
+    (do ((i 0 (+ 1 i)))
+       ((= i n)
+        (for-each join-thread threads))
+      (set! threads
+           (cons (call-with-new-thread
+                  (lambda ()
+                    (let loop ()
+                      (lock-mutex m)
+                      (if (null? (car arglists))
+                          (unlock-mutex m)
+                          (let ((args (map car arglists)))
+                            (set! arglists (map cdr arglists))
+                            (unlock-mutex m)
+                            (apply proc args)
+                            (loop)))))
+                  %thread-handler)
+                 threads)))))
+
+(define (thread-handler tag . args)
   (fluid-set! the-last-stack #f)
   (let ((n (length args))
        (p (current-error-port)))
           (newline p)))
     #f))
 
-; --- MACROS -------------------------------------------------------
+;;; Set system thread handler
+(set! %thread-handler thread-handler)
 
-(defmacro make-thread (proc . args)
-  `(call-with-new-thread
-    (lambda ()
-      (,proc ,@args))
-    %thread-handler))
+; --- MACROS -------------------------------------------------------
 
-(defmacro begin-thread (first . rest)
-  `(call-with-new-thread
-    (lambda ()
-      (begin
-       ,first ,@rest))
-    %thread-handler))
+(define-macro (begin-thread . forms)
+  (if (null? forms)
+      '(begin)
+      `(call-with-new-thread
+       (lambda ()
+         ,@forms)
+       %thread-handler)))
 
-(defmacro parallel forms
+(define-macro (parallel . forms)
   (cond ((null? forms) '(begin))
        ((null? (cdr forms)) (car forms))
        (else
-        (let* ((m (make-symbol "m"))
-               (c (make-symbol "c"))
-               (counter (make-symbol "counter"))
-               (sync (make-symbol "sync"))
-               (n-forms (length forms))
-               (vars (map (lambda (i)
-                            (make-symbol (string-append "res"
-                                                        (number->string i))))
-                          (iota n-forms))))
-          `(let* ((,m (make-mutex))
-                  (,c (make-condition-variable))
-                  (,counter ,(- n-forms 1))
-                  (,sync (lambda ()
-                           (lock-mutex ,m)
-                           (if (zero? ,counter)
-                               (signal-condition-variable ,c)
-                               (set! ,counter (- ,counter 1)))
-                           (unlock-mutex ,m)))
-                  ,@(map (lambda (var)
-                           `(,var #f))
-                         vars))
-             (lock-mutex ,m)       
-             ,@(map (lambda (var form)
-                      `(call-with-new-thread (lambda ()
-                                               (set! ,var ,form)
-                                               (,sync))
-                                             %thread-handler))
-                    vars
-                    forms)
-             (wait-condition-variable ,c ,m)
-             (values ,@vars))))))
-
-(defmacro letpar (bindings . body)
+        `(apply values
+                (map future-ref
+                     (list ,@(map (lambda (form) `(future ,form)) forms)))))))
+
+(define-macro (letpar bindings . body)
   `(call-with-values
        (lambda ()
         (parallel ,@(map cadr bindings)))
      (lambda ,(map car bindings)
        ,@body)))
 
-(defmacro with-mutex (m . body)
+(define-macro (make-thread proc . args)
+  `(call-with-new-thread
+    (lambda ()
+      (,proc ,@args))
+    %thread-handler))
+
+(define-macro (with-mutex m . body)
   `(dynamic-wind
        (lambda () (lock-mutex ,m))
        (lambda () (begin ,@body))
        (lambda () (unlock-mutex ,m))))
 
-(defmacro monitor (first . rest)
+(define-macro (monitor first . rest)
   `(with-mutex ,(make-mutex)
      (begin
        ,first ,@rest)))