Commit | Line | Data |
---|---|---|
805e021f CE |
1 | /* |
2 | * Copyright 2008-2010, Sine Nomine Associates and others. | |
3 | * All Rights Reserved. | |
4 | * | |
5 | * This software has been released under the terms of the IBM Public | |
6 | * License. For details, see the LICENSE file in the top-level source | |
7 | * directory or online at http://www.openafs.org/dl/license10.html | |
8 | */ | |
9 | ||
10 | #include <afsconfig.h> | |
11 | #include <afs/param.h> | |
12 | ||
13 | #include <roken.h> | |
14 | #include <afs/opr.h> | |
15 | ||
16 | #include <lock.h> | |
17 | #include <afs/afsutil.h> | |
18 | #include <lwp.h> | |
19 | #include <afs/afsint.h> | |
20 | ||
21 | #define __AFS_THREAD_POOL_IMPL 1 | |
22 | #include "work_queue.h" | |
23 | #include "thread_pool.h" | |
24 | #include "thread_pool_impl.h" | |
25 | ||
26 | /** | |
27 | * public interfaces for thread_pool. | |
28 | */ | |
29 | ||
30 | /** | |
31 | * allocate a thread pool object. | |
32 | * | |
33 | * @param[inout] pool_out address in which to store pool object pointer | |
34 | * | |
35 | * @return operation status | |
36 | * @retval 0 success | |
37 | * @retval ENOMEM out of memory | |
38 | * | |
39 | * @internal | |
40 | */ | |
41 | static int | |
42 | _afs_tp_alloc(struct afs_thread_pool ** pool_out) | |
43 | { | |
44 | int ret = 0; | |
45 | struct afs_thread_pool * pool; | |
46 | ||
47 | *pool_out = pool = malloc(sizeof(*pool)); | |
48 | if (pool == NULL) { | |
49 | ret = ENOMEM; | |
50 | goto error; | |
51 | } | |
52 | ||
53 | error: | |
54 | return ret; | |
55 | } | |
56 | ||
/**
 * release the storage backing a thread pool object.
 *
 * Only the memory is returned via free(); no members are torn down here.
 *
 * @param[in] pool  thread pool object
 *
 * @return operation status
 *    @retval 0 success
 *
 * @internal
 */
static int
_afs_tp_free(struct afs_thread_pool * pool)
{
    free(pool);
    return 0;
}
76 | ||
77 | /** | |
78 | * allocate a thread worker object. | |
79 | * | |
80 | * @param[inout] worker_out address in which to store worker object pointer | |
81 | * | |
82 | * @return operation status | |
83 | * @retval 0 success | |
84 | * @retval ENOMEM out of memory | |
85 | * | |
86 | * @internal | |
87 | */ | |
88 | static int | |
89 | _afs_tp_worker_alloc(struct afs_thread_pool_worker ** worker_out) | |
90 | { | |
91 | int ret = 0; | |
92 | struct afs_thread_pool_worker * worker; | |
93 | ||
94 | *worker_out = worker = malloc(sizeof(*worker)); | |
95 | if (worker == NULL) { | |
96 | ret = ENOMEM; | |
97 | goto error; | |
98 | } | |
99 | ||
100 | queue_NodeInit(&worker->worker_list); | |
101 | ||
102 | error: | |
103 | return ret; | |
104 | } | |
105 | ||
/**
 * release the storage backing a thread worker object.
 *
 * @param[in] worker  thread worker object
 *
 * @return operation status
 *    @retval 0 success
 *
 * @internal
 */
static int
_afs_tp_worker_free(struct afs_thread_pool_worker * worker)
{
    free(worker);
    return 0;
}
125 | ||
126 | /** | |
127 | * low-level thread entry point. | |
128 | * | |
129 | * @param[in] rock opaque pointer to thread worker object | |
130 | * | |
131 | * @return opaque return pointer from pool entry function | |
132 | * | |
133 | * @internal | |
134 | */ | |
135 | static void * | |
136 | _afs_tp_worker_run(void * rock) | |
137 | { | |
138 | struct afs_thread_pool_worker * worker = rock; | |
139 | struct afs_thread_pool * pool = worker->pool; | |
140 | ||
141 | /* register worker with pool */ | |
142 | opr_mutex_enter(&pool->lock); | |
143 | queue_Append(&pool->thread_list, worker); | |
144 | pool->nthreads++; | |
145 | opr_mutex_exit(&pool->lock); | |
146 | ||
147 | /* call high-level entry point */ | |
148 | worker->ret = (*pool->entry)(pool, worker, pool->work_queue, pool->rock); | |
149 | ||
150 | /* adjust pool live thread count */ | |
151 | opr_mutex_enter(&pool->lock); | |
152 | opr_Assert(pool->nthreads); | |
153 | queue_Remove(worker); | |
154 | pool->nthreads--; | |
155 | if (!pool->nthreads) { | |
156 | opr_cv_broadcast(&pool->shutdown_cv); | |
157 | pool->state = AFS_TP_STATE_STOPPED; | |
158 | } | |
159 | opr_mutex_exit(&pool->lock); | |
160 | ||
161 | _afs_tp_worker_free(worker); | |
162 | ||
163 | return NULL; | |
164 | } | |
165 | ||
/**
 * default high-level thread entry point.
 *
 * Repeatedly calls afs_wq_do() on the work queue until either that call
 * returns non-zero or afs_tp_worker_continue() reports that the worker
 * should stop.
 *
 * @param[in] pool    thread pool object (unused)
 * @param[in] worker  worker thread object
 * @param[in] queue   work queue to service
 * @param[in] rock    opaque pointer (unused)
 *
 * @return always NULL
 *
 * @internal
 */
static void *
_afs_tp_worker_default(struct afs_thread_pool *pool,
                       struct afs_thread_pool_worker *worker,
                       struct afs_work_queue *queue,
                       void *rock)
{
    for (;;) {
        if (!afs_tp_worker_continue(worker)) {
            break;
        }
        if (afs_wq_do(queue, NULL /* no call rock */) != 0) {
            break;
        }
    }

    return NULL;
}
184 | ||
185 | /** | |
186 | * start a worker thread. | |
187 | * | |
188 | * @param[in] pool thread pool object | |
189 | * @param[inout] worker_out address in which to store worker thread object pointer | |
190 | * | |
191 | * @return operation status | |
192 | * @retval 0 success | |
193 | * @retval ENOMEM out of memory | |
194 | */ | |
195 | static int | |
196 | _afs_tp_worker_start(struct afs_thread_pool * pool, | |
197 | struct afs_thread_pool_worker ** worker_out) | |
198 | { | |
199 | int ret = 0; | |
200 | pthread_attr_t attrs; | |
201 | struct afs_thread_pool_worker * worker; | |
202 | ||
203 | ret = _afs_tp_worker_alloc(worker_out); | |
204 | if (ret) { | |
205 | goto error; | |
206 | } | |
207 | worker = *worker_out; | |
208 | ||
209 | worker->pool = pool; | |
210 | worker->req_shutdown = 0; | |
211 | ||
212 | opr_Verify(pthread_attr_init(&attrs) == 0); | |
213 | opr_Verify(pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED) == 0); | |
214 | ||
215 | ret = pthread_create(&worker->tid, &attrs, &_afs_tp_worker_run, worker); | |
216 | ||
217 | error: | |
218 | return ret; | |
219 | } | |
220 | ||
221 | /** | |
222 | * create a thread pool. | |
223 | * | |
224 | * @param[inout] pool_out address in which to store pool object pointer. | |
225 | * @param[in] queue work queue serviced by thread pool | |
226 | * | |
227 | * @return operation status | |
228 | * @retval 0 success | |
229 | * @retval ENOMEM out of memory | |
230 | */ | |
231 | int | |
232 | afs_tp_create(struct afs_thread_pool ** pool_out, | |
233 | struct afs_work_queue * queue) | |
234 | { | |
235 | int ret = 0; | |
236 | struct afs_thread_pool * pool; | |
237 | ||
238 | ret = _afs_tp_alloc(pool_out); | |
239 | if (ret) { | |
240 | goto error; | |
241 | } | |
242 | pool = *pool_out; | |
243 | ||
244 | opr_mutex_init(&pool->lock); | |
245 | opr_cv_init(&pool->shutdown_cv); | |
246 | queue_Init(&pool->thread_list); | |
247 | pool->work_queue = queue; | |
248 | pool->entry = &_afs_tp_worker_default; | |
249 | pool->rock = NULL; | |
250 | pool->nthreads = 0; | |
251 | pool->max_threads = 4; | |
252 | pool->state = AFS_TP_STATE_INIT; | |
253 | ||
254 | error: | |
255 | return ret; | |
256 | } | |
257 | ||
258 | /** | |
259 | * destroy a thread pool. | |
260 | * | |
261 | * @param[in] pool thread pool object to be destroyed | |
262 | * | |
263 | * @return operation status | |
264 | * @retval 0 success | |
265 | * @retval AFS_TP_ERROR pool not in a quiescent state | |
266 | */ | |
267 | int | |
268 | afs_tp_destroy(struct afs_thread_pool * pool) | |
269 | { | |
270 | int ret = 0; | |
271 | ||
272 | opr_mutex_enter(&pool->lock); | |
273 | switch (pool->state) { | |
274 | case AFS_TP_STATE_INIT: | |
275 | case AFS_TP_STATE_STOPPED: | |
276 | _afs_tp_free(pool); | |
277 | break; | |
278 | ||
279 | default: | |
280 | ret = AFS_TP_ERROR; | |
281 | opr_mutex_exit(&pool->lock); | |
282 | } | |
283 | ||
284 | return ret; | |
285 | } | |
286 | ||
287 | /** | |
288 | * set the number of threads to spawn. | |
289 | * | |
290 | * @param[in] pool thread pool object | |
291 | * @param[in] threads number of threads to spawn | |
292 | * | |
293 | * @return operation status | |
294 | * @retval 0 success | |
295 | * @retval AFS_TP_ERROR thread pool has already been started | |
296 | */ | |
297 | int | |
298 | afs_tp_set_threads(struct afs_thread_pool *pool, | |
299 | afs_uint32 threads) | |
300 | { | |
301 | int ret = 0; | |
302 | ||
303 | opr_mutex_enter(&pool->lock); | |
304 | if (pool->state != AFS_TP_STATE_INIT) { | |
305 | ret = AFS_TP_ERROR; | |
306 | } else { | |
307 | pool->max_threads = threads; | |
308 | } | |
309 | opr_mutex_exit(&pool->lock); | |
310 | ||
311 | return ret; | |
312 | } | |
313 | ||
314 | /** | |
315 | * set a custom thread entry point. | |
316 | * | |
317 | * @param[in] pool thread pool object | |
318 | * @param[in] entry thread entry function pointer | |
319 | * @param[in] rock opaque pointer passed to thread | |
320 | * | |
321 | * @return operation status | |
322 | * @retval 0 success | |
323 | * @retval AFS_TP_ERROR thread pool has already been started | |
324 | */ | |
325 | int | |
326 | afs_tp_set_entry(struct afs_thread_pool * pool, | |
327 | afs_tp_worker_func_t * entry, | |
328 | void * rock) | |
329 | { | |
330 | int ret = 0; | |
331 | ||
332 | opr_mutex_enter(&pool->lock); | |
333 | if (pool->state != AFS_TP_STATE_INIT) { | |
334 | ret = AFS_TP_ERROR; | |
335 | } else { | |
336 | pool->entry = entry; | |
337 | pool->rock = rock; | |
338 | } | |
339 | opr_mutex_exit(&pool->lock); | |
340 | ||
341 | return ret; | |
342 | } | |
343 | ||
344 | /** | |
345 | * start a thread pool. | |
346 | * | |
347 | * @param[in] pool thread pool object | |
348 | * | |
349 | * @return operation status | |
350 | * @retval 0 success | |
351 | * @retval AFS_TP_ERROR thread create failure | |
352 | */ | |
353 | int | |
354 | afs_tp_start(struct afs_thread_pool * pool) | |
355 | { | |
356 | int code, ret = 0; | |
357 | struct afs_thread_pool_worker * worker; | |
358 | afs_uint32 i; | |
359 | ||
360 | opr_mutex_enter(&pool->lock); | |
361 | if (pool->state != AFS_TP_STATE_INIT) { | |
362 | ret = AFS_TP_ERROR; | |
363 | goto done_sync; | |
364 | } | |
365 | pool->state = AFS_TP_STATE_STARTING; | |
366 | opr_mutex_exit(&pool->lock); | |
367 | ||
368 | for (i = 0; i < pool->max_threads; i++) { | |
369 | code = _afs_tp_worker_start(pool, &worker); | |
370 | if (code) { | |
371 | ret = code; | |
372 | } | |
373 | } | |
374 | ||
375 | opr_mutex_enter(&pool->lock); | |
376 | pool->state = AFS_TP_STATE_RUNNING; | |
377 | done_sync: | |
378 | opr_mutex_exit(&pool->lock); | |
379 | ||
380 | return ret; | |
381 | } | |
382 | ||
383 | /** | |
384 | * shut down all threads in pool. | |
385 | * | |
386 | * @param[in] pool thread pool object | |
387 | * @param[in] block wait for all threads to terminate, if asserted | |
388 | * | |
389 | * @return operation status | |
390 | * @retval 0 success | |
391 | */ | |
392 | int | |
393 | afs_tp_shutdown(struct afs_thread_pool * pool, | |
394 | int block) | |
395 | { | |
396 | int ret = 0; | |
397 | struct afs_thread_pool_worker * worker, *nn; | |
398 | ||
399 | opr_mutex_enter(&pool->lock); | |
400 | if (pool->state == AFS_TP_STATE_STOPPED | |
401 | || pool->state == AFS_TP_STATE_STOPPING) { | |
402 | goto done_stopped; | |
403 | } | |
404 | if (pool->state != AFS_TP_STATE_RUNNING) { | |
405 | ret = AFS_TP_ERROR; | |
406 | goto done_sync; | |
407 | } | |
408 | pool->state = AFS_TP_STATE_STOPPING; | |
409 | ||
410 | for (queue_Scan(&pool->thread_list, worker, nn, afs_thread_pool_worker)) { | |
411 | worker->req_shutdown = 1; | |
412 | } | |
413 | if (!pool->nthreads) { | |
414 | pool->state = AFS_TP_STATE_STOPPED; | |
415 | } | |
416 | /* need to drop lock to get a membar here */ | |
417 | opr_mutex_exit(&pool->lock); | |
418 | ||
419 | ret = afs_wq_shutdown(pool->work_queue); | |
420 | if (ret) { | |
421 | goto error; | |
422 | } | |
423 | ||
424 | opr_mutex_enter(&pool->lock); | |
425 | done_stopped: | |
426 | if (block) { | |
427 | while (pool->nthreads) { | |
428 | opr_cv_wait(&pool->shutdown_cv, &pool->lock); | |
429 | } | |
430 | } | |
431 | done_sync: | |
432 | opr_mutex_exit(&pool->lock); | |
433 | ||
434 | error: | |
435 | return ret; | |
436 | } | |
437 | ||
438 | /** | |
439 | * check whether thread pool is online. | |
440 | * | |
441 | * @param[in] pool thread pool object | |
442 | * | |
443 | * @return whether pool is online | |
444 | * @retval 1 pool is online | |
445 | * @retval 0 pool is not online | |
446 | */ | |
447 | int | |
448 | afs_tp_is_online(struct afs_thread_pool * pool) | |
449 | { | |
450 | int ret; | |
451 | ||
452 | opr_mutex_enter(&pool->lock); | |
453 | ret = (pool->state == AFS_TP_STATE_RUNNING); | |
454 | opr_mutex_exit(&pool->lock); | |
455 | ||
456 | return ret; | |
457 | } | |
458 | ||
459 | /** | |
460 | * check whether a given worker thread can continue to run. | |
461 | * | |
462 | * @param[in] worker worker thread object pointer | |
463 | * | |
464 | * @return whether thread can continue to execute | |
465 | * @retval 1 execution can continue | |
466 | * @retval 0 shutdown has been requested | |
467 | */ | |
468 | int | |
469 | afs_tp_worker_continue(struct afs_thread_pool_worker * worker) | |
470 | { | |
471 | return !worker->req_shutdown; | |
472 | } |