X-Git-Url: http://git.hcoop.net/bpt/guile.git/blobdiff_plain/82892beda5c053715bc3ec7063af4a129f52c5f9..9bc6fb0a7d91ae9a6c57cedb76022043db413ba5:/libguile/coop.c diff --git a/libguile/coop.c b/libguile/coop.c index a03ba08ea..ca057b423 100644 --- a/libguile/coop.c +++ b/libguile/coop.c @@ -1,4 +1,4 @@ -/* Copyright (C) 1995, 1996 Free Software Foundation, Inc. +/* Copyright (C) 1995, 1996, 1997, 1998, 1999, 2000, 2001 Free Software Foundation, Inc. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -40,13 +40,23 @@ * If you do not wish that, delete this exception notice. */ -/* $Id: coop.c,v 1.2 1997-05-26 22:31:48 jimb Exp $ */ +/* $Id: coop.c,v 1.29 2001-11-04 15:52:29 ela Exp $ */ /* Cooperative thread library, based on QuickThreads */ -#include +#include -#define COOP_STKSIZE (0x10000) +#ifdef HAVE_UNISTD_H +#include +#endif + +#include + +#include "qt/qt.h" +#include "libguile/eval.h" + + /* #define COOP_STKSIZE (0x10000) */ +#define COOP_STKSIZE (scm_eval_stack) /* `alignment' must be a power of 2. */ #define COOP_STKALIGN(sp, alignment) \ @@ -56,69 +66,52 @@ /* Queue access functions. */ -#ifdef __STDC__ static void coop_qinit (coop_q_t *q) -#else -static void -coop_qinit (q) - coop_q_t *q; -#endif { q->t.next = q->tail = &q->t; q->t.all_prev = NULL; q->t.all_next = NULL; +#ifdef GUILE_ISELECT + q->t.nfds = 0; + q->t.readfds = NULL; + q->t.writefds = NULL; + q->t.exceptfds = NULL; + q->t.timeoutp = 0; +#endif } -#ifdef __STDC__ -static coop_t * +coop_t * coop_qget (coop_q_t *q) -#else -static coop_t * -coop_qget (q) - coop_q_t *q; -#endif { coop_t *t; t = q->t.next; q->t.next = t->next; - if (t->next == &q->t) { - if (t == &q->t) { /* If it was already empty .. */ - return (NULL); /* .. say so. */ + if (t->next == &q->t) + { + if (t == &q->t) + { /* If it was already empty .. */ + return NULL; /* .. say so. */ + } + q->tail = &q->t; /* Else now it is empty. */ } - q->tail = &q->t; /* Else now it is empty. */ - } return (t); } -#ifdef __STDC__ -static void +void coop_qput (coop_q_t *q, coop_t *t) -#else -static void -coop_qput (q, t) - coop_q_t *q; - coop_t *t; -#endif { q->tail->next = t; t->next = &q->t; q->tail = t; } -#ifdef __STDC__ static void coop_all_qput (coop_q_t *q, coop_t *t) -#else -static void -coop_all_qput (q, t) - coop_q_t *q; - coop_t *t; -#endif { if (q->t.all_next) q->t.all_next->all_prev = t; @@ -127,15 +120,8 @@ coop_all_qput (q, t) q->t.all_next = t; } -#ifdef __STDC__ static void coop_all_qremove (coop_q_t *q, coop_t *t) -#else -static void -coop_all_qremove (q, t) - coop_q_t *q; - coop_t *t; -#endif { if (t->all_prev) t->all_prev->all_next = t->all_next; @@ -145,15 +131,49 @@ coop_all_qremove (q, t) t->all_next->all_prev = t->all_prev; } +#ifdef GUILE_ISELECT +/* Insert thread t into the ordered queue q. + q is ordered after wakeup_time. Threads which aren't sleeping but + waiting for I/O go last into the queue. */ +void +coop_timeout_qinsert (coop_q_t *q, coop_t *t) +{ + coop_t *pred = &q->t; + int sec = t->wakeup_time.tv_sec; + int usec = t->wakeup_time.tv_usec; + while (pred->next != &q->t + && pred->next->timeoutp + && (pred->next->wakeup_time.tv_sec < sec + || (pred->next->wakeup_time.tv_sec == sec + && pred->next->wakeup_time.tv_usec < usec))) + pred = pred->next; + t->next = pred->next; + pred->next = t; + if (t->next == &q->t) + q->tail = t; +} +#endif + /* Thread routines. */ -coop_q_t coop_global_runq; /* A queue of runable threads. */ -coop_q_t coop_global_sleepq; /* A queue of sleeping threads. */ -static coop_q_t tmp_queue; /* A temp working queue */ -coop_q_t coop_global_allq; /* A queue of all threads. */ -static coop_t coop_global_main; /* Thread for the process. */ -coop_t *coop_global_curr; /* Currently-executing thread. */ +coop_q_t coop_global_runq; /* A queue of runable threads. */ +coop_q_t coop_global_sleepq; /* A queue of sleeping threads. */ +coop_q_t coop_tmp_queue; /* A temp working queue */ +coop_q_t coop_global_allq; /* A queue of all threads. */ +static coop_t coop_global_main; /* Thread for the process. */ +coop_t *coop_global_curr; /* Currently-executing thread. */ + +#ifdef GUILE_PTHREAD_COMPAT +static coop_q_t coop_deadq; +static int coop_quitting_p = -1; +static pthread_cond_t coop_cond_quit; +static pthread_cond_t coop_cond_create; +static pthread_mutex_t coop_mutex_create; +static pthread_t coop_mother; +static int mother_awake_p = 0; +static coop_t *coop_child; +#endif static void *coop_starthelp (qt_t *old, void *ignore0, void *ignore1); static void coop_only (void *pu, void *pt, qt_userf_t *f); @@ -161,33 +181,58 @@ static void *coop_aborthelp (qt_t *sp, void *old, void *null); static void *coop_yieldhelp (qt_t *sp, void *old, void *blockq); -#ifdef __STDC__ -void -coop_init() +/* called on process termination. */ +#ifdef HAVE_ATEXIT +static void +coop_finish (void) #else -void -coop_init() +#ifdef HAVE_ON_EXIT +extern int on_exit (void (*procp) (), int arg); + +static void +coop_finish (int status, void *arg) +#else +#error Dont know how to setup a cleanup handler on your system. +#endif +#endif +{ +#ifdef GUILE_PTHREAD_COMPAT + coop_quitting_p = 1; + pthread_cond_signal (&coop_cond_create); + pthread_cond_broadcast (&coop_cond_quit); #endif +} + +void +coop_init () { coop_qinit (&coop_global_runq); coop_qinit (&coop_global_sleepq); - coop_qinit (&tmp_queue); + coop_qinit (&coop_tmp_queue); coop_qinit (&coop_global_allq); coop_global_curr = &coop_global_main; +#ifdef GUILE_PTHREAD_COMPAT + coop_qinit (&coop_deadq); + pthread_cond_init (&coop_cond_quit, NULL); + pthread_cond_init (&coop_cond_create, NULL); + pthread_mutex_init (&coop_mutex_create, NULL); +#endif +#ifdef HAVE_ATEXIT + atexit (coop_finish); +#else +#ifdef HAVE_ON_EXIT + on_exit (coop_finish, 0); +#endif +#endif } - /* Return the next runnable thread. If no threads are currently runnable, and there are sleeping threads - wait until one wakes up. Otherwise, return NULL. */ -#ifdef __STDC__ +#ifndef GUILE_ISELECT coop_t * coop_next_runnable_thread() -#else -coop_t * -coop_next_runnable_thread() -#endif { int sleepers; coop_t *t; @@ -204,9 +249,9 @@ coop_next_runnable_thread() if (t->wakeup_time <= now) coop_qput(&coop_global_runq, t); else - coop_qput(&tmp_queue, t); + coop_qput(&coop_tmp_queue, t); } - while ((t = coop_qget(&tmp_queue)) != NULL) + while ((t = coop_qget(&coop_tmp_queue)) != NULL) coop_qput(&coop_global_sleepq, t); t = coop_qget (&coop_global_runq); @@ -215,15 +260,10 @@ coop_next_runnable_thread() return t; } +#endif - -#ifdef __STDC__ void coop_start() -#else -void -coop_start() -#endif { coop_t *next; @@ -234,16 +274,8 @@ coop_start() } -#ifdef __STDC__ static void * coop_starthelp (qt_t *old, void *ignore0, void *ignore1) -#else -static void * -coop_starthelp (old, ignore0, ignore1) - qt_t *old; - void *ignore0; - void *ignore1; -#endif { coop_global_main.sp = old; coop_global_main.joining = NULL; @@ -251,27 +283,34 @@ coop_starthelp (old, ignore0, ignore1) return NULL; /* not used, but keeps compiler happy */ } -#ifdef __STDC__ -void +int coop_mutex_init (coop_m *m) -#else -void -coop_mutex_init (m) - coop_m *m; -#endif +{ + return coop_new_mutex_init (m, NULL); +} + +int +coop_new_mutex_init (coop_m *m, coop_mattr *attr) { m->owner = NULL; coop_qinit(&(m->waiting)); + return 0; } -#ifdef __STDC__ -void +int +coop_mutex_trylock (coop_m *m) +{ + if (m->owner == NULL) + { + m->owner = coop_global_curr; + return 0; + } + else + return EBUSY; +} + +int coop_mutex_lock (coop_m *m) -#else -void -coop_mutex_lock () - coop_m *m; -#endif { if (m->owner == NULL) { @@ -284,22 +323,23 @@ coop_mutex_lock () /* Record the current top-of-stack before going to sleep */ coop_global_curr->top = &old; +#ifdef GUILE_ISELECT + newthread = coop_wait_for_runnable_thread(); + if (newthread == coop_global_curr) + coop_abort (); +#else newthread = coop_next_runnable_thread(); +#endif old = coop_global_curr; coop_global_curr = newthread; QT_BLOCK (coop_yieldhelp, old, &(m->waiting), newthread->sp); } + return 0; } -#ifdef __STDC__ -void +int coop_mutex_unlock (coop_m *m) -#else -void -coop_mutex_unlock (m) - coop_m *m; -#endif { coop_t *old, *newthread; @@ -311,6 +351,8 @@ coop_mutex_unlock (m) old = coop_global_curr; coop_global_curr = newthread; + /* The new thread came into m->waiting through a lock operation. + It now owns this mutex. */ m->owner = coop_global_curr; QT_BLOCK (coop_yieldhelp, old, &coop_global_runq, newthread->sp); } @@ -318,46 +360,119 @@ coop_mutex_unlock (m) { m->owner = NULL; } + return 0; } -#ifdef __STDC__ -void +int +coop_mutex_destroy (coop_m *m) +{ + return 0; +} + + +int coop_condition_variable_init (coop_c *c) -#else -void -coop_condition_variable_init (c) - coop_c *c; -#endif +{ + return coop_new_condition_variable_init (c, NULL); +} + +int +coop_new_condition_variable_init (coop_c *c, coop_cattr *a) { coop_qinit(&(c->waiting)); + return 0; } -#ifdef __STDC__ -void -coop_condition_variable_wait (coop_c *c) -#else -void -coop_condition_variable_wait (c) - coop_c *c; -#endif +int +coop_condition_variable_wait_mutex (coop_c *c, coop_m *m) { coop_t *old, *newthread; - newthread = coop_next_runnable_thread(); + /* coop_mutex_unlock (m); */ + newthread = coop_qget (&(m->waiting)); + if (newthread != NULL) + { + m->owner = newthread; + } + else + { + m->owner = NULL; + /*fixme* Should we really wait here? Isn't it OK just to proceed? */ +#ifdef GUILE_ISELECT + newthread = coop_wait_for_runnable_thread(); + if (newthread == coop_global_curr) + coop_abort (); +#else + newthread = coop_next_runnable_thread(); +#endif + } + coop_global_curr->top = &old; old = coop_global_curr; coop_global_curr = newthread; QT_BLOCK (coop_yieldhelp, old, &(c->waiting), newthread->sp); + + coop_mutex_lock (m); + return 0; } -#ifdef __STDC__ -void -coop_condition_variable_signal (coop_c *c) +int +coop_condition_variable_timed_wait_mutex (coop_c *c, + coop_m *m, + const struct timespec *abstime) +{ + coop_t *old, *t; +#ifdef ETIMEDOUT + int res = ETIMEDOUT; +#elif defined (WSAETIMEDOUT) + int res = WSAETIMEDOUT; #else -void -coop_condition_variable_signal (c) - coop_c *c; + int res = 0; +#endif + + /* coop_mutex_unlock (m); */ + t = coop_qget (&(m->waiting)); + if (t != NULL) + { + m->owner = t; + } + else + { + m->owner = NULL; +#ifdef GUILE_ISELECT + coop_global_curr->timeoutp = 1; + coop_global_curr->wakeup_time.tv_sec = abstime->tv_sec; + coop_global_curr->wakeup_time.tv_usec = abstime->tv_nsec / 1000; + coop_timeout_qinsert (&coop_global_sleepq, coop_global_curr); + t = coop_wait_for_runnable_thread(); +#else + /*fixme* Implement!*/ + t = coop_next_runnable_thread(); #endif + } + if (t != coop_global_curr) + { + coop_global_curr->top = &old; + old = coop_global_curr; + coop_global_curr = t; + QT_BLOCK (coop_yieldhelp, old, &(c->waiting), t->sp); + + /* Are we still in the sleep queue? */ + old = &coop_global_sleepq.t; + for (t = old->next; t != &coop_global_sleepq.t; old = t, t = t->next) + if (t == coop_global_curr) + { + old->next = t->next; /* unlink */ + res = 0; + break; + } + } + coop_mutex_lock (m); + return res; +} + +int +coop_condition_variable_signal (coop_c *c) { coop_t *newthread; @@ -365,29 +480,179 @@ coop_condition_variable_signal (c) { coop_qput (&coop_global_runq, newthread); } + return 0; } +/* {Keys} + */ -#ifdef __STDC__ -coop_t * -coop_create (coop_userf_t *f, void *pu) +static int n_keys = 0; +static int max_keys = 0; +static void (**destructors) (void *) = 0; + +int +coop_key_create (coop_k *keyp, void (*destructor) (void *value)) +{ + if (n_keys >= max_keys) + { + int i; + max_keys = max_keys ? max_keys * 3 / 2 : 10; + destructors = realloc (destructors, sizeof (void *) * max_keys); + if (destructors == 0) + { + fprintf (stderr, "Virtual memory exceeded in coop_key_create\n"); + exit (1); + } + for (i = n_keys; i < max_keys; ++i) + destructors[i] = NULL; + } + destructors[n_keys] = destructor; + *keyp = n_keys++; + return 0; +} + +int +coop_setspecific (coop_k key, const void *value) +{ + int n_keys = coop_global_curr->n_keys; + if (key >= n_keys) + { + int i; + coop_global_curr->n_keys = max_keys; + coop_global_curr->specific = realloc (n_keys + ? coop_global_curr->specific + : NULL, + sizeof (void *) * max_keys); + if (coop_global_curr->specific == 0) + { + fprintf (stderr, "Virtual memory exceeded in coop_setspecific\n"); + exit (1); + } + for (i = n_keys; i < max_keys; ++i) + coop_global_curr->specific[i] = NULL; + } + coop_global_curr->specific[key] = (void *) value; + return 0; +} + +void * +coop_getspecific (coop_k key) +{ + return (key < coop_global_curr->n_keys + ? coop_global_curr->specific[key] + : NULL); +} + +int +coop_key_delete (coop_k key) +{ + return 0; +} + + +int +coop_condition_variable_destroy (coop_c *c) +{ + return 0; +} + +#ifdef GUILE_PTHREAD_COMPAT + +/* 1K room for the cond wait routine */ +#ifdef SCM_STACK_GROWS_UP +#define COOP_STACK_ROOM (256) #else -coop_t * -coop_create (f, pu) - coop_userf_t *f; - void *pu; +#define COOP_STACK_ROOM (-256) #endif + +static void * +dummy_start (void *coop_thread) +{ + coop_t *t = (coop_t *) coop_thread; + int res; + t->sp = (qt_t *) (&t + COOP_STACK_ROOM); + pthread_mutex_init (&t->dummy_mutex, NULL); + pthread_mutex_lock (&t->dummy_mutex); + coop_child = 0; + do + res = pthread_cond_wait (&coop_cond_quit, &t->dummy_mutex); + while (res == EINTR); + return 0; +} + +static void * +mother (void *dummy) +{ + pthread_mutex_lock (&coop_mutex_create); + while (!coop_quitting_p) + { + int res; + pthread_create (&coop_child->dummy_thread, + NULL, + dummy_start, + coop_child); + mother_awake_p = 0; + do + res = pthread_cond_wait (&coop_cond_create, &coop_mutex_create); + while (res == EINTR); + } + return 0; +} + +#endif + +coop_t * +coop_create (coop_userf_t *f, void *pu) { coop_t *t; +#ifndef GUILE_PTHREAD_COMPAT void *sto; +#endif - t = malloc (sizeof(coop_t)); - - t->data = NULL; - t->sto = malloc (COOP_STKSIZE); - sto = COOP_STKALIGN (t->sto, QT_STKALIGN); - t->sp = QT_SP (sto, COOP_STKSIZE - QT_STKALIGN); - t->base = t->sp; +#ifdef GUILE_PTHREAD_COMPAT + t = coop_qget (&coop_deadq); + if (t) + { + t->sp = t->base; + t->specific = 0; + t->n_keys = 0; + } + else +#endif + { + t = malloc (sizeof (coop_t)); + + t->specific = NULL; + t->n_keys = 0; +#ifdef GUILE_PTHREAD_COMPAT + coop_child = t; + mother_awake_p = 1; + if (coop_quitting_p < 0) + { + coop_quitting_p = 0; + /* We can't create threads ourselves since the pthread + * corresponding to this stack might be sleeping. + */ + pthread_create (&coop_mother, NULL, mother, NULL); + } + else + { + pthread_cond_signal (&coop_cond_create); + } + /* We can't use a pthreads condition variable since "this" + * pthread could already be asleep. We can't use a COOP + * condition variable because they are not safe against + * pre-emptive switching. + */ + while (coop_child || mother_awake_p) + usleep (0); +#else + t->sto = malloc (COOP_STKSIZE); + sto = COOP_STKALIGN (t->sto, QT_STKALIGN); + t->sp = QT_SP (sto, COOP_STKSIZE - QT_STKALIGN); +#endif + t->base = t->sp; + } t->sp = QT_ARGS (t->sp, pu, t, (qt_userf_t *)f, coop_only); t->joining = NULL; coop_qput (&coop_global_runq, t); @@ -397,16 +662,8 @@ coop_create (f, pu) } -#ifdef __STDC__ static void coop_only (void *pu, void *pt, qt_userf_t *f) -#else -static void -coop_only (pu. pt, f) - void *pu, - void *pt, - qt_userf_t *f; -#endif { coop_global_curr = (coop_t *)pt; (*(coop_userf_t *)f)(pu); @@ -415,13 +672,8 @@ coop_only (pu. pt, f) } -#ifdef __STDC__ -void -coop_abort () -#else void coop_abort () -#endif { coop_t *old, *newthread; @@ -433,56 +685,48 @@ coop_abort () { coop_qput (&coop_global_runq, newthread); } - free(coop_global_curr->joining); + free (coop_global_curr->joining); } +#ifdef GUILE_ISELECT + scm_I_am_dead = 1; + do { + newthread = coop_wait_for_runnable_thread(); + } while (newthread == coop_global_curr); + scm_I_am_dead = 0; +#else newthread = coop_next_runnable_thread(); - coop_all_qremove(&coop_global_allq, coop_global_curr); +#endif + coop_all_qremove (&coop_global_allq, coop_global_curr); old = coop_global_curr; coop_global_curr = newthread; - QT_ABORT (coop_aborthelp, old, (void *)NULL, newthread->sp); + QT_ABORT (coop_aborthelp, old, (void *) NULL, newthread->sp); } -#ifdef __STDC__ static void * coop_aborthelp (qt_t *sp, void *old, void *null) -#else -static void * -coop_aborthelp (sp, old, null) - qt_t *sp; - void *old; - void *null; -#endif { coop_t *oldthread = (coop_t *) old; + if (oldthread->specific) + free (oldthread->specific); +#ifndef GUILE_PTHREAD_COMPAT free (oldthread->sto); - - /* "old" is freed in scm_threads_thread_die(). - Marking old->base NULL indicates that this thread is dead */ - - oldthread->base = NULL; - + free (oldthread); +#else + coop_qput (&coop_deadq, oldthread); +#endif + return NULL; } -#ifdef __STDC__ void coop_join(coop_t *t) -#else -void -coop_join() - coop_t *t; -#endif { coop_t *old, *newthread; - /* Check if t is already finished */ - if (t->base == NULL) - return; - /* Create a join list if necessary */ if (t->joining == NULL) { @@ -490,19 +734,20 @@ coop_join() coop_qinit((coop_q_t *) t->joining); } +#ifdef GUILE_ISELECT + newthread = coop_wait_for_runnable_thread(); + if (newthread == coop_global_curr) + return; +#else newthread = coop_next_runnable_thread(); +#endif old = coop_global_curr; coop_global_curr = newthread; QT_BLOCK (coop_yieldhelp, old, (coop_q_t *) t->joining, newthread->sp); } -#ifdef __STDC__ -void -coop_yield() -#else void coop_yield() -#endif { coop_t *old = NULL; coop_t *newthread; @@ -511,8 +756,13 @@ coop_yield() /* There may be no other runnable threads. Return if this is the case. */ +#if GUILE_ISELECT + if (newthread == coop_global_curr) + return; +#else if (newthread == NULL) return; +#endif old = coop_global_curr; @@ -521,16 +771,8 @@ coop_yield() } -#ifdef __STDC__ static void * coop_yieldhelp (qt_t *sp, void *old, void *blockq) -#else -static void * -coop_yieldhelp (sp, old, blockq) - qt_t *sp; - void *old; - void *blockq; -#endif { ((coop_t *)old)->sp = sp; coop_qput ((coop_q_t *)blockq, (coop_t *)old); @@ -540,16 +782,8 @@ coop_yieldhelp (sp, old, blockq) /* Replacement for the system's sleep() function. Does the right thing for the process - but not for the system (it busy-waits) */ -#ifdef __STDC__ -static void * +void * coop_sleephelp (qt_t *sp, void *old, void *blockq) -#else -static void * -coop_sleephelp (sp, old, bolckq) - qt_t *sp; - void *old; - void *blockq; -#endif { ((coop_t *)old)->sp = sp; /* old is already on the sleep queue - so there's no need to @@ -557,17 +791,39 @@ coop_sleephelp (sp, old, bolckq) return NULL; } -#ifdef __STDC__ -unsigned -sleep (unsigned s) -#else -unsigned -sleep (s) - unsigned s; -#endif +#ifdef GUILE_ISELECT + +unsigned long +scm_thread_usleep (unsigned long usec) +{ + struct timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = usec; + scm_internal_select (0, NULL, NULL, NULL, &timeout); + return 0; /* Maybe we should calculate actual time slept, + but this is faster... :) */ +} + +unsigned long +scm_thread_sleep (unsigned long sec) +{ + time_t now = time (NULL); + struct timeval timeout; + unsigned long slept; + timeout.tv_sec = sec; + timeout.tv_usec = 0; + scm_internal_select (0, NULL, NULL, NULL, &timeout); + slept = time (NULL) - now; + return slept > sec ? 0 : sec - slept; +} + +#else /* GUILE_ISELECT */ + +unsigned long +scm_thread_sleep (unsigned long s) { coop_t *newthread, *old; - time_t now = time(NULL); + time_t now = time (NULL); coop_global_curr->wakeup_time = now + s; /* Put the current thread on the sleep queue */ @@ -586,3 +842,20 @@ sleep (s) return s; } + +unsigned long +scm_thread_usleep (unsigned long usec) +{ + /* We're so cheap. */ + scm_thread_sleep (usec / 1000000); + return 0; /* Maybe we should calculate actual time slept, + but this is faster... :) */ +} + +#endif /* GUILE_ISELECT */ + +/* + Local Variables: + c-file-style: "gnu" + End: +*/