/*
 * Copyright 2008-2010, Sine Nomine Associates and others.
 *
 * This software has been released under the terms of the IBM Public
 * License.  For details, see the LICENSE file in the top-level source
 * directory or online at http://www.openafs.org/dl/license10.html
 */
10 #include <afsconfig.h>
11 #include <afs/param.h>
19 #define __AFS_WORK_QUEUE_IMPL 1
20 #include "work_queue.h"
21 #include "work_queue_impl.h"
24 * public interfaces for work_queue.
28 _afs_wq_node_put_r(struct afs_work_queue_node
* node
,
32 * allocate a work queue object.
34 * @param[out] queue_out address in which to store queue pointer
36 * @return operation status
38 * @retval ENOMEM out of memory
43 _afs_wq_alloc(struct afs_work_queue
** queue_out
)
46 struct afs_work_queue
* queue
;
48 *queue_out
= queue
= malloc(sizeof(*queue
));
59 * free a work queue object.
61 * @param[in] queue work queue object
63 * @return operation status
/**
 * free a work queue object.
 *
 * @param[in] queue  work queue object
 *
 * @return operation status
 *    @retval 0 success
 */
static int
_afs_wq_free(struct afs_work_queue * queue)
{
    int ret = 0;

    free(queue);

    return ret;
}
79 * change a node's state.
81 * @param[in] node node object
82 * @param[in] new_state new object state
86 * @pre node->lock held
90 static afs_wq_work_state_t
91 _afs_wq_node_state_change(struct afs_work_queue_node
* node
,
92 afs_wq_work_state_t new_state
)
94 afs_wq_work_state_t old_state
;
96 old_state
= node
->state
;
97 node
->state
= new_state
;
99 opr_cv_broadcast(&node
->state_cv
);
105 * wait for a node's state to change from busy to something else.
107 * @param[in] node node object
109 * @return operation status
112 * @pre node->lock held
117 _afs_wq_node_state_wait_busy(struct afs_work_queue_node
* node
)
119 while (node
->state
== AFS_WQ_NODE_STATE_BUSY
) {
120 opr_cv_wait(&node
->state_cv
, &node
->lock
);
127 * check whether a work queue node is busy.
129 * @param[in] node node object pointer
131 * @return whether node is busy
132 * @retval 1 node is busy
133 * @retval 0 node is not busy
135 * @pre node->lock held
140 _afs_wq_node_state_is_busy(struct afs_work_queue_node
* node
)
142 return (node
->state
== AFS_WQ_NODE_STATE_BUSY
);
146 * attempt to simultaneously lock two work queue nodes.
148 * this is a somewhat tricky algorithm because there is no
149 * defined hierarchy within the work queue node population.
151 * @param[in] ml multilock control structure
153 * @return operation status
156 * @note in theory, we could easily extend this to
157 * lock more than two nodes
160 * - caller MUST NOT have set busy state on either node
163 * - locks held on both nodes
164 * - both nodes in quiescent states
166 * @note node with non-zero lock_held or busy_held fields
167 * MUST go in array index 0
172 _afs_wq_node_multilock(struct afs_work_queue_node_multilock
* ml
)
175 struct timespec delay
;
176 int first
= 1, second
= 0, tmp
;
178 /* first pass processing */
179 if (ml
->nodes
[0].lock_held
) {
180 if (!ml
->nodes
[0].busy_held
) {
181 ret
= _afs_wq_node_state_wait_busy(ml
->nodes
[0].node
);
187 code
= opr_mutex_tryenter(&ml
->nodes
[1].node
->lock
);
193 /* setup for main loop */
194 opr_mutex_exit(&ml
->nodes
[0].node
->lock
);
198 * setup random exponential backoff
200 * set initial delay to random value in the range [500,1000) ns
203 delay
.tv_nsec
= 500 + rand() % 500;
206 opr_mutex_enter(&ml
->nodes
[first
].node
->lock
);
207 if ((first
!= 0) || !ml
->nodes
[0].busy_held
) {
208 ret
= _afs_wq_node_state_wait_busy(ml
->nodes
[first
].node
);
211 if (!ml
->nodes
[0].lock_held
|| first
) {
212 opr_mutex_exit(&ml
->nodes
[first
].node
->lock
);
213 if (ml
->nodes
[0].lock_held
) {
214 /* on error, return with locks in same state as before call */
215 opr_mutex_enter(&ml
->nodes
[0].node
->lock
);
223 * in order to avoid deadlock, we must use trylock and
224 * a non-blocking state check. if we meet any contention,
225 * we must drop back and start again.
227 code
= opr_mutex_tryenter(&ml
->nodes
[second
].node
->lock
);
229 if (((second
== 0) && (ml
->nodes
[0].busy_held
)) ||
230 !_afs_wq_node_state_is_busy(ml
->nodes
[second
].node
)) {
234 opr_mutex_exit(&ml
->nodes
[second
].node
->lock
);
241 * drop locks, use exponential backoff,
242 * try acquiring in the opposite order
244 opr_mutex_exit(&ml
->nodes
[first
].node
->lock
);
245 nanosleep(&delay
, NULL
);
246 if (delay
.tv_nsec
<= 65536000) { /* max backoff delay of ~131ms */
260 * initialize a node list object.
262 * @param[in] list list object
263 * @param[in] id list identifier
265 * @return operation status
271 _afs_wq_node_list_init(struct afs_work_queue_node_list
* list
,
272 afs_wq_node_list_id_t id
)
274 queue_Init(&list
->list
);
275 opr_mutex_init(&list
->lock
);
276 opr_cv_init(&list
->cv
);
284 * destroy a node list object.
286 * @param[in] list list object
288 * @return operation status
290 * @retval AFS_WQ_ERROR list not empty
295 _afs_wq_node_list_destroy(struct afs_work_queue_node_list
* list
)
299 if (queue_IsNotEmpty(&list
->list
)) {
304 opr_mutex_destroy(&list
->lock
);
305 opr_cv_destroy(&list
->cv
);
312 * wakeup all threads waiting in dequeue.
314 * @param[in] list list object
316 * @return operation status
322 _afs_wq_node_list_shutdown(struct afs_work_queue_node_list
* list
)
325 struct afs_work_queue_node
*node
, *nnode
;
327 opr_mutex_enter(&list
->lock
);
330 for (queue_Scan(&list
->list
, node
, nnode
, afs_work_queue_node
)) {
331 _afs_wq_node_state_change(node
, AFS_WQ_NODE_STATE_ERROR
);
333 node
->qidx
= AFS_WQ_NODE_LIST_NONE
;
336 if (node
->detached
) {
337 /* if we are detached, we hold the reference on the node;
338 * otherwise, it is some other caller that holds the reference.
339 * So don't put the node if we are not detached; the node will
340 * get freed when someone else calls afs_wq_node_put */
341 afs_wq_node_put(node
);
345 opr_cv_broadcast(&list
->cv
);
346 opr_mutex_exit(&list
->lock
);
352 * append to a node list object.
354 * @param[in] list list object
355 * @param[in] node node object
356 * @param[in] state new node state
358 * @return operation status
360 * @retval AFS_WQ_ERROR raced to enqueue node
364 * - node is not on a list
365 * - node is either not busy, or it is marked as busy by the calling thread
369 * - node lock dropped
374 _afs_wq_node_list_enqueue(struct afs_work_queue_node_list
* list
,
375 struct afs_work_queue_node
* node
,
376 afs_wq_work_state_t state
)
380 if (node
->qidx
!= AFS_WQ_NODE_LIST_NONE
) {
386 /* deal with lock inversion */
387 code
= opr_mutex_tryenter(&list
->lock
);
390 _afs_wq_node_state_change(node
, AFS_WQ_NODE_STATE_BUSY
);
391 opr_mutex_exit(&node
->lock
);
392 opr_mutex_enter(&list
->lock
);
393 opr_mutex_enter(&node
->lock
);
395 /* assert state of the world (we set busy, so this should never happen) */
396 opr_Assert(queue_IsNotOnQueue(node
));
399 if (list
->shutdown
) {
404 opr_Assert(node
->qidx
== AFS_WQ_NODE_LIST_NONE
);
405 if (queue_IsEmpty(&list
->list
)) {
406 /* wakeup a dequeue thread */
407 opr_cv_signal(&list
->cv
);
409 queue_Append(&list
->list
, node
);
410 node
->qidx
= list
->qidx
;
411 _afs_wq_node_state_change(node
, state
);
414 opr_mutex_exit(&node
->lock
);
415 opr_mutex_exit(&list
->lock
);
422 * dequeue a node from a list object.
424 * @param[in] list list object
425 * @param[out] node_out address in which to store node object pointer
426 * @param[in] state new node state
427 * @param[in] block permit blocking on cv if asserted
429 * @return operation status
431 * @retval EWOULDBLOCK block not asserted and nothing to dequeue
432 * @retval EINTR blocking wait interrupted by list shutdown
434 * @post node object returned with node lock held and new state set
439 _afs_wq_node_list_dequeue(struct afs_work_queue_node_list
* list
,
440 struct afs_work_queue_node
** node_out
,
441 afs_wq_work_state_t state
,
445 struct afs_work_queue_node
* node
;
447 opr_mutex_enter(&list
->lock
);
449 if (list
->shutdown
) {
455 if (!block
&& queue_IsEmpty(&list
->list
)) {
461 while (queue_IsEmpty(&list
->list
)) {
462 if (list
->shutdown
) {
467 opr_cv_wait(&list
->cv
, &list
->lock
);
470 *node_out
= node
= queue_First(&list
->list
, afs_work_queue_node
);
472 opr_mutex_enter(&node
->lock
);
474 node
->qidx
= AFS_WQ_NODE_LIST_NONE
;
475 _afs_wq_node_state_change(node
, state
);
478 opr_mutex_exit(&list
->lock
);
484 * remove a node from a list.
486 * @param[in] node node object
487 * @param[in] next_state node state following successful dequeue
489 * @return operation status
491 * @retval AFS_WQ_ERROR in any of the following conditions:
492 * - node not associated with a work queue
493 * - node was not on a linked list (e.g. RUNNING state)
494 * - we raced another thread
496 * @pre node->lock held
498 * @post node removed from node list
500 * @note node->lock may be dropped internally
505 _afs_wq_node_list_remove(struct afs_work_queue_node
* node
,
506 afs_wq_work_state_t next_state
)
509 struct afs_work_queue_node_list
* list
= NULL
;
511 _afs_wq_node_state_wait_busy(node
);
517 switch (node
->qidx
) {
518 case AFS_WQ_NODE_LIST_READY
:
519 list
= &node
->queue
->ready_list
;
522 case AFS_WQ_NODE_LIST_BLOCKED
:
523 list
= &node
->queue
->blocked_list
;
526 case AFS_WQ_NODE_LIST_DONE
:
527 list
= &node
->queue
->done_list
;
535 code
= opr_mutex_tryenter(&list
->lock
);
538 _afs_wq_node_state_change(node
,
539 AFS_WQ_NODE_STATE_BUSY
);
540 opr_mutex_exit(&node
->lock
);
541 opr_mutex_enter(&list
->lock
);
542 opr_mutex_enter(&node
->lock
);
544 if (node
->qidx
== AFS_WQ_NODE_LIST_NONE
) {
552 node
->qidx
= AFS_WQ_NODE_LIST_NONE
;
553 _afs_wq_node_state_change(node
, next_state
);
556 opr_mutex_exit(&list
->lock
);
564 * allocate a dependency node.
566 * @param[out] node_out address in which to store dep node pointer
568 * @return operation status
570 * @retval ENOMEM out of memory
575 _afs_wq_dep_alloc(struct afs_work_queue_dep_node
** node_out
)
578 struct afs_work_queue_dep_node
* node
;
580 node
= malloc(sizeof(*node
));
586 queue_NodeInit(&node
->parent_list
);
587 node
->parent
= node
->child
= NULL
;
596 * free a dependency node.
598 * @param[in] node dep node pointer
600 * @return operation status
602 * @retval AFS_WQ_ERROR still attached to a work node
607 _afs_wq_dep_free(struct afs_work_queue_dep_node
* node
)
611 if (queue_IsOnQueue(&node
->parent_list
) ||
625 * unlink work nodes from a dependency node.
627 * @param[in] dep dependency node
629 * @return operation status
633 * - dep->parent and dep->child are either locked, or are not referenced
635 * - caller holds ref on dep->child
636 * - dep->child and dep->parent in quiescent state
641 _afs_wq_dep_unlink_r(struct afs_work_queue_dep_node
*dep
)
643 struct afs_work_queue_node
*child
= dep
->child
;
644 queue_Remove(&dep
->parent_list
);
648 return _afs_wq_node_put_r(child
, 0);
652 * get a reference to a work node.
654 * @param[in] node work queue node
656 * @return operation status
659 * @pre node->lock held
664 _afs_wq_node_get_r(struct afs_work_queue_node
* node
)
672 * unlink and free all of the dependency nodes from a node.
674 * @param[in] parent work node that is the parent node of all deps to be freed
676 * @return operation status
679 * @pre parent->refcount == 0
682 _afs_wq_node_free_deps(struct afs_work_queue_node
*parent
)
685 struct afs_work_queue_node
*node_unlock
= NULL
, *node_put
= NULL
;
686 struct afs_work_queue_dep_node
* dep
, * nd
;
688 /* unlink and free all of the dep structs attached to 'parent' */
689 for (queue_Scan(&parent
->dep_children
,
692 afs_work_queue_dep_node
)) {
694 opr_mutex_enter(&dep
->child
->lock
);
695 node_unlock
= dep
->child
;
697 /* We need to get a ref on child here, since _afs_wq_dep_unlink_r may
698 * put the last ref on the child, and we need the child to still exist
699 * so we can unlock it */
700 code
= _afs_wq_node_get_r(dep
->child
);
704 node_put
= dep
->child
;
706 /* remember, no need to lock dep->parent, since its refcount is 0 */
707 code
= _afs_wq_dep_unlink_r(dep
);
711 _afs_wq_node_put_r(node_put
, 1);
712 } else if (node_unlock
) {
713 opr_mutex_exit(&node_unlock
->lock
);
715 node_put
= node_unlock
= NULL
;
718 /* Only do this if everything is okay; if code is nonzero,
719 * something will still be pointing at dep, so don't free it.
720 * We will leak memory, but that's better than memory corruption;
721 * we've done all we can do to try and free the dep memory */
722 code
= _afs_wq_dep_free(dep
);
733 * propagate state down through dep nodes.
735 * @param[in] parent parent node object
736 * @param[in] next_state next state parent will assume
738 * @return operation status
742 * - parent->lock held
747 _afs_wq_dep_propagate(struct afs_work_queue_node
* parent
,
748 afs_wq_work_state_t next_state
)
751 struct afs_work_queue_dep_node
* dep
, * nd
;
752 struct afs_work_queue_node_multilock ml
;
753 afs_wq_work_state_t old_state
;
754 afs_wq_node_list_id_t qidx
;
755 struct afs_work_queue_node_list
* ql
;
756 afs_wq_work_state_t cns
;
758 old_state
= _afs_wq_node_state_change(parent
,
759 AFS_WQ_NODE_STATE_BUSY
);
760 ml
.nodes
[0].node
= parent
;
761 ml
.nodes
[0].lock_held
= 1;
762 ml
.nodes
[0].busy_held
= 1;
764 /* scan through our children updating scheduling state */
765 for (queue_Scan(&parent
->dep_children
,
768 afs_work_queue_dep_node
)) {
769 /* skip half-registered nodes */
770 if (dep
->child
== NULL
) {
774 ml
.nodes
[1].node
= dep
->child
;
775 ml
.nodes
[1].lock_held
= 0;
776 ml
.nodes
[1].busy_held
= 0;
777 ret
= _afs_wq_node_multilock(&ml
);
782 switch (next_state
) {
783 case AFS_WQ_NODE_STATE_DONE
:
784 dep
->child
->block_count
--;
787 case AFS_WQ_NODE_STATE_ERROR
:
788 dep
->child
->error_count
++;
795 /* skip unscheduled nodes */
796 if (dep
->child
->queue
== NULL
) {
797 opr_mutex_exit(&dep
->child
->lock
);
802 * when blocked dep and error'd dep counts reach zero, the
803 * node can be scheduled for execution
805 if (dep
->child
->error_count
) {
806 ql
= &dep
->child
->queue
->done_list
;
807 qidx
= AFS_WQ_NODE_LIST_DONE
;
808 cns
= AFS_WQ_NODE_STATE_ERROR
;
809 } else if (dep
->child
->block_count
) {
810 ql
= &dep
->child
->queue
->blocked_list
;
811 qidx
= AFS_WQ_NODE_LIST_BLOCKED
;
812 cns
= AFS_WQ_NODE_STATE_BLOCKED
;
814 ql
= &dep
->child
->queue
->ready_list
;
815 qidx
= AFS_WQ_NODE_LIST_READY
;
816 cns
= AFS_WQ_NODE_STATE_SCHEDULED
;
819 if (qidx
!= dep
->child
->qidx
) {
820 /* we're transitioning to a different queue */
821 ret
= _afs_wq_node_list_remove(dep
->child
,
822 AFS_WQ_NODE_STATE_BUSY
);
824 opr_mutex_exit(&dep
->child
->lock
);
828 ret
= _afs_wq_node_list_enqueue(ql
,
832 opr_mutex_exit(&dep
->child
->lock
);
836 opr_mutex_exit(&dep
->child
->lock
);
840 _afs_wq_node_state_change(parent
,
846 * decrements queue->running_count, and signals waiters if appropriate.
848 * @param[in] queue queue to dec the running count of
851 _afs_wq_dec_running_count(struct afs_work_queue
*queue
)
853 opr_mutex_enter(&queue
->lock
);
854 queue
->running_count
--;
855 if (queue
->shutdown
&& queue
->running_count
== 0) {
856 /* if we've shut down, someone may be waiting for the running count
858 opr_cv_broadcast(&queue
->running_cv
);
860 opr_mutex_exit(&queue
->lock
);
864 * execute a node on the queue.
866 * @param[in] queue work queue
867 * @param[in] rock opaque pointer (passed as third arg to callback func)
868 * @param[in] block allow blocking in dequeue
870 * @return operation status
871 * @retval 0 completed a work unit
876 _afs_wq_do(struct afs_work_queue
* queue
,
881 struct afs_work_queue_node
* node
;
882 afs_wq_callback_func_t
* cbf
;
883 afs_wq_work_state_t next_state
;
884 struct afs_work_queue_node_list
* ql
;
888 /* We can inc queue->running_count before actually pulling the node off
889 * of the ready_list, since running_count only really matters when we are
890 * shut down. If we get shut down before we pull the node off of
891 * ready_list, but after we inc'd running_count,
892 * _afs_wq_node_list_dequeue should return immediately with EINTR,
893 * in which case we'll dec running_count, so it's as if we never inc'd it
894 * in the first place. */
895 opr_mutex_enter(&queue
->lock
);
896 if (queue
->shutdown
) {
897 opr_mutex_exit(&queue
->lock
);
900 queue
->running_count
++;
901 opr_mutex_exit(&queue
->lock
);
903 ret
= _afs_wq_node_list_dequeue(&queue
->ready_list
,
905 AFS_WQ_NODE_STATE_RUNNING
,
908 _afs_wq_dec_running_count(queue
);
913 node_rock
= node
->rock
;
914 detached
= node
->detached
;
917 opr_mutex_exit(&node
->lock
);
918 code
= (*cbf
)(queue
, node
, queue
->rock
, node_rock
, rock
);
919 opr_mutex_enter(&node
->lock
);
921 next_state
= AFS_WQ_NODE_STATE_DONE
;
922 ql
= &queue
->done_list
;
923 } else if (code
== AFS_WQ_ERROR_RESCHEDULE
) {
924 if (node
->error_count
) {
925 next_state
= AFS_WQ_NODE_STATE_ERROR
;
926 ql
= &queue
->done_list
;
927 } else if (node
->block_count
) {
928 next_state
= AFS_WQ_NODE_STATE_BLOCKED
;
929 ql
= &queue
->blocked_list
;
931 next_state
= AFS_WQ_NODE_STATE_SCHEDULED
;
932 ql
= &queue
->ready_list
;
935 next_state
= AFS_WQ_NODE_STATE_ERROR
;
936 ql
= &queue
->done_list
;
939 next_state
= AFS_WQ_NODE_STATE_DONE
;
941 ql
= &queue
->done_list
;
944 _afs_wq_dec_running_count(queue
);
946 node
->retcode
= code
;
948 if ((next_state
== AFS_WQ_NODE_STATE_DONE
) ||
949 (next_state
== AFS_WQ_NODE_STATE_ERROR
)) {
951 opr_mutex_enter(&queue
->lock
);
953 if (queue
->drain
&& queue
->pend_count
== queue
->opts
.pend_lothresh
) {
954 /* signal other threads if we're about to below the low
955 * pending-tasks threshold */
957 opr_cv_signal(&queue
->pend_cv
);
960 if (queue
->pend_count
== 1) {
961 /* signal other threads if we're about to become 'empty' */
962 opr_cv_broadcast(&queue
->empty_cv
);
967 opr_mutex_exit(&queue
->lock
);
970 ret
= _afs_wq_node_state_wait_busy(node
);
975 /* propagate scheduling changes down through dependencies */
976 ret
= _afs_wq_dep_propagate(node
, next_state
);
981 ret
= _afs_wq_node_state_wait_busy(node
);
987 ((next_state
== AFS_WQ_NODE_STATE_DONE
) ||
988 (next_state
== AFS_WQ_NODE_STATE_ERROR
))) {
989 _afs_wq_node_state_change(node
, next_state
);
990 _afs_wq_node_put_r(node
, 1);
992 ret
= _afs_wq_node_list_enqueue(ql
,
1002 * initialize a struct afs_work_queue_opts to the default values
1004 * @param[out] opts opts struct to initialize
1007 afs_wq_opts_init(struct afs_work_queue_opts
*opts
)
1009 opts
->pend_lothresh
= 0;
1010 opts
->pend_hithresh
= 0;
1014 * set the options for a struct afs_work_queue_opts appropriate for a certain
1015 * number of threads.
1017 * @param[out] opts opts struct in which to set the values
1018 * @param[in] threads number of threads
1021 afs_wq_opts_calc_thresh(struct afs_work_queue_opts
*opts
, int threads
)
1023 opts
->pend_lothresh
= threads
* 2;
1024 opts
->pend_hithresh
= threads
* 16;
1027 if (opts
->pend_lothresh
< 1) {
1028 opts
->pend_lothresh
= 1;
1030 if (opts
->pend_hithresh
< 2) {
1031 opts
->pend_hithresh
= 2;
1036 * allocate and initialize a work queue object.
1038 * @param[out] queue_out address in which to store newly allocated work queue object
1039 * @param[in] rock work queue opaque pointer (passed as first arg to all fired callbacks)
1040 * @param[in] opts options for the new created queue
1042 * @return operation status
1044 * @retval ENOMEM out of memory
1047 afs_wq_create(struct afs_work_queue
** queue_out
,
1049 struct afs_work_queue_opts
*opts
)
1052 struct afs_work_queue
* queue
;
1054 ret
= _afs_wq_alloc(queue_out
);
1061 memcpy(&queue
->opts
, opts
, sizeof(queue
->opts
));
1063 afs_wq_opts_init(&queue
->opts
);
1066 _afs_wq_node_list_init(&queue
->ready_list
,
1067 AFS_WQ_NODE_LIST_READY
);
1068 _afs_wq_node_list_init(&queue
->blocked_list
,
1069 AFS_WQ_NODE_LIST_BLOCKED
);
1070 _afs_wq_node_list_init(&queue
->done_list
,
1071 AFS_WQ_NODE_LIST_DONE
);
1074 queue
->shutdown
= 0;
1075 queue
->pend_count
= 0;
1076 queue
->running_count
= 0;
1078 opr_mutex_init(&queue
->lock
);
1079 opr_cv_init(&queue
->pend_cv
);
1080 opr_cv_init(&queue
->empty_cv
);
1081 opr_cv_init(&queue
->running_cv
);
1088 * deallocate and free a work queue object.
1090 * @param[in] queue work queue to be destroyed
1092 * @return operation status
1094 * @retval AFS_WQ_ERROR unspecified error
1097 afs_wq_destroy(struct afs_work_queue
* queue
)
1101 ret
= _afs_wq_node_list_destroy(&queue
->ready_list
);
1106 ret
= _afs_wq_node_list_destroy(&queue
->blocked_list
);
1111 ret
= _afs_wq_node_list_destroy(&queue
->done_list
);
1116 ret
= _afs_wq_free(queue
);
1123 * shutdown a work queue.
1125 * @param[in] queue work queue object pointer
1127 * @return operation status
1131 afs_wq_shutdown(struct afs_work_queue
* queue
)
1135 opr_mutex_enter(&queue
->lock
);
1136 if (queue
->shutdown
) {
1137 /* already shutdown, do nothing */
1138 opr_mutex_exit(&queue
->lock
);
1141 queue
->shutdown
= 1;
1143 ret
= _afs_wq_node_list_shutdown(&queue
->ready_list
);
1148 ret
= _afs_wq_node_list_shutdown(&queue
->blocked_list
);
1153 ret
= _afs_wq_node_list_shutdown(&queue
->done_list
);
1158 /* signal everyone that could be waiting, since these conditions will
1159 * generally fail to signal on their own if we're shutdown, since no
1160 * progress is being made */
1161 opr_cv_broadcast(&queue
->pend_cv
);
1162 opr_cv_broadcast(&queue
->empty_cv
);
1163 opr_mutex_exit(&queue
->lock
);
1170 * allocate a work node.
1172 * @param[out] node_out address in which to store new work node
1174 * @return operation status
1176 * @retval ENOMEM out of memory
1179 afs_wq_node_alloc(struct afs_work_queue_node
** node_out
)
1182 struct afs_work_queue_node
* node
;
1184 *node_out
= node
= malloc(sizeof(*node
));
1190 queue_NodeInit(&node
->node_list
);
1191 node
->qidx
= AFS_WQ_NODE_LIST_NONE
;
1193 node
->rock
= node
->queue
= NULL
;
1195 node
->block_count
= 0;
1196 node
->error_count
= 0;
1197 opr_mutex_init(&node
->lock
);
1198 opr_cv_init(&node
->state_cv
);
1199 node
->state
= AFS_WQ_NODE_STATE_INIT
;
1200 queue_Init(&node
->dep_children
);
1209 * @param[in] node work node object
1211 * @return operation status
1217 _afs_wq_node_free(struct afs_work_queue_node
* node
)
1221 if (queue_IsOnQueue(node
) ||
1222 (node
->state
== AFS_WQ_NODE_STATE_SCHEDULED
) ||
1223 (node
->state
== AFS_WQ_NODE_STATE_RUNNING
) ||
1224 (node
->state
== AFS_WQ_NODE_STATE_BLOCKED
)) {
1229 ret
= _afs_wq_node_free_deps(node
);
1234 opr_mutex_destroy(&node
->lock
);
1235 opr_cv_destory(&node
->state_cv
);
1237 if (node
->rock_dtor
) {
1238 (*node
->rock_dtor
) (node
->rock
);
1248 * get a reference to a work node.
1250 * @param[in] node work queue node
1252 * @return operation status
1256 afs_wq_node_get(struct afs_work_queue_node
* node
)
1258 opr_mutex_enter(&node
->lock
);
1260 opr_mutex_exit(&node
->lock
);
1266 * put back a reference to a work node.
1268 * @param[in] node work queue node
1269 * @param[in] drop drop node->lock
1271 * @post if refcount reaches zero, node is deallocated.
1273 * @return operation status
1276 * @pre node->lock held
1281 _afs_wq_node_put_r(struct afs_work_queue_node
* node
,
1286 opr_Assert(node
->refcount
> 0);
1287 refc
= --node
->refcount
;
1289 opr_mutex_exit(&node
->lock
);
1292 opr_Assert(node
->qidx
== AFS_WQ_NODE_LIST_NONE
);
1293 _afs_wq_node_free(node
);
1300 * put back a reference to a work node.
1302 * @param[in] node work queue node
1304 * @post if refcount reaches zero, node is deallocated.
1306 * @return operation status
1310 afs_wq_node_put(struct afs_work_queue_node
* node
)
1312 opr_mutex_enter(&node
->lock
);
1313 return _afs_wq_node_put_r(node
, 1);
1317 * set the callback function on a work node.
1319 * @param[in] node work queue node
1320 * @param[in] cbf callback function
1321 * @param[in] rock opaque pointer passed to callback
1322 * @param[in] rock_dtor destructor function for 'rock', or NULL
1324 * @return operation status
1328 afs_wq_node_set_callback(struct afs_work_queue_node
* node
,
1329 afs_wq_callback_func_t
* cbf
,
1330 void * rock
, afs_wq_callback_dtor_t
*dtor
)
1332 opr_mutex_enter(&node
->lock
);
1335 node
->rock_dtor
= dtor
;
1336 opr_mutex_exit(&node
->lock
);
1344 * @param[in] node work queue node
1346 * @return operation status
1350 afs_wq_node_set_detached(struct afs_work_queue_node
* node
)
1352 opr_mutex_enter(&node
->lock
);
1354 opr_mutex_exit(&node
->lock
);
1360 * link a dependency node to a parent and child work node.
1362 * This links a dependency node such that when the 'parent' work node is
1363 * done, the 'child' work node can proceed.
1365 * @param[in] dep dependency node
1366 * @param[in] parent parent node in this dependency
1367 * @param[in] child child node in this dependency
1369 * @return operation status
1373 * - parent->lock held
1374 * - child->lock held
1375 * - parent and child in quiescent state
1380 _afs_wq_dep_link_r(struct afs_work_queue_dep_node
*dep
,
1381 struct afs_work_queue_node
*parent
,
1382 struct afs_work_queue_node
*child
)
1386 /* Each dep node adds a ref to the child node of that dep. We do not
1387 * do the same for the parent node, since if the only refs remaining
1388 * for a node are deps in node->dep_children, then the node should be
1389 * destroyed, and we will destroy the dep nodes when we free the
1391 ret
= _afs_wq_node_get_r(child
);
1396 /* add this dep node to the parent node's list of deps */
1397 queue_Append(&parent
->dep_children
, &dep
->parent_list
);
1400 dep
->parent
= parent
;
1407 * add a dependency to a work node.
1409 * @param[in] child node which will be dependent upon completion of parent
1410 * @param[in] parent node whose completion gates child's execution
1413 * - child is in initial state (last op was afs_wq_node_alloc or afs_wq_node_wait)
1415 * @return operation status
1419 afs_wq_node_dep_add(struct afs_work_queue_node
* child
,
1420 struct afs_work_queue_node
* parent
)
1423 struct afs_work_queue_dep_node
* dep
= NULL
;
1424 struct afs_work_queue_node_multilock ml
;
1427 /* self references are bad, mmkay? */
1428 if (parent
== child
) {
1433 ret
= _afs_wq_dep_alloc(&dep
);
1438 memset(&ml
, 0, sizeof(ml
));
1439 ml
.nodes
[0].node
= parent
;
1440 ml
.nodes
[1].node
= child
;
1441 ret
= _afs_wq_node_multilock(&ml
);
1447 /* only allow dep modification while in initial state
1448 * or running state (e.g. do a dep add while inside callback) */
1449 if ((child
->state
!= AFS_WQ_NODE_STATE_INIT
) &&
1450 (child
->state
!= AFS_WQ_NODE_STATE_RUNNING
)) {
1455 /* link dep node with child and parent work queue node */
1456 ret
= _afs_wq_dep_link_r(dep
, parent
, child
);
1461 /* handle blocking counts */
1462 switch (parent
->state
) {
1463 case AFS_WQ_NODE_STATE_INIT
:
1464 case AFS_WQ_NODE_STATE_SCHEDULED
:
1465 case AFS_WQ_NODE_STATE_RUNNING
:
1466 case AFS_WQ_NODE_STATE_BLOCKED
:
1467 child
->block_count
++;
1470 case AFS_WQ_NODE_STATE_ERROR
:
1471 child
->error_count
++;
1480 opr_mutex_exit(&child
->lock
);
1481 opr_mutex_exit(&parent
->lock
);
1487 _afs_wq_dep_free(dep
);
1493 * remove a dependency from a work node.
1495 * @param[in] child node which was dependent upon completion of parent
1496 * @param[in] parent node whose completion gated child's execution
1498 * @return operation status
1502 afs_wq_node_dep_del(struct afs_work_queue_node
* child
,
1503 struct afs_work_queue_node
* parent
)
1506 struct afs_work_queue_dep_node
* dep
, * ndep
;
1507 struct afs_work_queue_node_multilock ml
;
1510 memset(&ml
, 0, sizeof(ml
));
1511 ml
.nodes
[0].node
= parent
;
1512 ml
.nodes
[1].node
= child
;
1513 code
= _afs_wq_node_multilock(&ml
);
1519 /* only permit changes while child is in init state
1520 * or running state (e.g. do a dep del when in callback func) */
1521 if ((child
->state
!= AFS_WQ_NODE_STATE_INIT
) &&
1522 (child
->state
!= AFS_WQ_NODE_STATE_RUNNING
)) {
1527 /* locate node linking parent and child */
1528 for (queue_Scan(&parent
->dep_children
,
1531 afs_work_queue_dep_node
)) {
1532 if ((dep
->child
== child
) &&
1533 (dep
->parent
== parent
)) {
1535 /* no need to grab an extra ref on dep->child here; the caller
1536 * should already have a ref on dep->child */
1537 code
= _afs_wq_dep_unlink_r(dep
);
1543 code
= _afs_wq_dep_free(dep
);
1554 opr_mutex_exit(&child
->lock
);
1555 opr_mutex_exit(&parent
->lock
);
1561 * block a work node from execution.
1563 * this can be used to allow external events to influence work queue flow.
1565 * @param[in] node work queue node to be blocked
1567 * @return operation status
1570 * @post external block count incremented
1573 afs_wq_node_block(struct afs_work_queue_node
* node
)
1578 opr_mutex_enter(&node
->lock
);
1579 ret
= _afs_wq_node_state_wait_busy(node
);
1584 start
= node
->block_count
++;
1587 (node
->qidx
== AFS_WQ_NODE_LIST_READY
)) {
1588 /* unblocked->blocked transition, and we're already scheduled */
1589 ret
= _afs_wq_node_list_remove(node
,
1590 AFS_WQ_NODE_STATE_BUSY
);
1595 ret
= _afs_wq_node_list_enqueue(&node
->queue
->blocked_list
,
1597 AFS_WQ_NODE_STATE_BLOCKED
);
1601 opr_mutex_exit(&node
->lock
);
1607 * unblock a work node for execution.
1609 * this can be used to allow external events to influence work queue flow.
1611 * @param[in] node work queue node to be blocked
1613 * @return operation status
1616 * @post external block count decremented
1619 afs_wq_node_unblock(struct afs_work_queue_node
* node
)
1624 opr_mutex_enter(&node
->lock
);
1625 ret
= _afs_wq_node_state_wait_busy(node
);
1630 end
= --node
->block_count
;
1633 (node
->qidx
== AFS_WQ_NODE_LIST_BLOCKED
)) {
1634 /* blocked->unblock transition, and we're ready to be scheduled */
1635 ret
= _afs_wq_node_list_remove(node
,
1636 AFS_WQ_NODE_STATE_BUSY
);
1641 ret
= _afs_wq_node_list_enqueue(&node
->queue
->ready_list
,
1643 AFS_WQ_NODE_STATE_SCHEDULED
);
1647 opr_mutex_exit(&node
->lock
);
1653 * initialize a afs_wq_add_opts struct with the default options.
1655 * @param[out] opts options structure to initialize
1658 afs_wq_add_opts_init(struct afs_work_queue_add_opts
*opts
)
1666 * schedule a work node for execution.
1668 * @param[in] queue work queue
1669 * @param[in] node work node
1670 * @param[in] opts options for adding, or NULL for defaults
1672 * @return operation status
1674 * @retval EWOULDBLOCK queue is full and opts specified not to block
1675 * @retval EINTR queue was full, we blocked to add, and the queue was
1676 * shutdown while we were blocking
1679 afs_wq_add(struct afs_work_queue
*queue
,
1680 struct afs_work_queue_node
*node
,
1681 struct afs_work_queue_add_opts
*opts
)
1684 int donate
, block
, force
, hithresh
;
1685 struct afs_work_queue_node_list
* list
;
1686 struct afs_work_queue_add_opts l_opts
;
1687 int waited_for_drain
= 0;
1688 afs_wq_work_state_t state
;
1691 afs_wq_add_opts_init(&l_opts
);
1695 donate
= opts
->donate
;
1696 block
= opts
->block
;
1697 force
= opts
->force
;
1700 opr_mutex_enter(&node
->lock
);
1702 ret
= _afs_wq_node_state_wait_busy(node
);
1707 if (!node
->block_count
&& !node
->error_count
) {
1708 list
= &queue
->ready_list
;
1709 state
= AFS_WQ_NODE_STATE_SCHEDULED
;
1710 } else if (node
->error_count
) {
1711 list
= &queue
->done_list
;
1712 state
= AFS_WQ_NODE_STATE_ERROR
;
1714 list
= &queue
->blocked_list
;
1715 state
= AFS_WQ_NODE_STATE_BLOCKED
;
1720 opr_mutex_enter(&queue
->lock
);
1722 if (queue
->shutdown
) {
1724 opr_mutex_exit(&queue
->lock
);
1725 opr_mutex_exit(&node
->lock
);
1729 hithresh
= queue
->opts
.pend_hithresh
;
1730 if (hithresh
> 0 && queue
->pend_count
>= hithresh
) {
1734 if (!force
&& (state
== AFS_WQ_NODE_STATE_SCHEDULED
1735 || state
== AFS_WQ_NODE_STATE_BLOCKED
)) {
1739 opr_mutex_exit(&node
->lock
);
1740 opr_cv_wait(&queue
->pend_cv
, &queue
->lock
);
1742 if (queue
->shutdown
) {
1745 opr_mutex_exit(&queue
->lock
);
1747 waited_for_drain
= 1;
1758 queue
->pend_count
++;
1760 if (waited_for_drain
) {
1761 /* signal another thread that may have been waiting for drain */
1762 opr_cv_signal(&queue
->pend_cv
);
1765 opr_mutex_exit(&queue
->lock
);
1773 node
->queue
= queue
;
1775 ret
= _afs_wq_node_list_enqueue(list
,
1783 * de-schedule a work node.
1785 * @param[in] node work node
1787 * @return operation status
1791 afs_wq_del(struct afs_work_queue_node
* node
)
1798 * execute a node on the queue.
1800 * @param[in] queue work queue
1801 * @param[in] rock opaque pointer (passed as third arg to callback func)
1803 * @return operation status
1804 * @retval 0 completed a work unit
/**
 * execute a node on the queue.
 *
 * @param[in] queue  work queue
 * @param[in] rock   opaque pointer (passed as third arg to callback func)
 *
 * @return operation status
 *    @retval 0 completed a work unit
 */
int
afs_wq_do(struct afs_work_queue * queue,
	  void * rock)
{
    /* blocking variant: final arg permits waiting on the dequeue cv */
    return _afs_wq_do(queue, rock, 1);
}
1814 * execute a node on the queue, if there is any work to do.
1816 * @param[in] queue work queue
1817 * @param[in] rock opaque pointer (passed as third arg to callback func)
1819 * @return operation status
1820 * @retval 0 completed a work unit
1821 * @retval EWOULDBLOCK there was nothing to do
/**
 * execute a node on the queue, if there is any work to do.
 *
 * @param[in] queue  work queue
 * @param[in] rock   opaque pointer (passed as third arg to callback func)
 *
 * @return operation status
 *    @retval 0 completed a work unit
 *    @retval EWOULDBLOCK there was nothing to do
 */
int
afs_wq_do_nowait(struct afs_work_queue * queue,
		 void * rock)
{
    /* non-blocking variant: final arg forbids waiting on the dequeue cv */
    return _afs_wq_do(queue, rock, 0);
}
1831 * wait for all pending nodes to finish.
1833 * @param[in] queue work queue
1835 * @return operation status
1838 * @post the specified queue was empty at some point; it may not be empty by
1839 * the time this function returns, but at some point after the function was
1840 * called, there were no nodes in the ready queue or blocked queue.
1843 afs_wq_wait_all(struct afs_work_queue
*queue
)
1847 opr_mutex_enter(&queue
->lock
);
1849 while (queue
->pend_count
> 0 && !queue
->shutdown
) {
1850 opr_cv_wait(&queue
->empty_cv
, &queue
->lock
);
1853 if (queue
->shutdown
) {
1854 /* queue has been shut down, but there may still be some threads
1855 * running e.g. in the middle of their callback. ensure they have
1856 * stopped before we return. */
1857 while (queue
->running_count
> 0) {
1858 opr_cv_wait(&queue
->running_cv
, &queue
->lock
);
1865 opr_mutex_exit(&queue
->lock
);
1867 /* technically this doesn't really guarantee that the work queue is empty
1868 * after we return, but we do guarantee that it was empty at some point */
1874 * wait for a node to complete; dequeue from done list.
1876 * @param[in] node work queue node
1877 * @param[out] retcode return code from work unit
1879 * @return operation status
1882 * @pre ref held on node
1885 afs_wq_node_wait(struct afs_work_queue_node
* node
,
1890 opr_mutex_enter(&node
->lock
);
1891 if (node
->state
== AFS_WQ_NODE_STATE_INIT
) {
1892 /* not sure what to do in this case */
1896 while ((node
->state
!= AFS_WQ_NODE_STATE_DONE
) &&
1897 (node
->state
!= AFS_WQ_NODE_STATE_ERROR
)) {
1898 opr_cv_wait(&node
->state_cv
, &node
->lock
);
1901 *retcode
= node
->retcode
;
1904 if (node
->queue
== NULL
) {
1905 /* nothing we can do */
1909 ret
= _afs_wq_node_list_remove(node
,
1910 AFS_WQ_NODE_STATE_INIT
);
1913 opr_mutex_exit(&node
->lock
);