Core enhancements, by Julian Graham, to Guile's thread, mutex and
authorNeil Jerram <neil@ossau.uklinux.net>
Sat, 8 Mar 2008 16:22:40 +0000 (16:22 +0000)
committerNeil Jerram <neil@ossau.uklinux.net>
Sat, 8 Mar 2008 16:22:40 +0000 (16:22 +0000)
condvar primitives, in preparation for SRFI-18 support.

doc/ref/ChangeLog
doc/ref/api-scheduling.texi
libguile/ChangeLog
libguile/threads.c
libguile/threads.h
test-suite/tests/threads.test

index 54173f2..aea2b0f 100644 (file)
@@ -1,3 +1,12 @@
+2008-03-08  Julian Graham  <joolean@gmail.com>
+
+       * api-scheduling.texi (Threads): Add documentation for new 
+       functions "scm_thread_p" and new "scm_join_thread_timed".
+       (Mutexes and Condition Variables): Add documentation for new 
+       functions "scm_make_mutex_with_flags", "scm_mutex_p", 
+       "scm_lock_mutex_timed", "scm_unlock_mutex_timed", and 
+       "scm_condition_variable_p".
+
 2008-02-11  Neil Jerram  <neil@ossau.uklinux.net>
 
        * api-data.texi (Random): New text about the default random state,
index 56fa202..7f40506 100644 (file)
@@ -267,12 +267,24 @@ Once @var{body} or @var{handler} returns, the return value is made the
 @emph{exit value} of the thread and the thread is terminated.
 @end deftypefn
 
+@deffn {Scheme Procedure} thread? obj
+@deffnx {C Function} scm_thread_p (obj)
+Return @code{#t} iff @var{obj} is a thread; otherwise, return
+@code{#f}.
+@end deffn
+
 @c begin (texi-doc-string "guile" "join-thread")
-@deffn {Scheme Procedure} join-thread thread
+@deffn {Scheme Procedure} join-thread thread [timeout [timeoutval]]
 @deffnx {C Function} scm_join_thread (thread)
+@deffnx {C Function} scm_join_thread_timed (thread, timeout, timeoutval)
 Wait for @var{thread} to terminate and return its exit value.  Threads
 that have not been created with @code{call-with-new-thread} or
-@code{scm_spawn_thread} have an exit value of @code{#f}.
+@code{scm_spawn_thread} have an exit value of @code{#f}.  When 
+@var{timeout} is given, it specifies a point in time where the waiting
+should be aborted.  It can be either an integer as returned by 
+@code{current-time} or a pair as returned by @code{gettimeofday}.  
+When the waiting is aborted, @var{timeoutval} is returned (if it is 
+specified; @code{#f} is returned otherwise).
 @end deffn
 
 @deffn {Scheme Procedure} thread-exited? thread
@@ -363,21 +375,51 @@ Acquiring requisite mutexes in a fixed order (like always A before B)
 in all threads is one way to avoid such problems.
 
 @sp 1
-@deffn {Scheme Procedure} make-mutex
+@deffn {Scheme Procedure} make-mutex . flags
 @deffnx {C Function} scm_make_mutex ()
-Return a new standard mutex.  It is initially unlocked.
+@deffnx {C Function} scm_make_mutex_with_flags (SCM flag)
+Return a new mutex.  It is initially unlocked.  If @var{flags} is 
+specified, it must be a list of symbols specifying configuration flags
+for the newly-created mutex.  The supported flags are: 
+@table @code
+@item unchecked-unlock
+Unless this flag is present, a call to `unlock-mutex' on the returned
+mutex when it is already unlocked will cause an error to be signalled.
+
+@item allow-external-unlock
+Allow the returned mutex to be unlocked by the calling thread even if
+it was originally locked by a different thread.
+
+@item recursive
+The returned mutex will be recursive.
+
+@end table
+@end deffn
+
+@deffn {Scheme Procedure} mutex? obj
+@deffnx {C Function} scm_mutex_p (obj)
+Return @code{#t} iff @var{obj} is a mutex; otherwise, return 
+@code{#f}.
 @end deffn
 
 @deffn {Scheme Procedure} make-recursive-mutex
 @deffnx {C Function} scm_make_recursive_mutex ()
-Create a new recursive mutex.  It is initialloy unlocked.
+Create a new recursive mutex.  It is initially unlocked.  Calling this
+function is equivalent to calling `make-mutex' and specifying the
+@code{recursive} flag.
 @end deffn
 
-@deffn {Scheme Procedure} lock-mutex mutex
+@deffn {Scheme Procedure} lock-mutex mutex [timeout]
 @deffnx {C Function} scm_lock_mutex (mutex)
+@deffnx {C Function} scm_lock_mutex_timed (mutex, timeout)
 Lock @var{mutex}.  If the mutex is already locked by another thread
 then block and return only when @var{mutex} has been acquired.
 
+When @var{timeout} is given, it specifies a point in time where the 
+waiting should be aborted.  It can be either an integer as returned 
+by @code{current-time} or a pair as returned by @code{gettimeofday}.  
+When the waiting is aborted, @code{#f} is returned. 
+
 For standard mutexes (@code{make-mutex}), and error is signalled if
 the thread has itself already locked @var{mutex}.
 
@@ -386,6 +428,10 @@ itself already locked @var{mutex}, then a further @code{lock-mutex}
 call increments the lock count.  An additional @code{unlock-mutex}
 will be required to finally release.
 
+If @var{mutex} was locked by a thread that exited before unlocking it,
+the next attempt to lock @var{mutex} will succeed, but 
+@code{abandoned-mutex-error} will be signalled.
+
 When a system async (@pxref{System asyncs}) is activated for a thread
 blocked in @code{lock-mutex}, the wait is interrupted and the async is
 executed.  When the async returns, the wait resumes.
@@ -395,7 +441,7 @@ executed.  When the async returns, the wait resumes.
 Arrange for @var{mutex} to be locked whenever the current dynwind
 context is entered and to be unlocked when it is exited.
 @end deftypefn
-
 @deffn {Scheme Procedure} try-mutex mx
 @deffnx {C Function} scm_try_mutex (mx)
 Try to lock @var{mutex} as per @code{lock-mutex}.  If @var{mutex} can
@@ -404,10 +450,25 @@ If @var{mutex} is locked by some other thread then nothing is done and
 the return is @code{#f}.
 @end deffn
 
-@deffn {Scheme Procedure} unlock-mutex mutex
+@deffn {Scheme Procedure} unlock-mutex mutex [condvar [timeout]]
 @deffnx {C Function} scm_unlock_mutex (mutex)
-Unlock @var{mutex}.  An error is signalled if @var{mutex} is not
-locked by the calling thread.
+@deffnx {C Function} scm_unlock_mutex_timed (mutex, condvar, timeout)
+Unlock @var{mutex}.  An error is signalled if @var{mutex} is not locked
+and was not created with the @code{unchecked-unlock} flag set, or if 
+@var{mutex} is locked by a thread other than the calling thread and was
+not created with the @code{allow-external-unlock} flag set.
+
+If @var{condvar} is given, it specifies a condition variable upon
+which the calling thread will wait to be signalled before returning.
+(This behavior is very similar to that of 
+@code{wait-condition-variable}, except that the mutex is left in an
+unlocked state when the function returns.)
+
+When @var{timeout} is also given, it specifies a point in time where 
+the waiting should be aborted.  It can be either an integer as 
+returned by @code{current-time} or a pair as returned by 
+@code{gettimeofday}.  When the waiting is aborted, @code{#f} is 
+returned.  Otherwise the function returns @code{#t}.
 @end deffn
 
 @deffn {Scheme Procedure} make-condition-variable
@@ -415,6 +476,12 @@ locked by the calling thread.
 Return a new condition variable.
 @end deffn
 
+@deffn {Scheme Procedure} condition-variable? obj
+@deffnx {C Function} scm_condition_variable_p (obj)
+Return @code{#t} iff @var{obj} is a condition variable; otherwise, 
+return @code{#f}.
+@end deffn
+
 @deffn {Scheme Procedure} wait-condition-variable condvar mutex [time]
 @deffnx {C Function} scm_wait_condition_variable (condvar, mutex, time)
 Wait until @var{condvar} has been signalled.  While waiting,
index 7d8846c..677d927 100644 (file)
@@ -1,3 +1,29 @@
+2008-03-08  Julian Graham  <joolean@gmail.com>
+
+       * threads.c (scm_join_thread_timed, scm_thread_p, 
+       scm_make_mutex_with_flags, scm_lock_mutex_timed, 
+       scm_unlock_mutex_timed, scm_mutex_p, scm_condition_variable_p): New 
+       functions.
+       (thread_mark): Updated to mark new struct field `mutexes'.
+       (do_thread_exit): Notify threads waiting on mutexes locked by exiting 
+       thread.
+       (scm_join_thread, scm_make_mutex, scm_make_recursive_mutex, 
+       scm_mutex_lock): Reimplement in terms of their newer 
+       counterparts.
+       (scm_abandoned_mutex_error_key): New symbol.
+       (fat_mutex)[unchecked_unlock, allow_external_unlock]: New fields.
+       (fat_mutex_lock): Reimplement to support timeouts and abandonment.
+       (fat_mutex_trylock, scm_try_mutex): Remove fat_mutex_trylock and
+       reimplement scm_try_mutex as a lock attempt with a timeout of zero.
+       (fat_mutex_unlock): Allow unlocking from other threads and unchecked
+       unlocking; implement in terms of condition variable wait.
+       (scm_timed_wait_condition_variable): Reimplement in terms of 
+       fat_mutex_unlock.
+       * threads.h (scm_i_thread)[mutexes]: New field.
+       (scm_join_thread_timed, scm_thread_p, scm_lock_mutex_timed,
+       scm_unlock_mutex_timed, scm_mutex_p, scm_condition_variable_p): 
+       Prototypes for new functions.
+
 2008-03-06  Ludovic Courtès  <ludo@gnu.org>
 
        * eval.c (scm_eval): If MODULE_OR_STATE is not a dynamic state,
index ba0aa1a..e959cc6 100644 (file)
@@ -49,6 +49,7 @@
 #include "libguile/gc.h"
 #include "libguile/init.h"
 #include "libguile/scmsigs.h"
+#include "libguile/strings.h"
 
 #ifdef __MINGW32__
 #ifndef ETIMEDOUT
 # define pipe(fd) _pipe (fd, 256, O_BINARY)
 #endif /* __MINGW32__ */
 
+static void
+to_timespec (SCM t, scm_t_timespec *waittime)
+{
+  if (scm_is_pair (t))
+    {
+      waittime->tv_sec = scm_to_ulong (SCM_CAR (t));
+      waittime->tv_nsec = scm_to_ulong (SCM_CDR (t)) * 1000;
+    }
+  else
+    {
+      double time = scm_to_double (t);
+      double sec = scm_c_truncate (time);
+
+      waittime->tv_sec = (long) sec;
+      waittime->tv_nsec = (long) ((time - sec) * 1000000);
+    }
+}
+
 /*** Queues */
 
 /* Make an empty queue data structure.
@@ -134,6 +153,7 @@ thread_mark (SCM obj)
   scm_gc_mark (t->result);
   scm_gc_mark (t->cleanup_handler);
   scm_gc_mark (t->join_queue);
+  scm_gc_mark (t->mutexes);
   scm_gc_mark (t->dynwinds);
   scm_gc_mark (t->active_asyncs);
   scm_gc_mark (t->continuation_root);
@@ -418,6 +438,7 @@ guilify_self_1 (SCM_STACKITEM *base)
   t->handle = SCM_BOOL_F;
   t->result = SCM_BOOL_F;
   t->cleanup_handler = SCM_BOOL_F;
+  t->mutexes = SCM_EOL;
   t->join_queue = SCM_EOL;
   t->dynamic_state = SCM_BOOL_F;
   t->dynwinds = SCM_EOL;
@@ -478,6 +499,31 @@ guilify_self_2 (SCM parent)
   t->block_asyncs = 0;
 }
 
+\f
+/*** Fat mutexes */
+
+/* We implement our own mutex type since we want them to be 'fair', we
+   want to do fancy things while waiting for them (like running
+   asyncs) and we might want to add things that are nice for
+   debugging.
+*/
+
+typedef struct {
+  scm_i_pthread_mutex_t lock;
+  SCM owner;
+  int level;      /* how much the owner owns us.  
+                    < 0 for non-recursive mutexes */
+
+  int unchecked_unlock; /* is it an error to unlock an unlocked mutex? */
+  int allow_external_unlock; /* is it an error to unlock a mutex that is not
+                               owned by the current thread? */
+
+  SCM waiting;    /* the threads waiting for this mutex. */
+} fat_mutex;
+
+#define SCM_MUTEXP(x)         SCM_SMOB_PREDICATE (scm_tc16_mutex, x)
+#define SCM_MUTEX_DATA(x)     ((fat_mutex *) SCM_SMOB_DATA (x))
+
 /* Perform thread tear-down, in guile mode.
  */
 static void *
@@ -503,6 +549,18 @@ do_thread_exit (void *v)
   while (scm_is_true (unblock_from_queue (t->join_queue)))
     ;
 
+  while (!scm_is_null (t->mutexes)) 
+    {
+      SCM mutex = SCM_CAR (t->mutexes);
+      fat_mutex *m  = SCM_MUTEX_DATA (mutex);
+      scm_i_pthread_mutex_lock (&m->lock);
+      
+      unblock_from_queue (m->waiting);
+
+      scm_i_pthread_mutex_unlock (&m->lock);      
+      t->mutexes = SCM_CDR (t->mutexes);
+    }
+
   scm_i_pthread_mutex_unlock (&t->admin_mutex);
 
   return NULL;
@@ -989,14 +1047,23 @@ SCM_DEFINE (scm_thread_cleanup, "thread-cleanup", 1, 0, 0,
 }
 #undef FUNC_NAME
 
-SCM_DEFINE (scm_join_thread, "join-thread", 1, 0, 0,
-           (SCM thread),
+SCM scm_join_thread (SCM thread)
+{
+  return scm_join_thread_timed (thread, SCM_UNDEFINED, SCM_UNDEFINED);
+}
+
+SCM_DEFINE (scm_join_thread_timed, "join-thread", 1, 2, 0,
+           (SCM thread, SCM timeout, SCM timeoutval),
 "Suspend execution of the calling thread until the target @var{thread} "
 "terminates, unless the target @var{thread} has already terminated. ")
-#define FUNC_NAME s_scm_join_thread
+#define FUNC_NAME s_scm_join_thread_timed
 {
   scm_i_thread *t;
-  SCM res;
+  scm_t_timespec ctimeout, *timeout_ptr = NULL;
+  SCM res = SCM_BOOL_F;
+
+  if (! (SCM_UNBNDP (timeoutval)))
+    res = timeoutval;
 
   SCM_VALIDATE_THREAD (1, thread);
   if (scm_is_eq (scm_current_thread (), thread))
@@ -1005,19 +1072,36 @@ SCM_DEFINE (scm_join_thread, "join-thread", 1, 0, 0,
   t = SCM_I_THREAD_DATA (thread);
   scm_i_scm_pthread_mutex_lock (&t->admin_mutex);
 
-  if (!t->exited)
+  if (! SCM_UNBNDP (timeout))
+    {
+      to_timespec (timeout, &ctimeout);
+      timeout_ptr = &ctimeout;
+    }
+
+  if (t->exited)
+    res = t->result;
+  else
     {
       while (1)
        {
-         block_self (t->join_queue, thread, &t->admin_mutex, NULL);
-         if (t->exited)
+         int err = block_self (t->join_queue, thread, &t->admin_mutex, 
+                               timeout_ptr);
+         if (err == 0)
+           {
+             if (t->exited)
+               {
+                 res = t->result;
+                 break;
+               }
+           }
+         else if (err == ETIMEDOUT)
            break;
+
          scm_i_pthread_mutex_unlock (&t->admin_mutex);
          SCM_TICK;
          scm_i_scm_pthread_mutex_lock (&t->admin_mutex);
        }
     }
-  res = t->result;
 
   scm_i_pthread_mutex_unlock (&t->admin_mutex);
 
@@ -1025,26 +1109,14 @@ SCM_DEFINE (scm_join_thread, "join-thread", 1, 0, 0,
 }
 #undef FUNC_NAME
 
-
-\f
-/*** Fat mutexes */
-
-/* We implement our own mutex type since we want them to be 'fair', we
-   want to do fancy things while waiting for them (like running
-   asyncs) and we might want to add things that are nice for
-   debugging.
-*/
-
-typedef struct {
-  scm_i_pthread_mutex_t lock;
-  SCM owner;
-  int level;      /* how much the owner owns us.  
-                    < 0 for non-recursive mutexes */
-  SCM waiting;    /* the threads waiting for this mutex. */
-} fat_mutex;
-
-#define SCM_MUTEXP(x)         SCM_SMOB_PREDICATE (scm_tc16_mutex, x)
-#define SCM_MUTEX_DATA(x)     ((fat_mutex *) SCM_SMOB_DATA (x))
+SCM_DEFINE (scm_thread_p, "thread?", 1, 0, 0,
+           (SCM obj),
+           "Return @code{#t} if @var{obj} is a thread.")
+#define FUNC_NAME s_scm_thread_p
+{
+  return SCM_I_IS_THREAD(obj) ? SCM_BOOL_T : SCM_BOOL_F;
+}
+#undef FUNC_NAME
 
 static SCM
 fat_mutex_mark (SCM mx)
@@ -1074,7 +1146,7 @@ fat_mutex_print (SCM mx, SCM port, scm_print_state *pstate SCM_UNUSED)
 }
 
 static SCM
-make_fat_mutex (int recursive)
+make_fat_mutex (int recursive, int unchecked_unlock, int external_unlock)
 {
   fat_mutex *m;
   SCM mx;
@@ -1083,18 +1155,47 @@ make_fat_mutex (int recursive)
   scm_i_pthread_mutex_init (&m->lock, NULL);
   m->owner = SCM_BOOL_F;
   m->level = recursive? 0 : -1;
+
+  m->unchecked_unlock = unchecked_unlock;
+  m->allow_external_unlock = external_unlock;
+
   m->waiting = SCM_EOL;
   SCM_NEWSMOB (mx, scm_tc16_mutex, (scm_t_bits) m);
   m->waiting = make_queue ();
   return mx;
 }
 
-SCM_DEFINE (scm_make_mutex, "make-mutex", 0, 0, 0,
-           (void),
+SCM scm_make_mutex (void)
+{
+  return scm_make_mutex_with_flags (SCM_EOL);
+}
+
+static SCM unchecked_unlock_sym;
+static SCM allow_external_unlock_sym;
+static SCM recursive_sym;
+
+SCM_DEFINE (scm_make_mutex_with_flags, "make-mutex", 0, 0, 1,
+           (SCM flags),
            "Create a new mutex. ")
-#define FUNC_NAME s_scm_make_mutex
+#define FUNC_NAME s_scm_make_mutex_with_flags
 {
-  return make_fat_mutex (0);
+  int unchecked_unlock = 0, external_unlock = 0, recursive = 0;
+
+  SCM ptr = flags;
+  while (! scm_is_null (ptr))
+    {
+      SCM flag = SCM_CAR (ptr);
+      if (scm_is_eq (flag, unchecked_unlock_sym))
+       unchecked_unlock = 1;
+      else if (scm_is_eq (flag, allow_external_unlock_sym))
+       external_unlock = 1;
+      else if (scm_is_eq (flag, recursive_sym))
+       recursive = 1;
+      else 
+       SCM_MISC_ERROR ("unsupported mutex option", SCM_EOL);
+      ptr = SCM_CDR (ptr);
+    }
+  return make_fat_mutex (recursive, unchecked_unlock, external_unlock);
 }
 #undef FUNC_NAME
 
@@ -1103,59 +1204,121 @@ SCM_DEFINE (scm_make_recursive_mutex, "make-recursive-mutex", 0, 0, 0,
            "Create a new recursive mutex. ")
 #define FUNC_NAME s_scm_make_recursive_mutex
 {
-  return make_fat_mutex (1);
+  return make_fat_mutex (1, 0, 0);
 }
 #undef FUNC_NAME
 
-static char *
-fat_mutex_lock (SCM mutex)
+SCM_SYMBOL (scm_abandoned_mutex_error_key, "abandoned-mutex-error");
+
+static SCM
+fat_mutex_lock (SCM mutex, scm_t_timespec *timeout, int *ret)
 {
   fat_mutex *m = SCM_MUTEX_DATA (mutex);
+
   SCM thread = scm_current_thread ();
-  char *msg = NULL;
+  scm_i_thread *t = SCM_I_THREAD_DATA (thread);
+
+  SCM err = SCM_BOOL_F;
+
+  struct timeval current_time;
 
   scm_i_scm_pthread_mutex_lock (&m->lock);
   if (scm_is_false (m->owner))
-    m->owner = thread;
+    {
+      m->owner = thread;
+      scm_i_pthread_mutex_lock (&t->admin_mutex);
+      t->mutexes = scm_cons (mutex, t->mutexes);
+      scm_i_pthread_mutex_unlock (&t->admin_mutex);
+      *ret = 1;
+    }
   else if (scm_is_eq (m->owner, thread))
     {
       if (m->level >= 0)
-       m->level++;
+       {
+         m->level++;
+         *ret = 1;
+       }
       else
-       msg = "mutex already locked by current thread";
+       err = scm_cons (scm_misc_error_key,
+                       scm_from_locale_string ("mutex already locked by "
+                                               "current thread"));
     }
   else
     {
+      int first_iteration = 1;
       while (1)
        {
-         block_self (m->waiting, mutex, &m->lock, NULL);
-         if (scm_is_eq (m->owner, thread))
-           break;
-         scm_i_pthread_mutex_unlock (&m->lock);
-         SCM_TICK;
-         scm_i_scm_pthread_mutex_lock (&m->lock);
+         if (scm_is_eq (m->owner, thread) || scm_c_thread_exited_p (m->owner))
+           {
+             scm_i_pthread_mutex_lock (&t->admin_mutex);
+             t->mutexes = scm_cons (mutex, t->mutexes);
+             scm_i_pthread_mutex_unlock (&t->admin_mutex);
+             *ret = 1;
+             if (scm_c_thread_exited_p (m->owner)) 
+               {
+                 m->owner = thread;
+                 err = scm_cons (scm_abandoned_mutex_error_key,
+                                 scm_from_locale_string ("lock obtained on "
+                                                         "abandoned mutex"));
+               }
+             break;
+           }
+         else if (!first_iteration)
+           {
+             if (timeout != NULL) 
+               {
+                 gettimeofday (&current_time, NULL);
+                 if (current_time.tv_sec > timeout->tv_sec ||
+                     (current_time.tv_sec == timeout->tv_sec &&
+                      current_time.tv_usec * 1000 > timeout->tv_nsec))
+                   {
+                     *ret = 0;
+                     break;
+                   }
+               }
+             scm_i_pthread_mutex_unlock (&m->lock);
+             SCM_TICK;
+             scm_i_scm_pthread_mutex_lock (&m->lock);
+           }
+         else
+           first_iteration = 0;
+         block_self (m->waiting, mutex, &m->lock, timeout);
        }
     }
   scm_i_pthread_mutex_unlock (&m->lock);
-  return msg;
+  return err;
 }
 
-SCM_DEFINE (scm_lock_mutex, "lock-mutex", 1, 0, 0,
-           (SCM mx),
+SCM scm_lock_mutex (SCM mx)
+{
+  return scm_lock_mutex_timed (mx, SCM_UNDEFINED);
+}
+
+SCM_DEFINE (scm_lock_mutex_timed, "lock-mutex", 1, 1, 0,
+           (SCM m, SCM timeout),
 "Lock @var{mutex}. If the mutex is already locked, the calling thread "
 "blocks until the mutex becomes available. The function returns when "
 "the calling thread owns the lock on @var{mutex}.  Locking a mutex that "
 "a thread already owns will succeed right away and will not block the "
 "thread.  That is, Guile's mutexes are @emph{recursive}. ")
-#define FUNC_NAME s_scm_lock_mutex
+#define FUNC_NAME s_scm_lock_mutex_timed
 {
-  char *msg;
+  SCM exception;
+  int ret = 0;
+  scm_t_timespec cwaittime, *waittime = NULL;
 
-  SCM_VALIDATE_MUTEX (1, mx);
-  msg = fat_mutex_lock (mx);
-  if (msg)
-    scm_misc_error (NULL, msg, SCM_EOL);
-  return SCM_BOOL_T;
+  SCM_VALIDATE_MUTEX (1, m);
+
+  if (! SCM_UNBNDP (timeout) && ! scm_is_false (timeout))
+    {
+      to_timespec (timeout, &cwaittime);
+      waittime = &cwaittime;
+    }
+
+  exception = fat_mutex_lock (m, waittime, &ret);
+  if (!scm_is_false (exception))
+    scm_ithrow (SCM_CAR (exception), scm_list_1 (SCM_CDR (exception)), 1);
+  return ret ? SCM_BOOL_T : SCM_BOOL_F;
 }
 #undef FUNC_NAME
 
@@ -1168,71 +1331,134 @@ scm_dynwind_lock_mutex (SCM mutex)
                                       SCM_F_WIND_EXPLICITLY);
 }
 
-static char *
-fat_mutex_trylock (fat_mutex *m, int *resp)
-{
-  char *msg = NULL;
-  SCM thread = scm_current_thread ();
-
-  *resp = 1;
-  scm_i_pthread_mutex_lock (&m->lock);
-  if (scm_is_false (m->owner))
-    m->owner = thread;
-  else if (scm_is_eq (m->owner, thread))
-    {
-      if (m->level >= 0)
-       m->level++;
-      else
-       msg = "mutex already locked by current thread";
-    }
-  else
-    *resp = 0;
-  scm_i_pthread_mutex_unlock (&m->lock);
-  return msg;
-}
-
 SCM_DEFINE (scm_try_mutex, "try-mutex", 1, 0, 0,
            (SCM mutex),
 "Try to lock @var{mutex}. If the mutex is already locked by someone "
 "else, return @code{#f}.  Else lock the mutex and return @code{#t}. ")
 #define FUNC_NAME s_scm_try_mutex
 {
-  char *msg;
-  int res;
+  SCM exception;
+  int ret = 0;
+  scm_t_timespec cwaittime, *waittime = NULL;
 
   SCM_VALIDATE_MUTEX (1, mutex);
+
+  to_timespec (scm_from_int(0), &cwaittime);
+  waittime = &cwaittime;
   
-  msg = fat_mutex_trylock (SCM_MUTEX_DATA (mutex), &res);
-  if (msg)
-    scm_misc_error (NULL, msg, SCM_EOL);
-  return scm_from_bool (res);
+  exception = fat_mutex_lock (mutex, waittime, &ret);
+  if (!scm_is_false (exception))
+    scm_ithrow (SCM_CAR (exception), scm_list_1 (SCM_CDR (exception)), 1);
+  return ret ? SCM_BOOL_T : SCM_BOOL_F;
 }
 #undef FUNC_NAME
 
-static char *
-fat_mutex_unlock (fat_mutex *m)
+/*** Fat condition variables */
+
+typedef struct {
+  scm_i_pthread_mutex_t lock;
+  SCM waiting;               /* the threads waiting for this condition. */
+} fat_cond;
+
+#define SCM_CONDVARP(x)       SCM_SMOB_PREDICATE (scm_tc16_condvar, x)
+#define SCM_CONDVAR_DATA(x)   ((fat_cond *) SCM_SMOB_DATA (x))
+
+static int
+fat_mutex_unlock (SCM mutex, SCM cond,
+                 const scm_t_timespec *waittime, int relock)
 {
-  char *msg = NULL;
+  fat_mutex *m = SCM_MUTEX_DATA (mutex);
+  fat_cond *c = NULL;
+  scm_i_thread *t = SCM_I_CURRENT_THREAD;
+  int err = 0, ret = 0;
 
   scm_i_scm_pthread_mutex_lock (&m->lock);
   if (!scm_is_eq (m->owner, scm_current_thread ()))
     {
       if (scm_is_false (m->owner))
-       msg = "mutex not locked";
-      else
-       msg = "mutex not locked by current thread";
+       {
+         if (!m->unchecked_unlock)
+           scm_misc_error (NULL, "mutex not locked", SCM_EOL);
+       }
+      else if (!m->allow_external_unlock)
+       scm_misc_error (NULL, "mutex not locked by current thread", SCM_EOL);
+    }
+
+  if (! (SCM_UNBNDP (cond)))
+    {
+      int lock_ret = 0;
+
+      c = SCM_CONDVAR_DATA (cond);
+      while (1)
+       {
+         int brk = 0;
+
+         scm_i_scm_pthread_mutex_lock (&c->lock);
+         if (m->level > 0)
+           m->level--;
+         else
+           m->owner = unblock_from_queue (m->waiting);
+         scm_i_pthread_mutex_unlock (&m->lock);
+         
+         t->block_asyncs++;
+         
+         err = block_self (c->waiting, cond, &c->lock, waittime);
+
+         if (err == 0)
+           {
+             ret = 1;
+             brk = 1;
+           }
+         else if (err == ETIMEDOUT)
+           {
+             ret = 0;
+             brk = 1;
+           }
+         else if (err != EINTR)
+           {         
+             errno = err;
+             scm_i_pthread_mutex_unlock (&c->lock);
+             scm_syserror (NULL);
+           }     
+
+         if (brk)
+           {
+             if (relock)
+               fat_mutex_lock (mutex, NULL, &lock_ret);
+             scm_i_pthread_mutex_unlock (&c->lock);
+             break;
+           }
+         
+         scm_i_pthread_mutex_unlock (&c->lock);
+
+         t->block_asyncs--;
+         scm_async_click ();
+         
+         scm_remember_upto_here_2 (cond, mutex);
+
+         scm_i_scm_pthread_mutex_lock (&m->lock);
+       }
     }
-  else if (m->level > 0)
-    m->level--;
   else
-    m->owner = unblock_from_queue (m->waiting);
-  scm_i_pthread_mutex_unlock (&m->lock);
+    {
+      if (m->level > 0)
+       m->level--;
+      else
+       m->owner = unblock_from_queue (m->waiting);
+      scm_i_pthread_mutex_unlock (&m->lock);
+      ret = 1;
+    }
+  
+  return ret;
+}
 
-  return msg;
+SCM scm_unlock_mutex (SCM mx)
+{
+  return scm_unlock_mutex_timed (mx, SCM_UNDEFINED, SCM_UNDEFINED);
 }
 
-SCM_DEFINE (scm_unlock_mutex, "unlock-mutex", 1, 0, 0,
-           (SCM mx),
+SCM_DEFINE (scm_unlock_mutex_timed, "unlock-mutex", 1, 2, 0,
+           (SCM mx, SCM cond, SCM timeout),
 "Unlocks @var{mutex} if the calling thread owns the lock on "
 "@var{mutex}.  Calling unlock-mutex on a mutex not owned by the current "
 "thread results in undefined behaviour. Once a mutex has been unlocked, "
@@ -1240,18 +1466,35 @@ SCM_DEFINE (scm_unlock_mutex, "unlock-mutex", 1, 0, 0,
 "lock.  Every call to @code{lock-mutex} by this thread must be matched "
 "with a call to @code{unlock-mutex}.  Only the last call to "
 "@code{unlock-mutex} will actually unlock the mutex. ")
-#define FUNC_NAME s_scm_unlock_mutex
+#define FUNC_NAME s_scm_unlock_mutex_timed
 {
-  char *msg;
+  scm_t_timespec cwaittime, *waittime = NULL;
+
   SCM_VALIDATE_MUTEX (1, mx);
-  
-  msg = fat_mutex_unlock (SCM_MUTEX_DATA (mx));
-  if (msg)
-    scm_misc_error (NULL, msg, SCM_EOL);
-  return SCM_BOOL_T;
+  if (! (SCM_UNBNDP (cond)))
+    {
+      SCM_VALIDATE_CONDVAR (2, cond);
+
+      if (! (SCM_UNBNDP (timeout)))
+       {
+         to_timespec (timeout, &cwaittime);
+         waittime = &cwaittime;
+       }
+    }
+
+  return fat_mutex_unlock (mx, cond, waittime, 0) ? SCM_BOOL_T : SCM_BOOL_F;
 }
 #undef FUNC_NAME
 
+SCM_DEFINE (scm_mutex_p, "mutex?", 1, 0, 0,
+           (SCM obj),
+           "Return @code{#t} if @var{obj} is a mutex.")
+#define FUNC_NAME s_scm_mutex_p
+{
+  return SCM_MUTEXP (obj) ? SCM_BOOL_T : SCM_BOOL_F;
+}
+#undef FUNC_NAME 
+
 #if 0
 
 SCM_DEFINE (scm_mutex_owner, "mutex-owner", 1, 0, 0,
@@ -1277,16 +1520,6 @@ SCM_DEFINE (scm_mutex_level, "mutex-level", 1, 0, 0,
 
 #endif
 
-/*** Fat condition variables */
-
-typedef struct {
-  scm_i_pthread_mutex_t lock;
-  SCM waiting;               /* the threads waiting for this condition. */
-} fat_cond;
-
-#define SCM_CONDVARP(x)       SCM_SMOB_PREDICATE (scm_tc16_condvar, x)
-#define SCM_CONDVAR_DATA(x)   ((fat_cond *) SCM_SMOB_DATA (x))
-
 static SCM
 fat_cond_mark (SCM cv)
 {
@@ -1334,43 +1567,7 @@ static int
 fat_cond_timedwait (SCM cond, SCM mutex,
                    const scm_t_timespec *waittime)
 {
-  scm_i_thread *t = SCM_I_CURRENT_THREAD;
-  fat_cond *c = SCM_CONDVAR_DATA (cond);
-  fat_mutex *m = SCM_MUTEX_DATA (mutex);
-  const char *msg;
-  int err = 0;
-
-  while (1)
-    {
-      scm_i_scm_pthread_mutex_lock (&c->lock);
-      msg = fat_mutex_unlock (m);
-      t->block_asyncs++;
-      if (msg == NULL)
-       {
-         err = block_self (c->waiting, cond, &c->lock, waittime);
-         scm_i_pthread_mutex_unlock (&c->lock);
-         fat_mutex_lock (mutex);
-       }
-      else
-       scm_i_pthread_mutex_unlock (&c->lock);
-      t->block_asyncs--;
-      scm_async_click ();
-
-      if (msg)
-       scm_misc_error (NULL, msg, SCM_EOL);
-
-      scm_remember_upto_here_2 (cond, mutex);
-
-      if (err == 0)
-       return 1;
-      if (err == ETIMEDOUT)
-       return 0;
-      if (err != EINTR)
-       {
-         errno = err;
-         scm_syserror (NULL);
-       }
-    }
+  return fat_mutex_unlock (mutex, cond, waittime, 1);
 }
 
 SCM_DEFINE (scm_timed_wait_condition_variable, "wait-condition-variable", 2, 1, 0,
@@ -1393,20 +1590,11 @@ SCM_DEFINE (scm_timed_wait_condition_variable, "wait-condition-variable", 2, 1,
   
   if (!SCM_UNBNDP (t))
     {
-      if (scm_is_pair (t))
-       {
-         waittime.tv_sec = scm_to_ulong (SCM_CAR (t));
-         waittime.tv_nsec = scm_to_ulong (SCM_CAR (t)) * 1000;
-       }
-      else
-       {
-         waittime.tv_sec = scm_to_ulong (t);
-         waittime.tv_nsec = 0;
-       }
+      to_timespec (t, &waittime);
       waitptr = &waittime;
     }
 
-  return scm_from_bool (fat_cond_timedwait (cv, mx, waitptr));
+  return fat_cond_timedwait (cv, mx, waitptr) ? SCM_BOOL_T : SCM_BOOL_F;
 }
 #undef FUNC_NAME
 
@@ -1449,6 +1637,15 @@ SCM_DEFINE (scm_broadcast_condition_variable, "broadcast-condition-variable", 1,
 }
 #undef FUNC_NAME
 
+SCM_DEFINE (scm_condition_variable_p, "condition-variable?", 1, 0, 0,
+           (SCM obj),
+           "Return @code{#t} if @var{obj} is a condition variable.")
+#define FUNC_NAME s_scm_condition_variable_p
+{
+  return SCM_CONDVARP(obj) ? SCM_BOOL_T : SCM_BOOL_F;
+}
+#undef FUNC_NAME
+
 /*** Marking stacks */
 
 /* XXX - what to do with this?  Do we need to handle this for blocked
@@ -1800,6 +1997,12 @@ scm_init_threads ()
   scm_set_smob_print (scm_tc16_mutex, fat_mutex_print);
   scm_set_smob_free (scm_tc16_mutex, fat_mutex_free);
 
+  unchecked_unlock_sym = 
+    scm_permanent_object (scm_from_locale_symbol ("unchecked-unlock"));
+  allow_external_unlock_sym = 
+    scm_permanent_object (scm_from_locale_symbol ("allow-external-unlock"));
+  recursive_sym = scm_permanent_object (scm_from_locale_symbol ("recursive"));
+
   scm_tc16_condvar = scm_make_smob_type ("condition-variable",
                                         sizeof (fat_cond));
   scm_set_smob_mark (scm_tc16_condvar, fat_cond_mark);
index b19fbe3..e1944a5 100644 (file)
@@ -54,6 +54,7 @@ typedef struct scm_i_thread {
   SCM join_queue;
 
   scm_i_pthread_mutex_t admin_mutex;
+  SCM mutexes;
 
   SCM result;
   int canceled;
@@ -162,13 +163,19 @@ SCM_API SCM scm_cancel_thread (SCM t);
 SCM_API SCM scm_set_thread_cleanup_x (SCM thread, SCM proc);
 SCM_API SCM scm_thread_cleanup (SCM thread);
 SCM_API SCM scm_join_thread (SCM t);
+SCM_API SCM scm_join_thread_timed (SCM t, SCM timeout, SCM timeoutval);
+SCM_API SCM scm_thread_p (SCM t);
 
 SCM_API SCM scm_make_mutex (void);
 SCM_API SCM scm_make_recursive_mutex (void);
+SCM_API SCM scm_make_mutex_with_flags (SCM flags);
 SCM_API SCM scm_lock_mutex (SCM m);
+SCM_API SCM scm_lock_mutex_timed (SCM m, SCM timeout);
 SCM_API void scm_dynwind_lock_mutex (SCM mutex);
 SCM_API SCM scm_try_mutex (SCM m);
 SCM_API SCM scm_unlock_mutex (SCM m);
+SCM_API SCM scm_unlock_mutex_timed (SCM m, SCM cond, SCM timeout);
+SCM_API SCM scm_mutex_p (SCM o);
 
 SCM_API SCM scm_make_condition_variable (void);
 SCM_API SCM scm_wait_condition_variable (SCM cond, SCM mutex);
@@ -176,6 +183,7 @@ SCM_API SCM scm_timed_wait_condition_variable (SCM cond, SCM mutex,
                                               SCM abstime);
 SCM_API SCM scm_signal_condition_variable (SCM cond);
 SCM_API SCM scm_broadcast_condition_variable (SCM cond);
+SCM_API SCM scm_condition_variable_p (SCM o);
 
 SCM_API SCM scm_current_thread (void);
 SCM_API SCM scm_all_threads (void);
index 10b1b91..62ee0cd 100644 (file)
                                '(0 1 2 3 4 5))
            (equal? result '(10 8 6 4 2 0)))))
 
+      ;;
+      ;; timed mutex locking
+      ;;
+
+      (with-test-prefix "lock-mutex"
+
+       (pass-if "timed locking fails if timeout exceeded"
+         (let ((m (make-mutex)))
+           (lock-mutex m)
+           (let ((t (begin-thread (lock-mutex m (+ (current-time) 1)))))
+             (not (join-thread t)))))
+
+        (pass-if "timed locking succeeds if mutex unlocked within timeout"
+         (let* ((m (make-mutex))
+                (c (make-condition-variable))
+                (cm (make-mutex)))
+           (lock-mutex cm)
+           (let ((t (begin-thread (begin (lock-mutex cm)
+                                         (signal-condition-variable c)
+                                         (unlock-mutex cm)
+                                         (lock-mutex m
+                                                     (+ (current-time) 2))))))
+             (lock-mutex m)
+             (wait-condition-variable c cm)
+             (unlock-mutex cm)
+             (sleep 1)
+             (unlock-mutex m)
+             (join-thread t)))))
+
+      ;;
+      ;; timed mutex unlocking
+      ;;
+
+      (with-test-prefix "unlock-mutex"
+
+        (pass-if "timed unlocking returns #f if timeout exceeded"
+          (let ((m (make-mutex))
+               (c (make-condition-variable)))
+           (lock-mutex m)
+           (not (unlock-mutex m c (current-time)))))
+
+        (pass-if "timed unlocking returns #t if condition signaled"
+         (let ((m1 (make-mutex))
+               (m2 (make-mutex))
+               (c1 (make-condition-variable))
+               (c2 (make-condition-variable)))
+           (lock-mutex m1)
+           (let ((t (begin-thread (begin (lock-mutex m1)
+                                         (signal-condition-variable c1)
+                                         (lock-mutex m2)
+                                         (unlock-mutex m1)
+                                         (unlock-mutex m2 
+                                                       c2 
+                                                       (+ (current-time) 
+                                                          2))))))
+             (wait-condition-variable c1 m1)
+             (unlock-mutex m1)
+             (lock-mutex m2)
+             (signal-condition-variable c2)
+             (unlock-mutex m2)
+             (join-thread t)))))
+
+      ;;
+      ;; timed joining
+      ;;
+
+      (with-test-prefix "join-thread"
+
+       (pass-if "timed joining fails if timeout exceeded"
+         (let* ((m (make-mutex))
+                (c (make-condition-variable))
+                (t (begin-thread (begin (lock-mutex m)
+                                        (wait-condition-variable c m))))
+                (r (join-thread t (current-time))))
+           (cancel-thread t)
+           (not r)))
+      
+        (pass-if "join-thread returns timeoutval on timeout"
+          (let* ((m (make-mutex))
+                (c (make-condition-variable))
+                (t (begin-thread (begin (lock-mutex m)
+                                        (wait-condition-variable c m))))
+                (r (join-thread t (current-time) 'foo)))
+           (cancel-thread t)
+           (eq? r 'foo)))
+           
+
+       (pass-if "timed joining succeeds if thread exits within timeout"
+          (let ((t (begin-thread (begin (sleep 1) #t))))
+           (join-thread t (+ (current-time) 2)))))
+
       ;;
       ;; thread cancellation
       ;;
              (eq? (join-thread t) 'bar))))
 
        (pass-if "initial handler is false"
-         (not (thread-cleanup (current-thread)))))))
+         (not (thread-cleanup (current-thread)))))
+
+      ;;
+      ;; mutex behavior
+      ;;
+
+      (with-test-prefix "mutex-behavior"
+
+        (pass-if "unchecked unlock"
+          (let* ((m (make-mutex 'unchecked-unlock)))
+           (unlock-mutex m)))
+
+       (pass-if "allow external unlock"
+         (let* ((m (make-mutex 'allow-external-unlock))
+                (t (begin-thread (lock-mutex m))))
+           (join-thread t)
+           (unlock-mutex m)))
+
+       (pass-if "recursive mutexes"
+         (let* ((m (make-mutex 'recursive)))
+           (lock-mutex m)
+           (lock-mutex m)))             
+
+       (pass-if "locking abandoned mutex throws exception"
+          (let* ((m (make-mutex))
+                (t (begin-thread (lock-mutex m)))
+                (success #f))
+           (join-thread t)
+           (catch 'abandoned-mutex-error
+                  (lambda () (lock-mutex m))
+                  (lambda key (set! success #t)))
+           success)))))