/* Copyright (C) 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002, 2006 Free Software Foundation, Inc.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */
/* $Id: coop.c,v 1.39 2006-04-17 00:05:38 kryde Exp $ */

/* Cooperative thread library, based on QuickThreads */
36 #include "libguile/eval.h"
38 \f/* #define COOP_STKSIZE (0x10000) */
39 #define COOP_STKSIZE (scm_eval_stack)
/* Round the address `sp' up to the next multiple of `alignment' and
   yield the result as a `void *'.  `alignment' must be a power of 2.

   The mask is computed in qt_word_t, so an `alignment' of a narrower
   unsigned type cannot zero-extend into a mask whose high bits are
   clear (which would chop off the upper part of the address).
   `alignment' is expanded twice and must be free of side effects.  */
#define COOP_STKALIGN(sp, alignment) \
((void *)((((qt_word_t)(sp)) + ((qt_word_t)(alignment)) - 1) \
          & ~(((qt_word_t)(alignment)) - 1)))
47 /* Queue access functions. */
50 coop_qinit (coop_q_t
*q
)
52 q
->t
.next
= q
->tail
= &q
->t
;
59 q
->t
.exceptfds
= NULL
;
65 coop_qget (coop_q_t
*q
)
74 { /* If it was already empty .. */
75 return NULL
; /* .. say so. */
77 q
->tail
= &q
->t
; /* Else now it is empty. */
84 coop_qput (coop_q_t
*q
, coop_t
*t
)
92 coop_all_qput (coop_q_t
*q
, coop_t
*t
)
95 q
->t
.all_next
->all_prev
= t
;
97 t
->all_next
= q
->t
.all_next
;
102 coop_all_qremove (coop_q_t
*q
, coop_t
*t
)
105 t
->all_prev
->all_next
= t
->all_next
;
107 q
->t
.all_next
= t
->all_next
;
109 t
->all_next
->all_prev
= t
->all_prev
;
112 /* Insert thread t into the ordered queue q.
113 q is ordered after wakeup_time. Threads which aren't sleeping but
114 waiting for I/O go last into the queue. */
116 coop_timeout_qinsert (coop_q_t
*q
, coop_t
*t
)
118 coop_t
*pred
= &q
->t
;
119 int sec
= t
->wakeup_time
.tv_sec
;
120 int usec
= t
->wakeup_time
.tv_usec
;
121 while (pred
->next
!= &q
->t
122 && pred
->next
->timeoutp
123 && (pred
->next
->wakeup_time
.tv_sec
< sec
124 || (pred
->next
->wakeup_time
.tv_sec
== sec
125 && pred
->next
->wakeup_time
.tv_usec
< usec
)))
127 t
->next
= pred
->next
;
129 if (t
->next
== &q
->t
)
135 /* Thread routines. */
137 coop_q_t coop_global_runq
; /* A queue of runable threads. */
138 coop_q_t coop_global_sleepq
; /* A queue of sleeping threads. */
139 coop_q_t coop_tmp_queue
; /* A temp working queue */
140 coop_q_t coop_global_allq
; /* A queue of all threads. */
141 static coop_t coop_global_main
; /* Thread for the process. */
142 coop_t
*coop_global_curr
; /* Currently-executing thread. */
144 #ifdef GUILE_PTHREAD_COMPAT
145 static coop_q_t coop_deadq
;
146 static int coop_quitting_p
= -1;
147 static pthread_cond_t coop_cond_quit
;
148 static pthread_cond_t coop_cond_create
;
149 static pthread_mutex_t coop_mutex_create
;
150 static pthread_t coop_mother
;
151 static int mother_awake_p
= 0;
152 static coop_t
*coop_child
;
155 static void *coop_starthelp (qt_t
*old
, void *ignore0
, void *ignore1
);
156 static void coop_only (void *pu
, void *pt
, qt_userf_t
*f
);
157 static void *coop_aborthelp (qt_t
*sp
, void *old
, void *null
);
158 static void *coop_yieldhelp (qt_t
*sp
, void *old
, void *blockq
);
161 /* called on process termination. */
167 extern int on_exit (void (*procp
) (), int arg
);
170 coop_finish (int status
, void *arg
)
172 #error Dont know how to setup a cleanup handler on your system.
176 #ifdef GUILE_PTHREAD_COMPAT
178 pthread_cond_signal (&coop_cond_create
);
179 pthread_cond_broadcast (&coop_cond_quit
);
186 coop_qinit (&coop_global_runq
);
187 coop_qinit (&coop_global_sleepq
);
188 coop_qinit (&coop_tmp_queue
);
189 coop_qinit (&coop_global_allq
);
190 coop_global_curr
= &coop_global_main
;
191 #ifdef GUILE_PTHREAD_COMPAT
192 coop_qinit (&coop_deadq
);
193 pthread_cond_init (&coop_cond_quit
, NULL
);
194 pthread_cond_init (&coop_cond_create
, NULL
);
195 pthread_mutex_init (&coop_mutex_create
, NULL
);
198 atexit (coop_finish
);
201 on_exit (coop_finish
, 0);
211 while ((next
= coop_qget (&coop_global_runq
)) != NULL
) {
212 coop_global_curr
= next
;
213 QT_BLOCK (coop_starthelp
, 0, 0, next
->sp
);
219 coop_starthelp (qt_t
*old
, void *ignore0
, void *ignore1
)
221 coop_global_main
.sp
= old
;
222 coop_global_main
.joining
= NULL
;
223 coop_qput (&coop_global_runq
, &coop_global_main
);
224 return NULL
; /* not used, but keeps compiler happy */
228 coop_mutex_init (coop_m
*m
)
230 return coop_new_mutex_init (m
, NULL
);
234 coop_new_mutex_init (coop_m
*m
, coop_mattr
*attr
)
238 coop_qinit(&(m
->waiting
));
243 coop_mutex_trylock (coop_m
*m
)
245 if (m
->owner
== NULL
)
247 m
->owner
= coop_global_curr
;
250 else if (m
->owner
== coop_global_curr
)
260 coop_mutex_lock (coop_m
*m
)
262 if (m
->owner
== NULL
)
264 m
->owner
= coop_global_curr
;
266 else if (m
->owner
== coop_global_curr
)
272 coop_t
*old
, *newthread
;
274 /* Record the current top-of-stack before going to sleep */
275 coop_global_curr
->top
= &old
;
277 newthread
= coop_wait_for_runnable_thread();
278 if (newthread
== coop_global_curr
)
280 old
= coop_global_curr
;
281 coop_global_curr
= newthread
;
282 QT_BLOCK (coop_yieldhelp
, old
, &(m
->waiting
), newthread
->sp
);
289 coop_mutex_unlock (coop_m
*m
)
291 coop_t
*old
, *newthread
;
295 newthread
= coop_qget (&(m
->waiting
));
296 if (newthread
!= NULL
)
298 /* Record the current top-of-stack before going to sleep */
299 coop_global_curr
->top
= &old
;
301 old
= coop_global_curr
;
302 coop_global_curr
= newthread
;
303 /* The new thread came into m->waiting through a lock operation.
304 It now owns this mutex. */
305 m
->owner
= coop_global_curr
;
306 QT_BLOCK (coop_yieldhelp
, old
, &coop_global_runq
, newthread
->sp
);
313 else if (m
->level
> 0)
323 coop_mutex_destroy (coop_m
*m
)
330 coop_condition_variable_init (coop_c
*c
)
332 return coop_new_condition_variable_init (c
, NULL
);
336 coop_new_condition_variable_init (coop_c
*c
, coop_cattr
*a
)
338 coop_qinit(&(c
->waiting
));
343 coop_condition_variable_wait_mutex (coop_c
*c
, coop_m
*m
)
345 coop_t
*old
, *newthread
;
347 /* coop_mutex_unlock (m); */
348 newthread
= coop_qget (&(m
->waiting
));
349 if (newthread
!= NULL
)
351 m
->owner
= newthread
;
356 /*fixme* Should we really wait here? Isn't it OK just to proceed? */
357 newthread
= coop_wait_for_runnable_thread();
358 if (newthread
== coop_global_curr
)
361 coop_global_curr
->top
= &old
;
362 old
= coop_global_curr
;
363 coop_global_curr
= newthread
;
364 QT_BLOCK (coop_yieldhelp
, old
, &(c
->waiting
), newthread
->sp
);
371 coop_condition_variable_timed_wait_mutex (coop_c
*c
,
373 const scm_t_timespec
*abstime
)
378 #elif defined (WSAETIMEDOUT)
379 int res
= WSAETIMEDOUT
;
384 /* coop_mutex_unlock (m); */
385 t
= coop_qget (&(m
->waiting
));
393 coop_global_curr
->timeoutp
= 1;
394 coop_global_curr
->wakeup_time
.tv_sec
= abstime
->tv_sec
;
395 coop_global_curr
->wakeup_time
.tv_usec
= abstime
->tv_nsec
/ 1000;
396 coop_timeout_qinsert (&coop_global_sleepq
, coop_global_curr
);
397 t
= coop_wait_for_runnable_thread();
399 if (t
!= coop_global_curr
)
401 coop_global_curr
->top
= &old
;
402 old
= coop_global_curr
;
403 coop_global_curr
= t
;
404 QT_BLOCK (coop_yieldhelp
, old
, &(c
->waiting
), t
->sp
);
406 /* Are we still in the sleep queue? */
407 old
= &coop_global_sleepq
.t
;
408 for (t
= old
->next
; t
!= &coop_global_sleepq
.t
; old
= t
, t
= t
->next
)
409 if (t
== coop_global_curr
)
411 old
->next
= t
->next
; /* unlink */
421 coop_condition_variable_broadcast (coop_c
*c
)
425 while ((newthread
= coop_qget (&(c
->waiting
))) != NULL
)
427 coop_qput (&coop_global_runq
, newthread
);
433 coop_condition_variable_signal (coop_c
*c
)
435 return coop_condition_variable_broadcast (c
);
442 static int n_keys
= 0;
443 static int max_keys
= 0;
444 static void (**destructors
) (void *) = 0;
447 coop_key_create (coop_k
*keyp
, void (*destructor
) (void *value
))
449 if (n_keys
>= max_keys
)
452 max_keys
= max_keys
? max_keys
* 3 / 2 : 10;
453 destructors
= realloc (destructors
, sizeof (void *) * max_keys
);
454 if (destructors
== 0)
456 fprintf (stderr
, "Virtual memory exceeded in coop_key_create\n");
459 for (i
= n_keys
; i
< max_keys
; ++i
)
460 destructors
[i
] = NULL
;
462 destructors
[n_keys
] = destructor
;
468 coop_setspecific (coop_k key
, const void *value
)
470 int n_keys
= coop_global_curr
->n_keys
;
474 coop_global_curr
->n_keys
= max_keys
;
475 coop_global_curr
->specific
= realloc (n_keys
476 ? coop_global_curr
->specific
478 sizeof (void *) * max_keys
);
479 if (coop_global_curr
->specific
== 0)
481 fprintf (stderr
, "Virtual memory exceeded in coop_setspecific\n");
484 for (i
= n_keys
; i
< max_keys
; ++i
)
485 coop_global_curr
->specific
[i
] = NULL
;
487 coop_global_curr
->specific
[key
] = (void *) value
;
492 coop_getspecific (coop_k key
)
494 return (key
< coop_global_curr
->n_keys
495 ? coop_global_curr
->specific
[key
]
500 coop_key_delete (coop_k key
)
507 coop_condition_variable_destroy (coop_c
*c
)
512 #ifdef GUILE_PTHREAD_COMPAT
514 #include "libguile/boehm-gc.h"
516 /* 1K room for the cond wait routine */
517 #if SCM_STACK_GROWS_UP
518 # define COOP_STACK_ROOM (256)
520 # define COOP_STACK_ROOM (-256)
524 dummy_start (void *coop_thread
)
526 coop_t
*t
= (coop_t
*) coop_thread
;
528 t
->sp
= (qt_t
*) (&t
+ COOP_STACK_ROOM
);
529 pthread_mutex_init (&t
->dummy_mutex
, NULL
);
530 pthread_mutex_lock (&t
->dummy_mutex
);
533 res
= pthread_cond_wait (&coop_cond_quit
, &t
->dummy_mutex
);
534 while (res
== EINTR
);
541 pthread_mutex_lock (&coop_mutex_create
);
542 while (!coop_quitting_p
)
545 pthread_create (&coop_child
->dummy_thread
,
551 res
= pthread_cond_wait (&coop_cond_create
, &coop_mutex_create
);
552 while (res
== EINTR
);
560 coop_create (coop_userf_t
*f
, void *pu
)
563 #ifndef GUILE_PTHREAD_COMPAT
567 #ifdef GUILE_PTHREAD_COMPAT
568 t
= coop_qget (&coop_deadq
);
578 t
= scm_malloc (sizeof (coop_t
));
581 #ifdef GUILE_PTHREAD_COMPAT
584 if (coop_quitting_p
< 0)
587 /* We can't create threads ourselves since the pthread
588 * corresponding to this stack might be sleeping.
590 pthread_create (&coop_mother
, NULL
, mother
, NULL
);
594 pthread_cond_signal (&coop_cond_create
);
596 /* We can't use a pthreads condition variable since "this"
597 * pthread could already be asleep. We can't use a COOP
598 * condition variable because they are not safe against
599 * pre-emptive switching.
601 while (coop_child
|| mother_awake_p
)
604 t
->sto
= scm_malloc (COOP_STKSIZE
);
605 sto
= COOP_STKALIGN (t
->sto
, QT_STKALIGN
);
606 t
->sp
= QT_SP (sto
, COOP_STKSIZE
- QT_STKALIGN
);
610 t
->sp
= QT_ARGS (t
->sp
, pu
, t
, (qt_userf_t
*)f
, coop_only
);
612 coop_qput (&coop_global_runq
, t
);
613 coop_all_qput (&coop_global_allq
, t
);
620 coop_only (void *pu
, void *pt
, qt_userf_t
*f
)
622 coop_global_curr
= (coop_t
*)pt
;
623 (*(coop_userf_t
*)f
)(pu
);
632 coop_t
*old
, *newthread
;
634 /* Wake up any threads that are waiting to join this one */
635 if (coop_global_curr
->joining
)
637 while ((newthread
= coop_qget ((coop_q_t
*)(coop_global_curr
->joining
)))
640 coop_qput (&coop_global_runq
, newthread
);
642 free (coop_global_curr
->joining
);
647 newthread
= coop_wait_for_runnable_thread();
648 } while (newthread
== coop_global_curr
);
650 coop_all_qremove (&coop_global_allq
, coop_global_curr
);
651 old
= coop_global_curr
;
652 coop_global_curr
= newthread
;
653 QT_ABORT (coop_aborthelp
, old
, (void *) NULL
, newthread
->sp
);
658 coop_aborthelp (qt_t
*sp
, void *old
, void *null
)
660 coop_t
*oldthread
= (coop_t
*) old
;
662 if (oldthread
->specific
)
663 free (oldthread
->specific
);
664 #ifndef GUILE_PTHREAD_COMPAT
665 free (oldthread
->sto
);
668 coop_qput (&coop_deadq
, oldthread
);
678 coop_t
*old
, *newthread
;
680 /* Create a join list if necessary */
681 if (t
->joining
== NULL
)
683 t
->joining
= scm_malloc(sizeof(coop_q_t
));
684 coop_qinit((coop_q_t
*) t
->joining
);
687 newthread
= coop_wait_for_runnable_thread();
688 if (newthread
== coop_global_curr
)
690 old
= coop_global_curr
;
691 coop_global_curr
= newthread
;
692 QT_BLOCK (coop_yieldhelp
, old
, (coop_q_t
*) t
->joining
, newthread
->sp
);
701 newthread
= coop_next_runnable_thread();
703 /* There may be no other runnable threads. Return if this is the
705 if (newthread
== coop_global_curr
)
708 old
= coop_global_curr
;
710 coop_global_curr
= newthread
;
711 QT_BLOCK (coop_yieldhelp
, old
, &coop_global_runq
, newthread
->sp
);
716 coop_yieldhelp (qt_t
*sp
, void *old
, void *blockq
)
718 ((coop_t
*)old
)->sp
= sp
;
719 coop_qput ((coop_q_t
*)blockq
, (coop_t
*)old
);
723 /* Replacement for the system's sleep() function. Does the right thing
724 for the process - but not for the system (it busy-waits) */
727 coop_sleephelp (qt_t
*sp
, void *old
, void *blockq
)
729 ((coop_t
*)old
)->sp
= sp
;
730 /* old is already on the sleep queue - so there's no need to
731 do anything extra here */
/* Suspend the calling thread for USEC microseconds by calling
   scm_internal_select with no file descriptor sets and only a timeout.
   Always returns 0.  */
unsigned long
scm_thread_usleep (unsigned long usec)
{
  struct timeval timeout;

  /* Split USEC into whole seconds and the remainder so that tv_usec
     stays below one million; storing USEC directly yields an
     out-of-range timeval for requests of one second or more.
     NOTE(review): presumably scm_internal_select follows select()'s
     rules for the timeout argument -- confirm.  */
  timeout.tv_sec = usec / 1000000UL;
  timeout.tv_usec = usec % 1000000UL;
  scm_internal_select (0, NULL, NULL, NULL, &timeout);
  return 0;  /* Maybe we should calculate actual time slept,
                but this is faster... :) */
}
747 scm_thread_sleep (unsigned long sec
)
749 time_t now
= time (NULL
);
750 struct timeval timeout
;
752 timeout
.tv_sec
= sec
;
754 scm_internal_select (0, NULL
, NULL
, NULL
, &timeout
);
755 slept
= time (NULL
) - now
;
756 return slept
> sec
? 0 : sec
- slept
;