/*
 * Copyright 2008-2010, Sine Nomine Associates and others.
 * All Rights Reserved.
 *
 * This software has been released under the terms of the IBM Public
 * License. For details, see the LICENSE file in the top-level source
 * directory or online at http://www.openafs.org/dl/license10.html
 */

#include <afsconfig.h>
#include <afs/param.h>

#include <roken.h>
#include <afs/opr.h>
#include <opr/lock.h>

#include <sys/file.h>

#define __AFS_WORK_QUEUE_IMPL 1
#include "work_queue.h"
#include "work_queue_impl.h"

/**
 * public interfaces for work_queue.
 */

static int
_afs_wq_node_put_r(struct afs_work_queue_node * node,
                   int drop);

/**
 * allocate a work queue object.
 *
 * @param[out] queue_out address in which to store queue pointer
 *
 * @return operation status
 * @retval 0 success
 * @retval ENOMEM out of memory
 *
 * @internal
 */
static int
_afs_wq_alloc(struct afs_work_queue ** queue_out)
{
    int ret = 0;
    struct afs_work_queue * queue;

    *queue_out = queue = malloc(sizeof(*queue));
    if (queue == NULL) {
        ret = ENOMEM;
        goto error;
    }

 error:
    return ret;
}

/**
 * free a work queue object.
 *
 * @param[in] queue work queue object
 *
 * @return operation status
 * @retval 0 success
 *
 * @internal
 */
static int
_afs_wq_free(struct afs_work_queue * queue)
{
    int ret = 0;

    free(queue);

    return ret;
}

/**
 * change a node's state.
 *
 * @param[in] node node object
 * @param[in] new_state new object state
 *
 * @return old state
 *
 * @pre node->lock held
 *
 * @internal
 */
static afs_wq_work_state_t
_afs_wq_node_state_change(struct afs_work_queue_node * node,
                          afs_wq_work_state_t new_state)
{
    afs_wq_work_state_t old_state;

    old_state = node->state;
    node->state = new_state;

    opr_cv_broadcast(&node->state_cv);

    return old_state;
}

/**
 * wait for a node's state to change from busy to something else.
 *
 * @param[in] node node object
 *
 * @return operation status
 * @retval 0 success
 *
 * @pre node->lock held
 *
 * @internal
 */
static int
_afs_wq_node_state_wait_busy(struct afs_work_queue_node * node)
{
    while (node->state == AFS_WQ_NODE_STATE_BUSY) {
        opr_cv_wait(&node->state_cv, &node->lock);
    }

    return 0;
}

/**
 * check whether a work queue node is busy.
 *
 * @param[in] node node object pointer
 *
 * @return whether node is busy
 * @retval 1 node is busy
 * @retval 0 node is not busy
 *
 * @pre node->lock held
 *
 * @internal
 */
static int
_afs_wq_node_state_is_busy(struct afs_work_queue_node * node)
{
    return (node->state == AFS_WQ_NODE_STATE_BUSY);
}

/**
 * attempt to simultaneously lock two work queue nodes.
 *
 * this is a somewhat tricky algorithm because there is no
 * defined hierarchy within the work queue node population.
 *
 * @param[in] ml multilock control structure
 *
 * @return operation status
 * @retval 0
 *
 * @note in theory, we could easily extend this to
 *       lock more than two nodes
 *
 * @pre
 *   - caller MUST NOT have set busy state on either node
 *
 * @post
 *   - locks held on both nodes
 *   - both nodes in quiescent states
 *
 * @note node with non-zero lock_held or busy_held fields
 *       MUST go in array index 0
 *
 * @internal
 */
static int
_afs_wq_node_multilock(struct afs_work_queue_node_multilock * ml)
{
    int code, ret = 0;
    struct timespec delay;
    int first = 1, second = 0, tmp;

    /* first pass processing */
    if (ml->nodes[0].lock_held) {
        if (!ml->nodes[0].busy_held) {
            ret = _afs_wq_node_state_wait_busy(ml->nodes[0].node);
            if (ret) {
                goto error;
            }
        }

        code = opr_mutex_tryenter(&ml->nodes[1].node->lock);
        if (code) {
            /* success */
            goto done;
        }

        /* setup for main loop */
        opr_mutex_exit(&ml->nodes[0].node->lock);
    }

    /*
     * setup random exponential backoff
     *
     * set initial delay to random value in the range [500,1000) ns
     */
    delay.tv_sec = 0;
    delay.tv_nsec = 500 + rand() % 500;

    while (1) {
        opr_mutex_enter(&ml->nodes[first].node->lock);
        if ((first != 0) || !ml->nodes[0].busy_held) {
            ret = _afs_wq_node_state_wait_busy(ml->nodes[first].node);
            if (ret) {
                /* cleanup */
                if (!ml->nodes[0].lock_held || first) {
                    opr_mutex_exit(&ml->nodes[first].node->lock);
                    if (ml->nodes[0].lock_held) {
                        /* on error, return with locks in same state as before call */
                        opr_mutex_enter(&ml->nodes[0].node->lock);
                    }
                }
                goto error;
            }
        }

        /*
         * in order to avoid deadlock, we must use trylock and
         * a non-blocking state check. if we meet any contention,
         * we must drop back and start again.
         */
        code = opr_mutex_tryenter(&ml->nodes[second].node->lock);
        if (code) {
            if (((second == 0) && (ml->nodes[0].busy_held)) ||
                !_afs_wq_node_state_is_busy(ml->nodes[second].node)) {
                /* success */
                break;
            } else {
                opr_mutex_exit(&ml->nodes[second].node->lock);
            }
        }

        /*
         * contended.
         *
         * drop locks, use exponential backoff,
         * try acquiring in the opposite order
         */
        opr_mutex_exit(&ml->nodes[first].node->lock);
        nanosleep(&delay, NULL);
        if (delay.tv_nsec <= 65536000) { /* max backoff delay of ~131ms */
            delay.tv_nsec <<= 1;
        }
        tmp = second;
        second = first;
        first = tmp;
    }

 done:
 error:
    return ret;
}
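
/**
 * Editor's illustrative sketch (not part of the original file): the essence
 * of the deadlock-avoidance scheme above, reduced to two plain pthread
 * mutexes. Lock one, trylock the other; on contention drop both, sleep with
 * a doubling delay, and retry in the opposite order. The helper lock_pair
 * is hypothetical and uses pthread primitives rather than the opr wrappers.
 *
 * @code
 * static void
 * lock_pair(pthread_mutex_t *a, pthread_mutex_t *b)
 * {
 *     struct timespec delay = { 0, 500 + rand() % 500 };
 *
 *     for (;;) {
 *         pthread_mutex_lock(a);
 *         if (pthread_mutex_trylock(b) == 0)
 *             return;                     // both locks held
 *         pthread_mutex_unlock(a);        // contended: drop and back off
 *         nanosleep(&delay, NULL);
 *         if (delay.tv_nsec <= 65536000)
 *             delay.tv_nsec <<= 1;        // exponential backoff, capped
 *         { pthread_mutex_t *tmp = a; a = b; b = tmp; }  // swap lock order
 *     }
 * }
 * @endcode
 */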

/**
 * initialize a node list object.
 *
 * @param[in] list list object
 * @param[in] id list identifier
 *
 * @return operation status
 * @retval 0 success
 *
 * @internal
 */
static int
_afs_wq_node_list_init(struct afs_work_queue_node_list * list,
                       afs_wq_node_list_id_t id)
{
    queue_Init(&list->list);
    opr_mutex_init(&list->lock);
    opr_cv_init(&list->cv);
    list->qidx = id;
    list->shutdown = 0;

    return 0;
}

/**
 * destroy a node list object.
 *
 * @param[in] list list object
 *
 * @return operation status
 * @retval 0 success
 * @retval AFS_WQ_ERROR list not empty
 *
 * @internal
 */
static int
_afs_wq_node_list_destroy(struct afs_work_queue_node_list * list)
{
    int ret = 0;

    if (queue_IsNotEmpty(&list->list)) {
        ret = AFS_WQ_ERROR;
        goto error;
    }

    opr_mutex_destroy(&list->lock);
    opr_cv_destroy(&list->cv);

 error:
    return ret;
}

/**
 * wakeup all threads waiting in dequeue.
 *
 * @param[in] list list object
 *
 * @return operation status
 * @retval 0 success
 *
 * @internal
 */
static int
_afs_wq_node_list_shutdown(struct afs_work_queue_node_list * list)
{
    int ret = 0;
    struct afs_work_queue_node *node, *nnode;

    opr_mutex_enter(&list->lock);
    list->shutdown = 1;

    for (queue_Scan(&list->list, node, nnode, afs_work_queue_node)) {
        _afs_wq_node_state_change(node, AFS_WQ_NODE_STATE_ERROR);
        queue_Remove(node);
        node->qidx = AFS_WQ_NODE_LIST_NONE;
        node->queue = NULL;

        if (node->detached) {
            /* if we are detached, we hold the reference on the node;
             * otherwise, it is some other caller that holds the reference.
             * So don't put the node if we are not detached; the node will
             * get freed when someone else calls afs_wq_node_put */
            afs_wq_node_put(node);
        }
    }

    opr_cv_broadcast(&list->cv);
    opr_mutex_exit(&list->lock);

    return ret;
}

/**
 * append to a node list object.
 *
 * @param[in] list list object
 * @param[in] node node object
 * @param[in] state new node state
 *
 * @return operation status
 * @retval 0 success
 * @retval AFS_WQ_ERROR raced to enqueue node
 *
 * @pre
 *   - node lock held
 *   - node is not on a list
 *   - node is either not busy, or it is marked as busy by the calling thread
 *
 * @post
 *   - enqueued on list
 *   - node lock dropped
 *
 * @internal
 */
static int
_afs_wq_node_list_enqueue(struct afs_work_queue_node_list * list,
                          struct afs_work_queue_node * node,
                          afs_wq_work_state_t state)
{
    int code, ret = 0;

    if (node->qidx != AFS_WQ_NODE_LIST_NONE) {
        /* raced */
        ret = AFS_WQ_ERROR;
        goto error;
    }

    /* deal with lock inversion */
    code = opr_mutex_tryenter(&list->lock);
    if (!code) {
        /* contended */
        _afs_wq_node_state_change(node, AFS_WQ_NODE_STATE_BUSY);
        opr_mutex_exit(&node->lock);
        opr_mutex_enter(&list->lock);
        opr_mutex_enter(&node->lock);

        /* assert state of the world (we set busy, so this should never happen) */
        opr_Assert(queue_IsNotOnQueue(node));
    }

    if (list->shutdown) {
        ret = AFS_WQ_ERROR;
        goto error_unlock;
    }

    opr_Assert(node->qidx == AFS_WQ_NODE_LIST_NONE);
    if (queue_IsEmpty(&list->list)) {
        /* wakeup a dequeue thread */
        opr_cv_signal(&list->cv);
    }
    queue_Append(&list->list, node);
    node->qidx = list->qidx;
    _afs_wq_node_state_change(node, state);

 error_unlock:
    opr_mutex_exit(&node->lock);
    opr_mutex_exit(&list->lock);

 error:
    return ret;
}

/**
 * dequeue a node from a list object.
 *
 * @param[in] list list object
 * @param[out] node_out address in which to store node object pointer
 * @param[in] state new node state
 * @param[in] block permit blocking on cv if asserted
 *
 * @return operation status
 * @retval 0 success
 * @retval EWOULDBLOCK block not asserted and nothing to dequeue
 * @retval EINTR blocking wait interrupted by list shutdown
 *
 * @post node object returned with node lock held and new state set
 *
 * @internal
 */
static int
_afs_wq_node_list_dequeue(struct afs_work_queue_node_list * list,
                          struct afs_work_queue_node ** node_out,
                          afs_wq_work_state_t state,
                          int block)
{
    int ret = 0;
    struct afs_work_queue_node * node;

    opr_mutex_enter(&list->lock);

    if (list->shutdown) {
        *node_out = NULL;
        ret = EINTR;
        goto done_sync;
    }

    if (!block && queue_IsEmpty(&list->list)) {
        *node_out = NULL;
        ret = EWOULDBLOCK;
        goto done_sync;
    }

    while (queue_IsEmpty(&list->list)) {
        if (list->shutdown) {
            *node_out = NULL;
            ret = EINTR;
            goto done_sync;
        }
        opr_cv_wait(&list->cv, &list->lock);
    }

    *node_out = node = queue_First(&list->list, afs_work_queue_node);

    opr_mutex_enter(&node->lock);
    queue_Remove(node);
    node->qidx = AFS_WQ_NODE_LIST_NONE;
    _afs_wq_node_state_change(node, state);

 done_sync:
    opr_mutex_exit(&list->lock);

    return ret;
}

/**
 * remove a node from a list.
 *
 * @param[in] node node object
 * @param[in] next_state node state following successful dequeue
 *
 * @return operation status
 * @retval 0 success
 * @retval AFS_WQ_ERROR in any of the following conditions:
 *         - node not associated with a work queue
 *         - node was not on a linked list (e.g. RUNNING state)
 *         - we raced another thread
 *
 * @pre node->lock held
 *
 * @post node removed from node list
 *
 * @note node->lock may be dropped internally
 *
 * @internal
 */
static int
_afs_wq_node_list_remove(struct afs_work_queue_node * node,
                         afs_wq_work_state_t next_state)
{
    int code, ret = 0;
    struct afs_work_queue_node_list * list = NULL;

    _afs_wq_node_state_wait_busy(node);

    if (!node->queue) {
        ret = AFS_WQ_ERROR;
        goto error;
    }
    switch (node->qidx) {
    case AFS_WQ_NODE_LIST_READY:
        list = &node->queue->ready_list;
        break;

    case AFS_WQ_NODE_LIST_BLOCKED:
        list = &node->queue->blocked_list;
        break;

    case AFS_WQ_NODE_LIST_DONE:
        list = &node->queue->done_list;
        break;

    default:
        ret = AFS_WQ_ERROR;
    }

    if (list) {
        code = opr_mutex_tryenter(&list->lock);
        if (!code) {
            /* contended */
            _afs_wq_node_state_change(node, AFS_WQ_NODE_STATE_BUSY);
            opr_mutex_exit(&node->lock);
            opr_mutex_enter(&list->lock);
            opr_mutex_enter(&node->lock);

            if (node->qidx == AFS_WQ_NODE_LIST_NONE) {
                /* raced */
                ret = AFS_WQ_ERROR;
                goto done_sync;
            }
        }

        queue_Remove(node);
        node->qidx = AFS_WQ_NODE_LIST_NONE;
        _afs_wq_node_state_change(node, next_state);

 done_sync:
        opr_mutex_exit(&list->lock);
    }

 error:
    return ret;
}

/**
 * allocate a dependency node.
 *
 * @param[out] node_out address in which to store dep node pointer
 *
 * @return operation status
 * @retval 0 success
 * @retval ENOMEM out of memory
 *
 * @internal
 */
static int
_afs_wq_dep_alloc(struct afs_work_queue_dep_node ** node_out)
{
    int ret = 0;
    struct afs_work_queue_dep_node * node;

    node = malloc(sizeof(*node));
    if (node == NULL) {
        ret = ENOMEM;
        goto error;
    }

    queue_NodeInit(&node->parent_list);
    node->parent = node->child = NULL;

    *node_out = node;

 error:
    return ret;
}

/**
 * free a dependency node.
 *
 * @param[in] node dep node pointer
 *
 * @return operation status
 * @retval 0 success
 * @retval AFS_WQ_ERROR still attached to a work node
 *
 * @internal
 */
static int
_afs_wq_dep_free(struct afs_work_queue_dep_node * node)
{
    int ret = 0;

    if (queue_IsOnQueue(&node->parent_list) ||
        node->parent ||
        node->child) {
        ret = AFS_WQ_ERROR;
        goto error;
    }

    free(node);

 error:
    return ret;
}

/**
 * unlink work nodes from a dependency node.
 *
 * @param[in] dep dependency node
 *
 * @return operation status
 * @retval 0 success
 *
 * @pre
 *   - dep->parent and dep->child are either locked, or are not referenced
 *     by anything else
 *   - caller holds ref on dep->child
 *   - dep->child and dep->parent in quiescent state
 *
 * @internal
 */
static int
_afs_wq_dep_unlink_r(struct afs_work_queue_dep_node *dep)
{
    struct afs_work_queue_node *child = dep->child;
    queue_Remove(&dep->parent_list);
    dep->child = NULL;
    dep->parent = NULL;

    return _afs_wq_node_put_r(child, 0);
}

/**
 * get a reference to a work node.
 *
 * @param[in] node work queue node
 *
 * @return operation status
 * @retval 0 success
 *
 * @pre node->lock held
 *
 * @internal
 */
static int
_afs_wq_node_get_r(struct afs_work_queue_node * node)
{
    node->refcount++;

    return 0;
}

/**
 * unlink and free all of the dependency nodes from a node.
 *
 * @param[in] parent work node that is the parent node of all deps to be freed
 *
 * @return operation status
 * @retval 0 success
 *
 * @pre parent->refcount == 0
 */
static int
_afs_wq_node_free_deps(struct afs_work_queue_node *parent)
{
    int ret = 0, code;
    struct afs_work_queue_node *node_unlock = NULL, *node_put = NULL;
    struct afs_work_queue_dep_node * dep, * nd;

    /* unlink and free all of the dep structs attached to 'parent' */
    for (queue_Scan(&parent->dep_children,
                    dep,
                    nd,
                    afs_work_queue_dep_node)) {

        opr_mutex_enter(&dep->child->lock);
        node_unlock = dep->child;

        /* We need to get a ref on child here, since _afs_wq_dep_unlink_r may
         * put the last ref on the child, and we need the child to still exist
         * so we can unlock it */
        code = _afs_wq_node_get_r(dep->child);
        if (code) {
            goto loop_error;
        }
        node_put = dep->child;

        /* remember, no need to lock dep->parent, since its refcount is 0 */
        code = _afs_wq_dep_unlink_r(dep);

     loop_error:
        if (node_put) {
            _afs_wq_node_put_r(node_put, 1);
        } else if (node_unlock) {
            opr_mutex_exit(&node_unlock->lock);
        }
        node_put = node_unlock = NULL;

        if (code == 0) {
            /* Only do this if everything is okay; if code is nonzero,
             * something will still be pointing at dep, so don't free it.
             * We will leak memory, but that's better than memory corruption;
             * we've done all we can do to try and free the dep memory */
            code = _afs_wq_dep_free(dep);
        }

        if (!ret) {
            ret = code;
        }
    }
    return ret;
}

/**
 * propagate state down through dep nodes.
 *
 * @param[in] parent parent node object
 * @param[in] next_state next state parent will assume
 *
 * @return operation status
 * @retval 0 success
 *
 * @pre
 *   - parent->lock held
 *
 * @internal
 */
static int
_afs_wq_dep_propagate(struct afs_work_queue_node * parent,
                      afs_wq_work_state_t next_state)
{
    int ret = 0;
    struct afs_work_queue_dep_node * dep, * nd;
    struct afs_work_queue_node_multilock ml;
    afs_wq_work_state_t old_state;
    afs_wq_node_list_id_t qidx;
    struct afs_work_queue_node_list * ql;
    afs_wq_work_state_t cns;

    old_state = _afs_wq_node_state_change(parent, AFS_WQ_NODE_STATE_BUSY);
    ml.nodes[0].node = parent;
    ml.nodes[0].lock_held = 1;
    ml.nodes[0].busy_held = 1;

    /* scan through our children updating scheduling state */
    for (queue_Scan(&parent->dep_children,
                    dep,
                    nd,
                    afs_work_queue_dep_node)) {
        /* skip half-registered nodes */
        if (dep->child == NULL) {
            continue;
        }

        ml.nodes[1].node = dep->child;
        ml.nodes[1].lock_held = 0;
        ml.nodes[1].busy_held = 0;
        ret = _afs_wq_node_multilock(&ml);
        if (ret) {
            goto error;
        }

        switch (next_state) {
        case AFS_WQ_NODE_STATE_DONE:
            dep->child->block_count--;
            break;

        case AFS_WQ_NODE_STATE_ERROR:
            dep->child->error_count++;
            break;

        default:
            (void)0; /* nop */
        }

        /* skip unscheduled nodes */
        if (dep->child->queue == NULL) {
            opr_mutex_exit(&dep->child->lock);
            continue;
        }

        /*
         * when blocked dep and error'd dep counts reach zero, the
         * node can be scheduled for execution
         */
        if (dep->child->error_count) {
            ql = &dep->child->queue->done_list;
            qidx = AFS_WQ_NODE_LIST_DONE;
            cns = AFS_WQ_NODE_STATE_ERROR;
        } else if (dep->child->block_count) {
            ql = &dep->child->queue->blocked_list;
            qidx = AFS_WQ_NODE_LIST_BLOCKED;
            cns = AFS_WQ_NODE_STATE_BLOCKED;
        } else {
            ql = &dep->child->queue->ready_list;
            qidx = AFS_WQ_NODE_LIST_READY;
            cns = AFS_WQ_NODE_STATE_SCHEDULED;
        }

        if (qidx != dep->child->qidx) {
            /* we're transitioning to a different queue */
            ret = _afs_wq_node_list_remove(dep->child, AFS_WQ_NODE_STATE_BUSY);
            if (ret) {
                opr_mutex_exit(&dep->child->lock);
                goto error;
            }

            ret = _afs_wq_node_list_enqueue(ql, dep->child, cns);
            if (ret) {
                opr_mutex_exit(&dep->child->lock);
                goto error;
            }
        }
        opr_mutex_exit(&dep->child->lock);
    }

 error:
    _afs_wq_node_state_change(parent, old_state);
    return ret;
}

/**
 * decrements queue->running_count, and signals waiters if appropriate.
 *
 * @param[in] queue queue to dec the running count of
 */
static void
_afs_wq_dec_running_count(struct afs_work_queue *queue)
{
    opr_mutex_enter(&queue->lock);
    queue->running_count--;
    if (queue->shutdown && queue->running_count == 0) {
        /* if we've shut down, someone may be waiting for the running count
         * to drop to 0 */
        opr_cv_broadcast(&queue->running_cv);
    }
    opr_mutex_exit(&queue->lock);
}

/**
 * execute a node on the queue.
 *
 * @param[in] queue work queue
 * @param[in] rock opaque pointer (passed as third arg to callback func)
 * @param[in] block allow blocking in dequeue
 *
 * @return operation status
 * @retval 0 completed a work unit
 *
 * @internal
 */
static int
_afs_wq_do(struct afs_work_queue * queue,
           void * rock,
           int block)
{
    int code, ret = 0;
    struct afs_work_queue_node * node;
    afs_wq_callback_func_t * cbf;
    afs_wq_work_state_t next_state;
    struct afs_work_queue_node_list * ql;
    void * node_rock;
    int detached = 0;

    /* We can inc queue->running_count before actually pulling the node off
     * of the ready_list, since running_count only really matters when we are
     * shut down. If we get shut down before we pull the node off of
     * ready_list, but after we inc'd running_count,
     * _afs_wq_node_list_dequeue should return immediately with EINTR,
     * in which case we'll dec running_count, so it's as if we never inc'd it
     * in the first place. */
    opr_mutex_enter(&queue->lock);
    if (queue->shutdown) {
        opr_mutex_exit(&queue->lock);
        return EINTR;
    }
    queue->running_count++;
    opr_mutex_exit(&queue->lock);

    ret = _afs_wq_node_list_dequeue(&queue->ready_list,
                                    &node,
                                    AFS_WQ_NODE_STATE_RUNNING,
                                    block);
    if (ret) {
        _afs_wq_dec_running_count(queue);
        goto error;
    }

    cbf = node->cbf;
    node_rock = node->rock;
    detached = node->detached;

    if (cbf != NULL) {
        opr_mutex_exit(&node->lock);
        code = (*cbf)(queue, node, queue->rock, node_rock, rock);
        opr_mutex_enter(&node->lock);
        if (code == 0) {
            next_state = AFS_WQ_NODE_STATE_DONE;
            ql = &queue->done_list;
        } else if (code == AFS_WQ_ERROR_RESCHEDULE) {
            if (node->error_count) {
                next_state = AFS_WQ_NODE_STATE_ERROR;
                ql = &queue->done_list;
            } else if (node->block_count) {
                next_state = AFS_WQ_NODE_STATE_BLOCKED;
                ql = &queue->blocked_list;
            } else {
                next_state = AFS_WQ_NODE_STATE_SCHEDULED;
                ql = &queue->ready_list;
            }
        } else {
            next_state = AFS_WQ_NODE_STATE_ERROR;
            ql = &queue->done_list;
        }
    } else {
        next_state = AFS_WQ_NODE_STATE_DONE;
        code = 0;
        ql = &queue->done_list;
    }

    _afs_wq_dec_running_count(queue);

    node->retcode = code;

    if ((next_state == AFS_WQ_NODE_STATE_DONE) ||
        (next_state == AFS_WQ_NODE_STATE_ERROR)) {

        opr_mutex_enter(&queue->lock);

        if (queue->drain && queue->pend_count == queue->opts.pend_lothresh) {
            /* signal other threads if we're about to drop below the low
             * pending-tasks threshold */
            queue->drain = 0;
            opr_cv_signal(&queue->pend_cv);
        }

        if (queue->pend_count == 1) {
            /* signal other threads if we're about to become 'empty' */
            opr_cv_broadcast(&queue->empty_cv);
        }

        queue->pend_count--;

        opr_mutex_exit(&queue->lock);
    }

    ret = _afs_wq_node_state_wait_busy(node);
    if (ret) {
        goto error;
    }

    /* propagate scheduling changes down through dependencies */
    ret = _afs_wq_dep_propagate(node, next_state);
    if (ret) {
        goto error;
    }

    ret = _afs_wq_node_state_wait_busy(node);
    if (ret) {
        goto error;
    }

    if (detached &&
        ((next_state == AFS_WQ_NODE_STATE_DONE) ||
         (next_state == AFS_WQ_NODE_STATE_ERROR))) {
        _afs_wq_node_state_change(node, next_state);
        _afs_wq_node_put_r(node, 1);
    } else {
        ret = _afs_wq_node_list_enqueue(ql, node, next_state);
    }

 error:
    return ret;
}

/**
 * initialize a struct afs_work_queue_opts to the default values
 *
 * @param[out] opts opts struct to initialize
 */
void
afs_wq_opts_init(struct afs_work_queue_opts *opts)
{
    opts->pend_lothresh = 0;
    opts->pend_hithresh = 0;
}

/**
 * set the options for a struct afs_work_queue_opts appropriate for a certain
 * number of threads.
 *
 * @param[out] opts opts struct in which to set the values
 * @param[in] threads number of threads
 */
void
afs_wq_opts_calc_thresh(struct afs_work_queue_opts *opts, int threads)
{
    opts->pend_lothresh = threads * 2;
    opts->pend_hithresh = threads * 16;

    /* safety */
    if (opts->pend_lothresh < 1) {
        opts->pend_lothresh = 1;
    }
    if (opts->pend_hithresh < 2) {
        opts->pend_hithresh = 2;
    }
}
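
/**
 * Worked example (editor's illustration, not part of the original file):
 * sizing the pending-task thresholds for a pool of four worker threads.
 *
 * @code
 * struct afs_work_queue_opts opts;
 *
 * afs_wq_opts_calc_thresh(&opts, 4);
 * // opts.pend_lothresh == 4 * 2  == 8
 * // opts.pend_hithresh == 4 * 16 == 64
 * // afs_wq_add starts draining once 64 tasks are pending, and blocked
 * // producers resume once completions bring the count back down to 8.
 * @endcode
 */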

/**
 * allocate and initialize a work queue object.
 *
 * @param[out] queue_out address in which to store newly allocated work queue object
 * @param[in] rock work queue opaque pointer (passed as first arg to all fired callbacks)
 * @param[in] opts options for the newly created queue
 *
 * @return operation status
 * @retval 0 success
 * @retval ENOMEM out of memory
 */
int
afs_wq_create(struct afs_work_queue ** queue_out,
              void * rock,
              struct afs_work_queue_opts *opts)
{
    int ret = 0;
    struct afs_work_queue * queue;

    ret = _afs_wq_alloc(queue_out);
    if (ret) {
        goto error;
    }
    queue = *queue_out;

    if (opts) {
        memcpy(&queue->opts, opts, sizeof(queue->opts));
    } else {
        afs_wq_opts_init(&queue->opts);
    }

    _afs_wq_node_list_init(&queue->ready_list, AFS_WQ_NODE_LIST_READY);
    _afs_wq_node_list_init(&queue->blocked_list, AFS_WQ_NODE_LIST_BLOCKED);
    _afs_wq_node_list_init(&queue->done_list, AFS_WQ_NODE_LIST_DONE);
    queue->rock = rock;
    queue->drain = 0;
    queue->shutdown = 0;
    queue->pend_count = 0;
    queue->running_count = 0;

    opr_mutex_init(&queue->lock);
    opr_cv_init(&queue->pend_cv);
    opr_cv_init(&queue->empty_cv);
    opr_cv_init(&queue->running_cv);

 error:
    return ret;
}
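
/**
 * Usage sketch (editor's illustration, not part of the original file): one
 * possible lifecycle for a queue with a single detached, donated node.
 * Error handling is elided; my_cb is the hypothetical callback sketched
 * after afs_wq_node_set_callback below.
 *
 * @code
 * struct afs_work_queue *queue;
 * struct afs_work_queue_node *node;
 * struct afs_work_queue_add_opts opts;
 *
 * afs_wq_create(&queue, NULL, NULL);      // default queue options
 * afs_wq_node_alloc(&node);
 * afs_wq_node_set_callback(node, my_cb, NULL, NULL);
 * afs_wq_node_set_detached(node);         // queue will free the node
 *
 * afs_wq_add_opts_init(&opts);
 * opts.donate = 1;                        // hand our node ref to the queue
 * afs_wq_add(queue, node, &opts);
 *
 * afs_wq_do(queue, NULL);                 // execute one unit of work
 * afs_wq_wait_all(queue);                 // wait until nothing is pending
 * afs_wq_shutdown(queue);
 * afs_wq_destroy(queue);
 * @endcode
 */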

/**
 * deallocate and free a work queue object.
 *
 * @param[in] queue work queue to be destroyed
 *
 * @return operation status
 * @retval 0 success
 * @retval AFS_WQ_ERROR unspecified error
 */
int
afs_wq_destroy(struct afs_work_queue * queue)
{
    int ret = 0;

    ret = _afs_wq_node_list_destroy(&queue->ready_list);
    if (ret) {
        goto error;
    }

    ret = _afs_wq_node_list_destroy(&queue->blocked_list);
    if (ret) {
        goto error;
    }

    ret = _afs_wq_node_list_destroy(&queue->done_list);
    if (ret) {
        goto error;
    }

    ret = _afs_wq_free(queue);

 error:
    return ret;
}

/**
 * shutdown a work queue.
 *
 * @param[in] queue work queue object pointer
 *
 * @return operation status
 * @retval 0 success
 */
int
afs_wq_shutdown(struct afs_work_queue * queue)
{
    int ret = 0;

    opr_mutex_enter(&queue->lock);
    if (queue->shutdown) {
        /* already shutdown, do nothing */
        opr_mutex_exit(&queue->lock);
        goto error;
    }
    queue->shutdown = 1;

    ret = _afs_wq_node_list_shutdown(&queue->ready_list);
    if (ret) {
        goto error;
    }

    ret = _afs_wq_node_list_shutdown(&queue->blocked_list);
    if (ret) {
        goto error;
    }

    ret = _afs_wq_node_list_shutdown(&queue->done_list);
    if (ret) {
        goto error;
    }

    /* signal everyone that could be waiting, since these conditions will
     * generally fail to signal on their own if we're shutdown, since no
     * progress is being made */
    opr_cv_broadcast(&queue->pend_cv);
    opr_cv_broadcast(&queue->empty_cv);
    opr_mutex_exit(&queue->lock);

 error:
    return ret;
}

/**
 * allocate a work node.
 *
 * @param[out] node_out address in which to store new work node
 *
 * @return operation status
 * @retval 0 success
 * @retval ENOMEM out of memory
 */
int
afs_wq_node_alloc(struct afs_work_queue_node ** node_out)
{
    int ret = 0;
    struct afs_work_queue_node * node;

    *node_out = node = malloc(sizeof(*node));
    if (node == NULL) {
        ret = ENOMEM;
        goto error;
    }

    queue_NodeInit(&node->node_list);
    node->qidx = AFS_WQ_NODE_LIST_NONE;
    node->cbf = NULL;
    node->rock = node->queue = NULL;
    node->refcount = 1;
    node->block_count = 0;
    node->error_count = 0;
    opr_mutex_init(&node->lock);
    opr_cv_init(&node->state_cv);
    node->state = AFS_WQ_NODE_STATE_INIT;
    queue_Init(&node->dep_children);

 error:
    return ret;
}

/**
 * free a work node.
 *
 * @param[in] node work node object
 *
 * @return operation status
 * @retval 0 success
 *
 * @internal
 */
static int
_afs_wq_node_free(struct afs_work_queue_node * node)
{
    int ret = 0;

    if (queue_IsOnQueue(node) ||
        (node->state == AFS_WQ_NODE_STATE_SCHEDULED) ||
        (node->state == AFS_WQ_NODE_STATE_RUNNING) ||
        (node->state == AFS_WQ_NODE_STATE_BLOCKED)) {
        ret = AFS_WQ_ERROR;
        goto error;
    }

    ret = _afs_wq_node_free_deps(node);
    if (ret) {
        goto error;
    }

    opr_mutex_destroy(&node->lock);
    opr_cv_destroy(&node->state_cv);

    if (node->rock_dtor) {
        (*node->rock_dtor) (node->rock);
    }

    free(node);

 error:
    return ret;
}

/**
 * get a reference to a work node.
 *
 * @param[in] node work queue node
 *
 * @return operation status
 * @retval 0 success
 */
int
afs_wq_node_get(struct afs_work_queue_node * node)
{
    opr_mutex_enter(&node->lock);
    node->refcount++;
    opr_mutex_exit(&node->lock);

    return 0;
}

/**
 * put back a reference to a work node.
 *
 * @param[in] node work queue node
 * @param[in] drop drop node->lock
 *
 * @post if refcount reaches zero, node is deallocated.
 *
 * @return operation status
 * @retval 0 success
 *
 * @pre node->lock held
 *
 * @internal
 */
static int
_afs_wq_node_put_r(struct afs_work_queue_node * node,
                   int drop)
{
    afs_uint32 refc;

    opr_Assert(node->refcount > 0);
    refc = --node->refcount;
    if (drop) {
        opr_mutex_exit(&node->lock);
    }
    if (!refc) {
        opr_Assert(node->qidx == AFS_WQ_NODE_LIST_NONE);
        _afs_wq_node_free(node);
    }

    return 0;
}

/**
 * put back a reference to a work node.
 *
 * @param[in] node work queue node
 *
 * @post if refcount reaches zero, node is deallocated.
 *
 * @return operation status
 * @retval 0 success
 */
int
afs_wq_node_put(struct afs_work_queue_node * node)
{
    opr_mutex_enter(&node->lock);
    return _afs_wq_node_put_r(node, 1);
}

/**
 * set the callback function on a work node.
 *
 * @param[in] node work queue node
 * @param[in] cbf callback function
 * @param[in] rock opaque pointer passed to callback
 * @param[in] dtor destructor function for 'rock', or NULL
 *
 * @return operation status
 * @retval 0 success
 */
int
afs_wq_node_set_callback(struct afs_work_queue_node * node,
                         afs_wq_callback_func_t * cbf,
                         void * rock, afs_wq_callback_dtor_t *dtor)
{
    opr_mutex_enter(&node->lock);
    node->cbf = cbf;
    node->rock = rock;
    node->rock_dtor = dtor;
    opr_mutex_exit(&node->lock);

    return 0;
}
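
/**
 * Example callback (editor's illustration, not part of the original file):
 * a hypothetical my_cb with the parameter order used by the dispatch in
 * _afs_wq_do above (queue, node, queue rock, node rock, caller rock).
 *
 * @code
 * static int
 * my_cb(struct afs_work_queue *queue, struct afs_work_queue_node *node,
 *       void *queue_rock, void *node_rock, void *caller_rock)
 * {
 *     // do one unit of work here...
 *
 *     // return 0 to move the node to the done list,
 *     // AFS_WQ_ERROR_RESCHEDULE to requeue it, or any other
 *     // nonzero value to mark the node as errored
 *     return 0;
 * }
 * @endcode
 */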

/**
 * detach work node.
 *
 * @param[in] node work queue node
 *
 * @return operation status
 * @retval 0 success
 */
int
afs_wq_node_set_detached(struct afs_work_queue_node * node)
{
    opr_mutex_enter(&node->lock);
    node->detached = 1;
    opr_mutex_exit(&node->lock);

    return 0;
}

/**
 * link a dependency node to a parent and child work node.
 *
 * This links a dependency node such that when the 'parent' work node is
 * done, the 'child' work node can proceed.
 *
 * @param[in] dep dependency node
 * @param[in] parent parent node in this dependency
 * @param[in] child child node in this dependency
 *
 * @return operation status
 * @retval 0 success
 *
 * @pre
 *   - parent->lock held
 *   - child->lock held
 *   - parent and child in quiescent state
 *
 * @internal
 */
static int
_afs_wq_dep_link_r(struct afs_work_queue_dep_node *dep,
                   struct afs_work_queue_node *parent,
                   struct afs_work_queue_node *child)
{
    int ret = 0;

    /* Each dep node adds a ref to the child node of that dep. We do not
     * do the same for the parent node, since if the only refs remaining
     * for a node are deps in node->dep_children, then the node should be
     * destroyed, and we will destroy the dep nodes when we free the
     * work node. */
    ret = _afs_wq_node_get_r(child);
    if (ret) {
        goto error;
    }

    /* add this dep node to the parent node's list of deps */
    queue_Append(&parent->dep_children, &dep->parent_list);

    dep->child = child;
    dep->parent = parent;

 error:
    return ret;
}

/**
 * add a dependency to a work node.
 *
 * @param[in] child node which will be dependent upon completion of parent
 * @param[in] parent node whose completion gates child's execution
 *
 * @pre
 *   - child is in initial state (last op was afs_wq_node_alloc or afs_wq_node_wait)
 *
 * @return operation status
 * @retval 0 success
 */
int
afs_wq_node_dep_add(struct afs_work_queue_node * child,
                    struct afs_work_queue_node * parent)
{
    int ret = 0;
    struct afs_work_queue_dep_node * dep = NULL;
    struct afs_work_queue_node_multilock ml;
    int held = 0;

    /* self references are bad, mmkay? */
    if (parent == child) {
        ret = AFS_WQ_ERROR;
        goto error;
    }

    ret = _afs_wq_dep_alloc(&dep);
    if (ret) {
        goto error;
    }

    memset(&ml, 0, sizeof(ml));
    ml.nodes[0].node = parent;
    ml.nodes[1].node = child;
    ret = _afs_wq_node_multilock(&ml);
    if (ret) {
        goto error;
    }
    held = 1;

    /* only allow dep modification while in initial state
     * or running state (e.g. do a dep add while inside callback) */
    if ((child->state != AFS_WQ_NODE_STATE_INIT) &&
        (child->state != AFS_WQ_NODE_STATE_RUNNING)) {
        ret = AFS_WQ_ERROR;
        goto error;
    }

    /* link dep node with child and parent work queue node */
    ret = _afs_wq_dep_link_r(dep, parent, child);
    if (ret) {
        goto error;
    }

    /* handle blocking counts */
    switch (parent->state) {
    case AFS_WQ_NODE_STATE_INIT:
    case AFS_WQ_NODE_STATE_SCHEDULED:
    case AFS_WQ_NODE_STATE_RUNNING:
    case AFS_WQ_NODE_STATE_BLOCKED:
        child->block_count++;
        break;

    case AFS_WQ_NODE_STATE_ERROR:
        child->error_count++;
        break;

    default:
        (void)0; /* nop */
    }

 done:
    if (held) {
        opr_mutex_exit(&child->lock);
        opr_mutex_exit(&parent->lock);
    }
    return ret;

 error:
    if (dep) {
        _afs_wq_dep_free(dep);
    }
    goto done;
}
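
/**
 * Dependency sketch (editor's illustration, not part of the original file):
 * make 'child' runnable only after 'parent' completes. Both nodes are
 * assumed to be freshly allocated (INIT state) with callbacks already set.
 *
 * @code
 * afs_wq_node_dep_add(child, parent);   // child->block_count becomes 1
 * afs_wq_add(queue, child, NULL);       // child lands on the blocked list
 * afs_wq_add(queue, parent, NULL);      // parent lands on the ready list
 * // when parent finishes, _afs_wq_dep_propagate decrements the child's
 * // block count and moves it from the blocked list to the ready list
 * @endcode
 */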

/**
 * remove a dependency from a work node.
 *
 * @param[in] child node which was dependent upon completion of parent
 * @param[in] parent node whose completion gated child's execution
 *
 * @return operation status
 * @retval 0 success
 */
int
afs_wq_node_dep_del(struct afs_work_queue_node * child,
                    struct afs_work_queue_node * parent)
{
    int code, ret = 0;
    struct afs_work_queue_dep_node * dep, * ndep;
    struct afs_work_queue_node_multilock ml;
    int held = 0;

    memset(&ml, 0, sizeof(ml));
    ml.nodes[0].node = parent;
    ml.nodes[1].node = child;
    code = _afs_wq_node_multilock(&ml);
    if (code) {
        goto error;
    }
    held = 1;

    /* only permit changes while child is in init state
     * or running state (e.g. do a dep del when in callback func) */
    if ((child->state != AFS_WQ_NODE_STATE_INIT) &&
        (child->state != AFS_WQ_NODE_STATE_RUNNING)) {
        ret = AFS_WQ_ERROR;
        goto error;
    }

    /* locate node linking parent and child */
    for (queue_Scan(&parent->dep_children,
                    dep,
                    ndep,
                    afs_work_queue_dep_node)) {
        if ((dep->child == child) &&
            (dep->parent == parent)) {

            /* no need to grab an extra ref on dep->child here; the caller
             * should already have a ref on dep->child */
            code = _afs_wq_dep_unlink_r(dep);
            if (code) {
                ret = code;
                goto error;
            }

            code = _afs_wq_dep_free(dep);
            if (code) {
                ret = code;
                goto error;
            }
            break;
        }
    }

 error:
    if (held) {
        opr_mutex_exit(&child->lock);
        opr_mutex_exit(&parent->lock);
    }
    return ret;
}

/**
 * block a work node from execution.
 *
 * this can be used to allow external events to influence work queue flow.
 *
 * @param[in] node work queue node to be blocked
 *
 * @return operation status
 * @retval 0 success
 *
 * @post external block count incremented
 */
int
afs_wq_node_block(struct afs_work_queue_node * node)
{
    int ret = 0;
    int start;

    opr_mutex_enter(&node->lock);
    ret = _afs_wq_node_state_wait_busy(node);
    if (ret) {
        goto error_sync;
    }

    start = node->block_count++;

    if (!start &&
        (node->qidx == AFS_WQ_NODE_LIST_READY)) {
        /* unblocked->blocked transition, and we're already scheduled */
        ret = _afs_wq_node_list_remove(node, AFS_WQ_NODE_STATE_BUSY);
        if (ret) {
            goto error_sync;
        }

        ret = _afs_wq_node_list_enqueue(&node->queue->blocked_list,
                                        node,
                                        AFS_WQ_NODE_STATE_BLOCKED);
    }

 error_sync:
    opr_mutex_exit(&node->lock);

    return ret;
}

/**
 * unblock a work node for execution.
 *
 * this can be used to allow external events to influence work queue flow.
 *
 * @param[in] node work queue node to be unblocked
 *
 * @return operation status
 * @retval 0 success
 *
 * @post external block count decremented
 */
int
afs_wq_node_unblock(struct afs_work_queue_node * node)
{
    int ret = 0;
    int end;

    opr_mutex_enter(&node->lock);
    ret = _afs_wq_node_state_wait_busy(node);
    if (ret) {
        goto error_sync;
    }

    end = --node->block_count;

    if (!end &&
        (node->qidx == AFS_WQ_NODE_LIST_BLOCKED)) {
        /* blocked->unblocked transition, and we're ready to be scheduled */
        ret = _afs_wq_node_list_remove(node, AFS_WQ_NODE_STATE_BUSY);
        if (ret) {
            goto error_sync;
        }

        ret = _afs_wq_node_list_enqueue(&node->queue->ready_list,
                                        node,
                                        AFS_WQ_NODE_STATE_SCHEDULED);
    }

 error_sync:
    opr_mutex_exit(&node->lock);

    return ret;
}

/**
 * initialize an afs_wq_add_opts struct with the default options.
 *
 * @param[out] opts options structure to initialize
 */
void
afs_wq_add_opts_init(struct afs_work_queue_add_opts *opts)
{
    opts->donate = 0;
    opts->block = 1;
    opts->force = 0;
}

/**
 * schedule a work node for execution.
 *
 * @param[in] queue work queue
 * @param[in] node work node
 * @param[in] opts options for adding, or NULL for defaults
 *
 * @return operation status
 * @retval 0 success
 * @retval EWOULDBLOCK queue is full and opts specified not to block
 * @retval EINTR queue was full, we blocked to add, and the queue was
 *               shutdown while we were blocking
 */
int
afs_wq_add(struct afs_work_queue *queue,
           struct afs_work_queue_node *node,
           struct afs_work_queue_add_opts *opts)
{
    int ret = 0;
    int donate, block, force, hithresh;
    struct afs_work_queue_node_list * list;
    struct afs_work_queue_add_opts l_opts;
    int waited_for_drain = 0;
    afs_wq_work_state_t state;

    if (!opts) {
        afs_wq_add_opts_init(&l_opts);
        opts = &l_opts;
    }

    donate = opts->donate;
    block = opts->block;
    force = opts->force;

 retry:
    opr_mutex_enter(&node->lock);

    ret = _afs_wq_node_state_wait_busy(node);
    if (ret) {
        goto error;
    }

    if (!node->block_count && !node->error_count) {
        list = &queue->ready_list;
        state = AFS_WQ_NODE_STATE_SCHEDULED;
    } else if (node->error_count) {
        list = &queue->done_list;
        state = AFS_WQ_NODE_STATE_ERROR;
    } else {
        list = &queue->blocked_list;
        state = AFS_WQ_NODE_STATE_BLOCKED;
    }

    ret = 0;

    opr_mutex_enter(&queue->lock);

    if (queue->shutdown) {
        ret = EINTR;
        opr_mutex_exit(&queue->lock);
        opr_mutex_exit(&node->lock);
        goto error;
    }

    hithresh = queue->opts.pend_hithresh;
    if (hithresh > 0 && queue->pend_count >= hithresh) {
        queue->drain = 1;
    }

    if (!force && (state == AFS_WQ_NODE_STATE_SCHEDULED
                   || state == AFS_WQ_NODE_STATE_BLOCKED)) {

        if (queue->drain) {
            if (block) {
                opr_mutex_exit(&node->lock);
                opr_cv_wait(&queue->pend_cv, &queue->lock);

                if (queue->shutdown) {
                    ret = EINTR;
                } else {
                    opr_mutex_exit(&queue->lock);

                    waited_for_drain = 1;

                    goto retry;
                }
            } else {
                ret = EWOULDBLOCK;
            }
        }
    }

    if (ret == 0) {
        queue->pend_count++;
    }
    if (waited_for_drain) {
        /* signal another thread that may have been waiting for drain */
        opr_cv_signal(&queue->pend_cv);
    }

    opr_mutex_exit(&queue->lock);

    if (ret) {
        goto error;
    }

    if (!donate)
        node->refcount++;
    node->queue = queue;

    ret = _afs_wq_node_list_enqueue(list, node, state);

 error:
    return ret;
}
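
/**
 * Options sketch (editor's illustration, not part of the original file):
 * a non-blocking add that donates the caller's reference to the queue.
 *
 * @code
 * struct afs_work_queue_add_opts opts;
 * int code;
 *
 * afs_wq_add_opts_init(&opts);
 * opts.donate = 1;                 // on success, queue takes over our ref
 * opts.block = 0;                  // fail instead of waiting out a drain
 * code = afs_wq_add(queue, node, &opts);
 * if (code == EWOULDBLOCK) {
 *     // queue is draining; retry later, or add with opts.block = 1
 * }
 * @endcode
 */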

/**
 * de-schedule a work node.
 *
 * @param[in] node work node
 *
 * @return operation status
 * @retval 0 success
 */
int
afs_wq_del(struct afs_work_queue_node * node)
{
    /* XXX todo */
    return ENOTSUP;
}

/**
 * execute a node on the queue.
 *
 * @param[in] queue work queue
 * @param[in] rock opaque pointer (passed as third arg to callback func)
 *
 * @return operation status
 * @retval 0 completed a work unit
 */
int
afs_wq_do(struct afs_work_queue * queue,
          void * rock)
{
    return _afs_wq_do(queue, rock, 1);
}

/**
 * execute a node on the queue, if there is any work to do.
 *
 * @param[in] queue work queue
 * @param[in] rock opaque pointer (passed as third arg to callback func)
 *
 * @return operation status
 * @retval 0 completed a work unit
 * @retval EWOULDBLOCK there was nothing to do
 */
int
afs_wq_do_nowait(struct afs_work_queue * queue,
                 void * rock)
{
    return _afs_wq_do(queue, rock, 0);
}

/**
 * wait for all pending nodes to finish.
 *
 * @param[in] queue work queue
 *
 * @return operation status
 * @retval 0 success
 *
 * @post the specified queue was empty at some point; it may not be empty by
 * the time this function returns, but at some point after the function was
 * called, there were no nodes in the ready queue or blocked queue.
 */
int
afs_wq_wait_all(struct afs_work_queue *queue)
{
    int ret = 0;

    opr_mutex_enter(&queue->lock);

    while (queue->pend_count > 0 && !queue->shutdown) {
        opr_cv_wait(&queue->empty_cv, &queue->lock);
    }

    if (queue->shutdown) {
        /* queue has been shut down, but there may still be some threads
         * running e.g. in the middle of their callback. ensure they have
         * stopped before we return. */
        while (queue->running_count > 0) {
            opr_cv_wait(&queue->running_cv, &queue->lock);
        }
        ret = EINTR;
        goto done;
    }

 done:
    opr_mutex_exit(&queue->lock);

    /* technically this doesn't really guarantee that the work queue is empty
     * after we return, but we do guarantee that it was empty at some point */

    return ret;
}

/**
 * wait for a node to complete; dequeue from done list.
 *
 * @param[in] node work queue node
 * @param[out] retcode return code from work unit
 *
 * @return operation status
 * @retval 0 success
 *
 * @pre ref held on node
 */
int
afs_wq_node_wait(struct afs_work_queue_node * node,
                 int * retcode)
{
    int ret = 0;

    opr_mutex_enter(&node->lock);
    if (node->state == AFS_WQ_NODE_STATE_INIT) {
        /* not sure what to do in this case */
        goto done_sync;
    }

    while ((node->state != AFS_WQ_NODE_STATE_DONE) &&
           (node->state != AFS_WQ_NODE_STATE_ERROR)) {
        opr_cv_wait(&node->state_cv, &node->lock);
    }
    if (retcode) {
        *retcode = node->retcode;
    }

    if (node->queue == NULL) {
        /* nothing we can do */
        goto done_sync;
    }

    ret = _afs_wq_node_list_remove(node, AFS_WQ_NODE_STATE_INIT);

 done_sync:
    opr_mutex_exit(&node->lock);

    return ret;
}
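
/**
 * Wait sketch (editor's illustration, not part of the original file):
 * collect the result of a non-detached node on which we still hold a ref.
 *
 * @code
 * int code;
 *
 * if (afs_wq_node_wait(node, &code) == 0) {
 *     // 'code' is the callback's return value; the node is back in the
 *     // INIT state and can be rescheduled or released
 * }
 * afs_wq_node_put(node);           // drop our reference
 * @endcode
 */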