backport to buster
[hcoop/debian/openafs.git] / src / util / thread_pool.c
1 /*
2 * Copyright 2008-2010, Sine Nomine Associates and others.
3 * All Rights Reserved.
4 *
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
8 */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14 #include <afs/opr.h>
15
16 #include <lock.h>
17 #include <afs/afsutil.h>
18 #include <lwp.h>
19 #include <afs/afsint.h>
20
21 #define __AFS_THREAD_POOL_IMPL 1
22 #include "work_queue.h"
23 #include "thread_pool.h"
24 #include "thread_pool_impl.h"
25
26 /**
27 * public interfaces for thread_pool.
28 */
29
30 /**
31 * allocate a thread pool object.
32 *
33 * @param[inout] pool_out address in which to store pool object pointer
34 *
35 * @return operation status
36 * @retval 0 success
37 * @retval ENOMEM out of memory
38 *
39 * @internal
40 */
41 static int
42 _afs_tp_alloc(struct afs_thread_pool ** pool_out)
43 {
44 int ret = 0;
45 struct afs_thread_pool * pool;
46
47 *pool_out = pool = malloc(sizeof(*pool));
48 if (pool == NULL) {
49 ret = ENOMEM;
50 goto error;
51 }
52
53 error:
54 return ret;
55 }
56
57 /**
58 * free a thread pool object.
59 *
60 * @param[in] pool thread pool object
61 *
62 * @return operation status
63 * @retval 0 success
64 *
65 * @internal
66 */
67 static int
68 _afs_tp_free(struct afs_thread_pool * pool)
69 {
70 int ret = 0;
71
72 free(pool);
73
74 return ret;
75 }
76
77 /**
78 * allocate a thread worker object.
79 *
80 * @param[inout] worker_out address in which to store worker object pointer
81 *
82 * @return operation status
83 * @retval 0 success
84 * @retval ENOMEM out of memory
85 *
86 * @internal
87 */
88 static int
89 _afs_tp_worker_alloc(struct afs_thread_pool_worker ** worker_out)
90 {
91 int ret = 0;
92 struct afs_thread_pool_worker * worker;
93
94 *worker_out = worker = malloc(sizeof(*worker));
95 if (worker == NULL) {
96 ret = ENOMEM;
97 goto error;
98 }
99
100 queue_NodeInit(&worker->worker_list);
101
102 error:
103 return ret;
104 }
105
106 /**
107 * free a thread worker object.
108 *
109 * @param[in] worker thread worker object
110 *
111 * @return operation status
112 * @retval 0 success
113 *
114 * @internal
115 */
116 static int
117 _afs_tp_worker_free(struct afs_thread_pool_worker * worker)
118 {
119 int ret = 0;
120
121 free(worker);
122
123 return ret;
124 }
125
126 /**
127 * low-level thread entry point.
128 *
129 * @param[in] rock opaque pointer to thread worker object
130 *
131 * @return opaque return pointer from pool entry function
132 *
133 * @internal
134 */
135 static void *
136 _afs_tp_worker_run(void * rock)
137 {
138 struct afs_thread_pool_worker * worker = rock;
139 struct afs_thread_pool * pool = worker->pool;
140
141 /* register worker with pool */
142 opr_mutex_enter(&pool->lock);
143 queue_Append(&pool->thread_list, worker);
144 pool->nthreads++;
145 opr_mutex_exit(&pool->lock);
146
147 /* call high-level entry point */
148 worker->ret = (*pool->entry)(pool, worker, pool->work_queue, pool->rock);
149
150 /* adjust pool live thread count */
151 opr_mutex_enter(&pool->lock);
152 opr_Assert(pool->nthreads);
153 queue_Remove(worker);
154 pool->nthreads--;
155 if (!pool->nthreads) {
156 opr_cv_broadcast(&pool->shutdown_cv);
157 pool->state = AFS_TP_STATE_STOPPED;
158 }
159 opr_mutex_exit(&pool->lock);
160
161 _afs_tp_worker_free(worker);
162
163 return NULL;
164 }
165
166 /**
167 * default high-level thread entry point.
168 *
169 * @internal
170 */
171 static void *
172 _afs_tp_worker_default(struct afs_thread_pool *pool,
173 struct afs_thread_pool_worker *worker,
174 struct afs_work_queue *queue,
175 void *rock)
176 {
177 int code = 0;
178 while (code == 0 && afs_tp_worker_continue(worker)) {
179 code = afs_wq_do(queue, NULL /* no call rock */);
180 }
181
182 return NULL;
183 }
184
185 /**
186 * start a worker thread.
187 *
188 * @param[in] pool thread pool object
189 * @param[inout] worker_out address in which to store worker thread object pointer
190 *
191 * @return operation status
192 * @retval 0 success
193 * @retval ENOMEM out of memory
194 */
195 static int
196 _afs_tp_worker_start(struct afs_thread_pool * pool,
197 struct afs_thread_pool_worker ** worker_out)
198 {
199 int ret = 0;
200 pthread_attr_t attrs;
201 struct afs_thread_pool_worker * worker;
202
203 ret = _afs_tp_worker_alloc(worker_out);
204 if (ret) {
205 goto error;
206 }
207 worker = *worker_out;
208
209 worker->pool = pool;
210 worker->req_shutdown = 0;
211
212 opr_Verify(pthread_attr_init(&attrs) == 0);
213 opr_Verify(pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED) == 0);
214
215 ret = pthread_create(&worker->tid, &attrs, &_afs_tp_worker_run, worker);
216
217 error:
218 return ret;
219 }
220
221 /**
222 * create a thread pool.
223 *
224 * @param[inout] pool_out address in which to store pool object pointer.
225 * @param[in] queue work queue serviced by thread pool
226 *
227 * @return operation status
228 * @retval 0 success
229 * @retval ENOMEM out of memory
230 */
231 int
232 afs_tp_create(struct afs_thread_pool ** pool_out,
233 struct afs_work_queue * queue)
234 {
235 int ret = 0;
236 struct afs_thread_pool * pool;
237
238 ret = _afs_tp_alloc(pool_out);
239 if (ret) {
240 goto error;
241 }
242 pool = *pool_out;
243
244 opr_mutex_init(&pool->lock);
245 opr_cv_init(&pool->shutdown_cv);
246 queue_Init(&pool->thread_list);
247 pool->work_queue = queue;
248 pool->entry = &_afs_tp_worker_default;
249 pool->rock = NULL;
250 pool->nthreads = 0;
251 pool->max_threads = 4;
252 pool->state = AFS_TP_STATE_INIT;
253
254 error:
255 return ret;
256 }
257
258 /**
259 * destroy a thread pool.
260 *
261 * @param[in] pool thread pool object to be destroyed
262 *
263 * @return operation status
264 * @retval 0 success
265 * @retval AFS_TP_ERROR pool not in a quiescent state
266 */
267 int
268 afs_tp_destroy(struct afs_thread_pool * pool)
269 {
270 int ret = 0;
271
272 opr_mutex_enter(&pool->lock);
273 switch (pool->state) {
274 case AFS_TP_STATE_INIT:
275 case AFS_TP_STATE_STOPPED:
276 _afs_tp_free(pool);
277 break;
278
279 default:
280 ret = AFS_TP_ERROR;
281 opr_mutex_exit(&pool->lock);
282 }
283
284 return ret;
285 }
286
287 /**
288 * set the number of threads to spawn.
289 *
290 * @param[in] pool thread pool object
291 * @param[in] threads number of threads to spawn
292 *
293 * @return operation status
294 * @retval 0 success
295 * @retval AFS_TP_ERROR thread pool has already been started
296 */
297 int
298 afs_tp_set_threads(struct afs_thread_pool *pool,
299 afs_uint32 threads)
300 {
301 int ret = 0;
302
303 opr_mutex_enter(&pool->lock);
304 if (pool->state != AFS_TP_STATE_INIT) {
305 ret = AFS_TP_ERROR;
306 } else {
307 pool->max_threads = threads;
308 }
309 opr_mutex_exit(&pool->lock);
310
311 return ret;
312 }
313
314 /**
315 * set a custom thread entry point.
316 *
317 * @param[in] pool thread pool object
318 * @param[in] entry thread entry function pointer
319 * @param[in] rock opaque pointer passed to thread
320 *
321 * @return operation status
322 * @retval 0 success
323 * @retval AFS_TP_ERROR thread pool has already been started
324 */
325 int
326 afs_tp_set_entry(struct afs_thread_pool * pool,
327 afs_tp_worker_func_t * entry,
328 void * rock)
329 {
330 int ret = 0;
331
332 opr_mutex_enter(&pool->lock);
333 if (pool->state != AFS_TP_STATE_INIT) {
334 ret = AFS_TP_ERROR;
335 } else {
336 pool->entry = entry;
337 pool->rock = rock;
338 }
339 opr_mutex_exit(&pool->lock);
340
341 return ret;
342 }
343
344 /**
345 * start a thread pool.
346 *
347 * @param[in] pool thread pool object
348 *
349 * @return operation status
350 * @retval 0 success
351 * @retval AFS_TP_ERROR thread create failure
352 */
353 int
354 afs_tp_start(struct afs_thread_pool * pool)
355 {
356 int code, ret = 0;
357 struct afs_thread_pool_worker * worker;
358 afs_uint32 i;
359
360 opr_mutex_enter(&pool->lock);
361 if (pool->state != AFS_TP_STATE_INIT) {
362 ret = AFS_TP_ERROR;
363 goto done_sync;
364 }
365 pool->state = AFS_TP_STATE_STARTING;
366 opr_mutex_exit(&pool->lock);
367
368 for (i = 0; i < pool->max_threads; i++) {
369 code = _afs_tp_worker_start(pool, &worker);
370 if (code) {
371 ret = code;
372 }
373 }
374
375 opr_mutex_enter(&pool->lock);
376 pool->state = AFS_TP_STATE_RUNNING;
377 done_sync:
378 opr_mutex_exit(&pool->lock);
379
380 return ret;
381 }
382
383 /**
384 * shut down all threads in pool.
385 *
386 * @param[in] pool thread pool object
387 * @param[in] block wait for all threads to terminate, if asserted
388 *
389 * @return operation status
390 * @retval 0 success
391 */
392 int
393 afs_tp_shutdown(struct afs_thread_pool * pool,
394 int block)
395 {
396 int ret = 0;
397 struct afs_thread_pool_worker * worker, *nn;
398
399 opr_mutex_enter(&pool->lock);
400 if (pool->state == AFS_TP_STATE_STOPPED
401 || pool->state == AFS_TP_STATE_STOPPING) {
402 goto done_stopped;
403 }
404 if (pool->state != AFS_TP_STATE_RUNNING) {
405 ret = AFS_TP_ERROR;
406 goto done_sync;
407 }
408 pool->state = AFS_TP_STATE_STOPPING;
409
410 for (queue_Scan(&pool->thread_list, worker, nn, afs_thread_pool_worker)) {
411 worker->req_shutdown = 1;
412 }
413 if (!pool->nthreads) {
414 pool->state = AFS_TP_STATE_STOPPED;
415 }
416 /* need to drop lock to get a membar here */
417 opr_mutex_exit(&pool->lock);
418
419 ret = afs_wq_shutdown(pool->work_queue);
420 if (ret) {
421 goto error;
422 }
423
424 opr_mutex_enter(&pool->lock);
425 done_stopped:
426 if (block) {
427 while (pool->nthreads) {
428 opr_cv_wait(&pool->shutdown_cv, &pool->lock);
429 }
430 }
431 done_sync:
432 opr_mutex_exit(&pool->lock);
433
434 error:
435 return ret;
436 }
437
438 /**
439 * check whether thread pool is online.
440 *
441 * @param[in] pool thread pool object
442 *
443 * @return whether pool is online
444 * @retval 1 pool is online
445 * @retval 0 pool is not online
446 */
447 int
448 afs_tp_is_online(struct afs_thread_pool * pool)
449 {
450 int ret;
451
452 opr_mutex_enter(&pool->lock);
453 ret = (pool->state == AFS_TP_STATE_RUNNING);
454 opr_mutex_exit(&pool->lock);
455
456 return ret;
457 }
458
459 /**
460 * check whether a given worker thread can continue to run.
461 *
462 * @param[in] worker worker thread object pointer
463 *
464 * @return whether thread can continue to execute
465 * @retval 1 execution can continue
466 * @retval 0 shutdown has been requested
467 */
468 int
469 afs_tp_worker_continue(struct afs_thread_pool_worker * worker)
470 {
471 return !worker->req_shutdown;
472 }