2 * Copyright 2008-2010, Sine Nomine Associates and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
17 #include <afs/afsutil.h>
19 #include <afs/afsint.h>
21 #define __AFS_THREAD_POOL_IMPL 1
22 #include "work_queue.h"
23 #include "thread_pool.h"
24 #include "thread_pool_impl.h"
27 * public interfaces for thread_pool.
31 * allocate a thread pool object.
33 * @param[inout] pool_out address in which to store pool object pointer
35 * @return operation status
37 * @retval ENOMEM out of memory
42 _afs_tp_alloc(struct afs_thread_pool
** pool_out
)
45 struct afs_thread_pool
* pool
;
47 *pool_out
= pool
= malloc(sizeof(*pool
));
58 * free a thread pool object.
60 * @param[in] pool thread pool object
62 * @return operation status
68 _afs_tp_free(struct afs_thread_pool
* pool
)
78 * allocate a thread worker object.
80 * @param[inout] worker_out address in which to store worker object pointer
82 * @return operation status
84 * @retval ENOMEM out of memory
89 _afs_tp_worker_alloc(struct afs_thread_pool_worker
** worker_out
)
92 struct afs_thread_pool_worker
* worker
;
94 *worker_out
= worker
= malloc(sizeof(*worker
));
100 queue_NodeInit(&worker
->worker_list
);
107 * free a thread worker object.
109 * @param[in] worker thread worker object
111 * @return operation status
117 _afs_tp_worker_free(struct afs_thread_pool_worker
* worker
)
127 * low-level thread entry point.
129 * @param[in] rock opaque pointer to thread worker object
131 * @return opaque return pointer from pool entry function
136 _afs_tp_worker_run(void * rock
)
138 struct afs_thread_pool_worker
* worker
= rock
;
139 struct afs_thread_pool
* pool
= worker
->pool
;
141 /* register worker with pool */
142 opr_mutex_enter(&pool
->lock
);
143 queue_Append(&pool
->thread_list
, worker
);
145 opr_mutex_exit(&pool
->lock
);
147 /* call high-level entry point */
148 worker
->ret
= (*pool
->entry
)(pool
, worker
, pool
->work_queue
, pool
->rock
);
150 /* adjust pool live thread count */
151 opr_mutex_enter(&pool
->lock
);
152 opr_Assert(pool
->nthreads
);
153 queue_Remove(worker
);
155 if (!pool
->nthreads
) {
156 opr_cv_broadcast(&pool
->shutdown_cv
);
157 pool
->state
= AFS_TP_STATE_STOPPED
;
159 opr_mutex_exit(&pool
->lock
);
161 _afs_tp_worker_free(worker
);
167 * default high-level thread entry point.
172 _afs_tp_worker_default(struct afs_thread_pool
*pool
,
173 struct afs_thread_pool_worker
*worker
,
174 struct afs_work_queue
*queue
,
178 while (code
== 0 && afs_tp_worker_continue(worker
)) {
179 code
= afs_wq_do(queue
, NULL
/* no call rock */);
186 * start a worker thread.
188 * @param[in] pool thread pool object
189 * @param[inout] worker_out address in which to store worker thread object pointer
191 * @return operation status
193 * @retval ENOMEM out of memory
196 _afs_tp_worker_start(struct afs_thread_pool
* pool
,
197 struct afs_thread_pool_worker
** worker_out
)
200 pthread_attr_t attrs
;
201 struct afs_thread_pool_worker
* worker
;
203 ret
= _afs_tp_worker_alloc(worker_out
);
207 worker
= *worker_out
;
210 worker
->req_shutdown
= 0;
212 opr_Verify(pthread_attr_init(&attrs
) == 0);
213 opr_Verify(pthread_attr_setdetachstate(&attrs
, PTHREAD_CREATE_DETACHED
) == 0);
215 ret
= pthread_create(&worker
->tid
, &attrs
, &_afs_tp_worker_run
, worker
);
222 * create a thread pool.
224 * @param[inout] pool_out address in which to store pool object pointer.
225 * @param[in] queue work queue serviced by thread pool
227 * @return operation status
229 * @retval ENOMEM out of memory
232 afs_tp_create(struct afs_thread_pool
** pool_out
,
233 struct afs_work_queue
* queue
)
236 struct afs_thread_pool
* pool
;
238 ret
= _afs_tp_alloc(pool_out
);
244 opr_mutex_init(&pool
->lock
);
245 opr_cv_init(&pool
->shutdown_cv
);
246 queue_Init(&pool
->thread_list
);
247 pool
->work_queue
= queue
;
248 pool
->entry
= &_afs_tp_worker_default
;
251 pool
->max_threads
= 4;
252 pool
->state
= AFS_TP_STATE_INIT
;
259 * destroy a thread pool.
261 * @param[in] pool thread pool object to be destroyed
263 * @return operation status
265 * @retval AFS_TP_ERROR pool not in a quiescent state
268 afs_tp_destroy(struct afs_thread_pool
* pool
)
272 opr_mutex_enter(&pool
->lock
);
273 switch (pool
->state
) {
274 case AFS_TP_STATE_INIT
:
275 case AFS_TP_STATE_STOPPED
:
281 opr_mutex_exit(&pool
->lock
);
288 * set the number of threads to spawn.
290 * @param[in] pool thread pool object
291 * @param[in] threads number of threads to spawn
293 * @return operation status
295 * @retval AFS_TP_ERROR thread pool has already been started
298 afs_tp_set_threads(struct afs_thread_pool
*pool
,
303 opr_mutex_enter(&pool
->lock
);
304 if (pool
->state
!= AFS_TP_STATE_INIT
) {
307 pool
->max_threads
= threads
;
309 opr_mutex_exit(&pool
->lock
);
315 * set a custom thread entry point.
317 * @param[in] pool thread pool object
318 * @param[in] entry thread entry function pointer
319 * @param[in] rock opaque pointer passed to thread
321 * @return operation status
323 * @retval AFS_TP_ERROR thread pool has already been started
326 afs_tp_set_entry(struct afs_thread_pool
* pool
,
327 afs_tp_worker_func_t
* entry
,
332 opr_mutex_enter(&pool
->lock
);
333 if (pool
->state
!= AFS_TP_STATE_INIT
) {
339 opr_mutex_exit(&pool
->lock
);
345 * start a thread pool.
347 * @param[in] pool thread pool object
349 * @return operation status
351 * @retval AFS_TP_ERROR thread create failure
354 afs_tp_start(struct afs_thread_pool
* pool
)
357 struct afs_thread_pool_worker
* worker
;
360 opr_mutex_enter(&pool
->lock
);
361 if (pool
->state
!= AFS_TP_STATE_INIT
) {
365 pool
->state
= AFS_TP_STATE_STARTING
;
366 opr_mutex_exit(&pool
->lock
);
368 for (i
= 0; i
< pool
->max_threads
; i
++) {
369 code
= _afs_tp_worker_start(pool
, &worker
);
375 opr_mutex_enter(&pool
->lock
);
376 pool
->state
= AFS_TP_STATE_RUNNING
;
378 opr_mutex_exit(&pool
->lock
);
384 * shut down all threads in pool.
386 * @param[in] pool thread pool object
387 * @param[in] block wait for all threads to terminate, if asserted
389 * @return operation status
393 afs_tp_shutdown(struct afs_thread_pool
* pool
,
397 struct afs_thread_pool_worker
* worker
, *nn
;
399 opr_mutex_enter(&pool
->lock
);
400 if (pool
->state
== AFS_TP_STATE_STOPPED
401 || pool
->state
== AFS_TP_STATE_STOPPING
) {
404 if (pool
->state
!= AFS_TP_STATE_RUNNING
) {
408 pool
->state
= AFS_TP_STATE_STOPPING
;
410 for (queue_Scan(&pool
->thread_list
, worker
, nn
, afs_thread_pool_worker
)) {
411 worker
->req_shutdown
= 1;
413 if (!pool
->nthreads
) {
414 pool
->state
= AFS_TP_STATE_STOPPED
;
416 /* need to drop lock to get a membar here */
417 opr_mutex_exit(&pool
->lock
);
419 ret
= afs_wq_shutdown(pool
->work_queue
);
424 opr_mutex_enter(&pool
->lock
);
427 while (pool
->nthreads
) {
428 opr_cv_wait(&pool
->shutdown_cv
, &pool
->lock
);
432 opr_mutex_exit(&pool
->lock
);
439 * check whether thread pool is online.
441 * @param[in] pool thread pool object
443 * @return whether pool is online
444 * @retval 1 pool is online
445 * @retval 0 pool is not online
448 afs_tp_is_online(struct afs_thread_pool
* pool
)
452 opr_mutex_enter(&pool
->lock
);
453 ret
= (pool
->state
== AFS_TP_STATE_RUNNING
);
454 opr_mutex_exit(&pool
->lock
);
460 * check whether a given worker thread can continue to run.
462 * @param[in] worker worker thread object pointer
464 * @return whether thread can continue to execute
465 * @retval 1 execution can continue
466 * @retval 0 shutdown has been requested
469 afs_tp_worker_continue(struct afs_thread_pool_worker
* worker
)
471 return !worker
->req_shutdown
;