2 * Copyright 2006-2008, Sine Nomine Associates and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
13 * OpenAFS demand attach fileserver
14 * Salvage server synchronization with fileserver.
17 /* This controls the size of an fd_set; it must be defined early before
18 * the system headers define that type and the macros that operate on it.
19 * Its value should be as large as the maximum file descriptor limit we
20 * are likely to run into on any platform. Right now, that is 65536
21 * which is the default hard fd limit on Solaris 9 */
23 #define FD_SETSIZE 65536
26 #include <afsconfig.h>
27 #include <afs/param.h>
29 #include <afs/procmgmt.h>
36 #include <afs/afsint.h>
37 #include <rx/rx_queue.h>
40 #include <afs/errors.h>
43 #include <afs/afssyscalls.h>
47 #include "partition.h"
49 #include <rx/rx_queue.h>
51 #ifdef USE_UNIX_SOCKETS
52 #include <afs/afsutil.h>
57 #define WCOREDUMP(x) ((x) & 0200)
60 #define MAXHANDLERS 4 /* Up to 4 clients; must be at least 2, so that
61 * move = dump+restore can run on single server */
65 * This lock controls access to the handler array.
67 struct Lock SALVSYNC_handler_lock
;
70 #ifdef AFS_DEMAND_ATTACH_FS
72 * SALVSYNC is a feature specific to the demand attach fileserver
75 /* Forward declarations */
76 static void * SALVSYNC_syncThread(void *);
77 static void SALVSYNC_newconnection(osi_socket fd
);
78 static void SALVSYNC_com(osi_socket fd
);
79 static void SALVSYNC_Drop(osi_socket fd
);
80 static void AcceptOn(void);
81 static void AcceptOff(void);
82 static void InitHandler(void);
83 static void CallHandler(fd_set
* fdsetp
);
84 static int AddHandler(osi_socket afd
, void (*aproc
) (int));
85 static int FindHandler(osi_socket afd
);
86 static int FindHandler_r(osi_socket afd
);
87 static int RemoveHandler(osi_socket afd
);
88 static void GetHandler(fd_set
* fdsetp
, int *maxfdp
);
90 static int AllocNode(struct SalvageQueueNode
** node
);
92 static int AddToSalvageQueue(struct SalvageQueueNode
* node
);
93 static void DeleteFromSalvageQueue(struct SalvageQueueNode
* node
);
94 static void AddToPendingQueue(struct SalvageQueueNode
* node
);
95 static void DeleteFromPendingQueue(struct SalvageQueueNode
* node
);
96 static struct SalvageQueueNode
* LookupPendingCommandByPid(int pid
);
97 static void UpdateCommandPrio(struct SalvageQueueNode
* node
);
98 static void HandlePrio(struct SalvageQueueNode
* clone
,
99 struct SalvageQueueNode
* parent
,
100 afs_uint32 new_prio
);
102 static int LinkNode(struct SalvageQueueNode
* parent
,
103 struct SalvageQueueNode
* clone
);
105 static struct SalvageQueueNode
* LookupNode(VolumeId vid
, char * partName
,
106 struct SalvageQueueNode
** parent
);
107 static struct SalvageQueueNode
* LookupNodeByCommand(SALVSYNC_command_hdr
* qry
,
108 struct SalvageQueueNode
** parent
);
109 static void AddNodeToHash(struct SalvageQueueNode
* node
);
111 static afs_int32
SALVSYNC_com_Salvage(SALVSYNC_command
* com
, SALVSYNC_response
* res
);
112 static afs_int32
SALVSYNC_com_Cancel(SALVSYNC_command
* com
, SALVSYNC_response
* res
);
113 static afs_int32
SALVSYNC_com_Query(SALVSYNC_command
* com
, SALVSYNC_response
* res
);
114 static afs_int32
SALVSYNC_com_CancelAll(SALVSYNC_command
* com
, SALVSYNC_response
* res
);
115 static afs_int32
SALVSYNC_com_Link(SALVSYNC_command
* com
, SALVSYNC_response
* res
);
119 extern pthread_mutex_t vol_salvsync_mutex
;
122 * salvsync server socket handle.
124 static SYNC_server_state_t salvsync_server_state
=
125 { OSI_NULLSOCKET
, /* file descriptor */
126 SALVSYNC_ENDPOINT_DECL
, /* server endpoint */
127 SALVSYNC_PROTO_VERSION
, /* protocol version */
128 5, /* bind() retry limit */
129 100, /* listen() queue depth */
130 "SALVSYNC", /* protocol name string */
135 * queue of all volumes waiting to be salvaged.
137 struct SalvageQueue
{
138 volatile int total_len
;
139 volatile afs_int32 last_insert
; /**< id of last partition to have a salvage node inserted */
140 volatile int len
[VOLMAXPARTS
+1];
141 volatile struct rx_queue part
[VOLMAXPARTS
+1]; /**< per-partition queues of pending salvages */
144 static struct SalvageQueue salvageQueue
; /* volumes waiting to be salvaged */
147 * queue of all volumes currently being salvaged.
150 volatile struct rx_queue q
; /**< queue of salvages in progress */
151 volatile int len
; /**< length of in-progress queue */
152 pthread_cond_t queue_change_cv
;
154 static struct QueueHead pendingQueue
; /* volumes being salvaged */
157 * whether a partition has a salvage in progress
159 * the salvager code only permits one salvage per partition at a time
161 * the following hack tries to keep salvaged parallelism high by
162 * only permitting one salvage dispatch per partition at a time
164 * unfortunately, the parallel salvager currently
165 * has a rather braindead routine that won't permit
166 * multiple salvages on the same "device". this
167 * function happens to break pretty badly on lvm, raid luns, etc.
169 * this hack isn't good enough to stop the device limiting code from
170 * crippling performance. someday that code needs to be rewritten
172 static int partition_salvaging
[VOLMAXPARTS
+1];
174 static int HandlerFD
[MAXHANDLERS
];
175 static void (*HandlerProc
[MAXHANDLERS
]) (int);
177 #define VSHASH_SIZE 64
178 #define VSHASH_MASK (VSHASH_SIZE-1)
179 #define VSHASH(vid) ((vid)&VSHASH_MASK)
181 static struct QueueHead SalvageHashTable
[VSHASH_SIZE
];
183 static struct SalvageQueueNode
*
184 LookupNode(VolumeId vid
, char * partName
,
185 struct SalvageQueueNode
** parent
)
187 struct rx_queue
*qp
, *nqp
;
188 struct SalvageQueueNode
*vsp
= NULL
;
189 int idx
= VSHASH(vid
);
191 for (queue_Scan(&SalvageHashTable
[idx
], qp
, nqp
, rx_queue
)) {
192 vsp
= (struct SalvageQueueNode
*)((char *)qp
- offsetof(struct SalvageQueueNode
, hash_chain
));
193 if ((vsp
->command
.sop
.volume
== vid
) &&
194 !strncmp(vsp
->command
.sop
.partName
, partName
, sizeof(vsp
->command
.sop
.partName
))) {
199 if (queue_IsEnd(&SalvageHashTable
[idx
], qp
)) {
205 *parent
= (vsp
->type
== SALVSYNC_VOLGROUP_CLONE
) ?
206 vsp
->volgroup
.parent
: vsp
;
215 static struct SalvageQueueNode
*
216 LookupNodeByCommand(SALVSYNC_command_hdr
* qry
,
217 struct SalvageQueueNode
** parent
)
219 return LookupNode(qry
->volume
, qry
->partName
, parent
);
223 AddNodeToHash(struct SalvageQueueNode
* node
)
225 int idx
= VSHASH(node
->command
.sop
.volume
);
227 if (queue_IsOnQueue(&node
->hash_chain
)) {
231 queue_Append(&SalvageHashTable
[idx
], &node
->hash_chain
);
232 SalvageHashTable
[idx
].len
++;
237 DeleteNodeFromHash(struct SalvageQueueNode
* node
)
239 int idx
= VSHASH(node
->command
.sop
.volume
);
241 if (queue_IsNotOnQueue(&node
->hash_chain
)) {
245 queue_Remove(&node
->hash_chain
);
246 SalvageHashTable
[idx
].len
--;
251 SALVSYNC_salvInit(void)
255 pthread_attr_t tattr
;
257 /* initialize the queues */
258 Lock_Init(&SALVSYNC_handler_lock
);
259 CV_INIT(&salvageQueue
.cv
, "sq", CV_DEFAULT
, 0);
260 for (i
= 0; i
<= VOLMAXPARTS
; i
++) {
261 queue_Init(&salvageQueue
.part
[i
]);
262 salvageQueue
.len
[i
] = 0;
264 CV_INIT(&pendingQueue
.queue_change_cv
, "queuechange", CV_DEFAULT
, 0);
265 queue_Init(&pendingQueue
);
266 salvageQueue
.total_len
= pendingQueue
.len
= 0;
267 salvageQueue
.last_insert
= -1;
268 memset(partition_salvaging
, 0, sizeof(partition_salvaging
));
270 for (i
= 0; i
< VSHASH_SIZE
; i
++) {
271 CV_INIT(&SalvageHashTable
[i
].queue_change_cv
, "queuechange", CV_DEFAULT
, 0);
272 SalvageHashTable
[i
].len
= 0;
273 queue_Init(&SalvageHashTable
[i
]);
276 /* start the salvsync thread */
277 opr_Verify(pthread_attr_init(&tattr
) == 0);
278 opr_Verify(pthread_attr_setdetachstate(&tattr
,
279 PTHREAD_CREATE_DETACHED
) == 0);
280 opr_Verify(pthread_create(&tid
, &tattr
, SALVSYNC_syncThread
, NULL
) == 0);
287 for (i
= 0; i
< MAXHANDLERS
; ++i
) {
288 if (HandlerFD
[i
] >= 0) {
289 SALVSYNC_Drop(HandlerFD
[i
]);
293 /* just in case we were in AcceptOff mode, and thus this fd wouldn't
295 close(salvsync_server_state
.fd
);
296 salvsync_server_state
.fd
= OSI_NULLSOCKET
;
299 static fd_set SALVSYNC_readfds
;
302 SALVSYNC_syncThread(void * args
)
305 SYNC_server_state_t
* state
= &salvsync_server_state
;
307 /* when we fork, the child needs to close the salvsync server sockets,
308 * otherwise, it may get salvsync requests, instead of the parent
310 opr_Verify(pthread_atfork(NULL
, NULL
, CleanFDs
) == 0);
312 SYNC_getAddr(&state
->endpoint
, &state
->addr
);
313 SYNC_cleanupSock(state
);
316 (void)signal(SIGPIPE
, SIG_IGN
);
319 state
->fd
= SYNC_getSock(&state
->endpoint
);
320 code
= SYNC_bindSock(state
);
328 struct timeval s_timeout
;
329 GetHandler(&SALVSYNC_readfds
, &maxfd
);
330 s_timeout
.tv_sec
= SYNC_SELECT_TIMEOUT
;
331 s_timeout
.tv_usec
= 0;
332 /* Note: check for >= 1 below is essential since IOMGR_select
333 * doesn't have exactly same semantics as select.
335 if (select(maxfd
+ 1, &SALVSYNC_readfds
, NULL
, NULL
, &s_timeout
) >= 1)
336 CallHandler(&SALVSYNC_readfds
);
339 AFS_UNREACHED(return(NULL
));
343 SALVSYNC_newconnection(int afd
)
345 #ifdef USE_UNIX_SOCKETS
346 struct sockaddr_un other
;
347 #else /* USE_UNIX_SOCKETS */
348 struct sockaddr_in other
;
353 junk
= sizeof(other
);
354 fd
= accept(afd
, (struct sockaddr
*)&other
, &junk
);
355 if (fd
== OSI_NULLSOCKET
) {
356 osi_Panic("SALVSYNC_newconnection: accept failed, errno==%d\n", errno
);
357 } else if (!AddHandler(fd
, SALVSYNC_com
)) {
359 opr_Verify(AddHandler(fd
, SALVSYNC_com
));
363 /* this function processes commands from a salvsync file descriptor (fd) */
364 static afs_int32 SALV_cnt
= 0;
366 SALVSYNC_com(osi_socket fd
)
370 SALVSYNC_response_hdr sres_hdr
;
371 SALVSYNC_command scom
;
372 SALVSYNC_response sres
;
373 SYNC_PROTO_BUF_DECL(buf
);
375 memset(&com
, 0, sizeof(com
));
376 memset(&res
, 0, sizeof(res
));
377 memset(&scom
, 0, sizeof(scom
));
378 memset(&sres
, 0, sizeof(sres
));
379 memset(&sres_hdr
, 0, sizeof(sres_hdr
));
381 com
.payload
.buf
= (void *)buf
;
382 com
.payload
.len
= SYNC_PROTO_MAX_LEN
;
383 res
.payload
.buf
= (void *) &sres_hdr
;
384 res
.payload
.len
= sizeof(sres_hdr
);
385 res
.hdr
.response_len
= sizeof(res
.hdr
) + sizeof(sres_hdr
);
386 res
.hdr
.proto_version
= SALVSYNC_PROTO_VERSION
;
389 scom
.sop
= (SALVSYNC_command_hdr
*) buf
;
392 sres
.sop
= &sres_hdr
;
396 if (SYNC_getCom(&salvsync_server_state
, fd
, &com
)) {
397 Log("SALVSYNC_com: read failed; dropping connection (cnt=%d)\n", SALV_cnt
);
402 if (com
.recv_len
< sizeof(com
.hdr
)) {
403 Log("SALVSYNC_com: invalid protocol message length (%u)\n", com
.recv_len
);
404 res
.hdr
.response
= SYNC_COM_ERROR
;
405 res
.hdr
.reason
= SYNC_REASON_MALFORMED_PACKET
;
406 res
.hdr
.flags
|= SYNC_FLAG_CHANNEL_SHUTDOWN
;
410 if (com
.hdr
.proto_version
!= SALVSYNC_PROTO_VERSION
) {
411 Log("SALVSYNC_com: invalid protocol version (%u)\n", com
.hdr
.proto_version
);
412 res
.hdr
.response
= SYNC_COM_ERROR
;
413 res
.hdr
.flags
|= SYNC_FLAG_CHANNEL_SHUTDOWN
;
417 if (com
.hdr
.command
== SYNC_COM_CHANNEL_CLOSE
) {
418 res
.hdr
.response
= SYNC_OK
;
419 res
.hdr
.flags
|= SYNC_FLAG_CHANNEL_SHUTDOWN
;
421 /* don't respond, just drop; senders of SYNC_COM_CHANNEL_CLOSE
422 * never wait for a response. */
426 if (com
.recv_len
!= (sizeof(com
.hdr
) + sizeof(SALVSYNC_command_hdr
))) {
427 Log("SALVSYNC_com: invalid protocol message length (%u)\n", com
.recv_len
);
428 res
.hdr
.response
= SYNC_COM_ERROR
;
429 res
.hdr
.reason
= SYNC_REASON_MALFORMED_PACKET
;
430 res
.hdr
.flags
|= SYNC_FLAG_CHANNEL_SHUTDOWN
;
434 res
.hdr
.com_seq
= com
.hdr
.com_seq
;
437 switch (com
.hdr
.command
) {
440 case SALVSYNC_SALVAGE
:
441 case SALVSYNC_RAISEPRIO
:
442 res
.hdr
.response
= SALVSYNC_com_Salvage(&scom
, &sres
);
444 case SALVSYNC_CANCEL
:
445 /* cancel a salvage */
446 res
.hdr
.response
= SALVSYNC_com_Cancel(&scom
, &sres
);
448 case SALVSYNC_CANCELALL
:
449 /* cancel all queued salvages */
450 res
.hdr
.response
= SALVSYNC_com_CancelAll(&scom
, &sres
);
453 /* query whether a volume is done salvaging */
454 res
.hdr
.response
= SALVSYNC_com_Query(&scom
, &sres
);
456 case SALVSYNC_OP_LINK
:
457 /* link a clone to its parent in the scheduler */
458 res
.hdr
.response
= SALVSYNC_com_Link(&scom
, &sres
);
461 res
.hdr
.response
= SYNC_BAD_COMMAND
;
465 sres_hdr
.sq_len
= salvageQueue
.total_len
;
466 sres_hdr
.pq_len
= pendingQueue
.len
;
470 SYNC_putRes(&salvsync_server_state
, fd
, &res
);
473 if (res
.hdr
.flags
& SYNC_FLAG_CHANNEL_SHUTDOWN
) {
479 * request that a volume be salvaged.
481 * @param[in] com inbound command object
482 * @param[out] res outbound response object
484 * @return operation status
485 * @retval SYNC_OK success
486 * @retval SYNC_DENIED failed to enqueue request
487 * @retval SYNC_FAILED malformed command packet
489 * @note this is a SALVSYNC protocol rpc handler
493 * @post the volume is enqueued in the to-be-salvaged queue.
494 * if the volume was already in the salvage queue, its
495 * priority (and thus its location in the queue) are
499 SALVSYNC_com_Salvage(SALVSYNC_command
* com
, SALVSYNC_response
* res
)
501 afs_int32 code
= SYNC_OK
;
502 struct SalvageQueueNode
* node
, * clone
;
505 if (SYNC_verifyProtocolString(com
->sop
->partName
, sizeof(com
->sop
->partName
))) {
507 res
->hdr
->reason
= SYNC_REASON_MALFORMED_PACKET
;
511 clone
= LookupNodeByCommand(com
->sop
, &node
);
514 if (AllocNode(&node
)) {
516 res
->hdr
->reason
= SYNC_REASON_NOMEM
;
523 HandlePrio(clone
, node
, com
->sop
->prio
);
525 switch (node
->state
) {
526 case SALVSYNC_STATE_QUEUED
:
527 UpdateCommandPrio(node
);
530 case SALVSYNC_STATE_ERROR
:
531 case SALVSYNC_STATE_DONE
:
532 case SALVSYNC_STATE_UNKNOWN
:
533 memcpy(&clone
->command
.com
, com
->hdr
, sizeof(SYNC_command_hdr
));
534 memcpy(&clone
->command
.sop
, com
->sop
, sizeof(SALVSYNC_command_hdr
));
537 * make sure volgroup parent partition path is kept coherent
539 * If we ever want to support non-COW clones on a machine holding
540 * the RW site, please note that this code does not work under the
541 * conditions where someone zaps a COW clone on partition X, and
542 * subsequently creates a full clone on partition Y -- we'd need
543 * an inverse to SALVSYNC_com_Link.
544 * -- tkeiser 11/28/2007
546 strcpy(node
->command
.sop
.partName
, com
->sop
->partName
);
548 if (AddToSalvageQueue(node
)) {
561 res
->hdr
->flags
|= SALVSYNC_FLAG_VOL_STATS_VALID
;
562 res
->sop
->state
= node
->state
;
563 res
->sop
->prio
= node
->command
.sop
.prio
;
570 * cancel a pending salvage request.
572 * @param[in] com inbound command object
573 * @param[out] res outbound response object
575 * @return operation status
576 * @retval SYNC_OK success
577 * @retval SYNC_FAILED malformed command packet
579 * @note this is a SALVSYNC protocol rpc handler
584 SALVSYNC_com_Cancel(SALVSYNC_command
* com
, SALVSYNC_response
* res
)
586 afs_int32 code
= SYNC_OK
;
587 struct SalvageQueueNode
* node
;
589 if (SYNC_verifyProtocolString(com
->sop
->partName
, sizeof(com
->sop
->partName
))) {
591 res
->hdr
->reason
= SYNC_REASON_MALFORMED_PACKET
;
595 node
= LookupNodeByCommand(com
->sop
, NULL
);
598 res
->sop
->state
= SALVSYNC_STATE_UNKNOWN
;
601 res
->hdr
->flags
|= SALVSYNC_FLAG_VOL_STATS_VALID
;
602 res
->sop
->prio
= node
->command
.sop
.prio
;
603 res
->sop
->state
= node
->state
;
604 if ((node
->type
== SALVSYNC_VOLGROUP_PARENT
) &&
605 (node
->state
== SALVSYNC_STATE_QUEUED
)) {
606 DeleteFromSalvageQueue(node
);
615 * cancel all pending salvage requests.
617 * @param[in] com incoming command object
618 * @param[out] res outbound response object
620 * @return operation status
621 * @retval SYNC_OK success
623 * @note this is a SALVSYNC protocol rpc handler
628 SALVSYNC_com_CancelAll(SALVSYNC_command
* com
, SALVSYNC_response
* res
)
630 struct SalvageQueueNode
* np
, *nnp
;
631 struct DiskPartition64
* dp
;
633 for (dp
= DiskPartitionList
; dp
; dp
= dp
->next
) {
634 for (queue_Scan(&salvageQueue
.part
[dp
->index
], np
, nnp
, SalvageQueueNode
)) {
635 DeleteFromSalvageQueue(np
);
643 * link a queue node for a clone to its parent volume.
645 * @param[in] com inbound command object
646 * @param[out] res outbound response object
648 * @return operation status
649 * @retval SYNC_OK success
650 * @retval SYNC_FAILED malformed command packet
651 * @retval SYNC_DENIED the request could not be completed
653 * @note this is a SALVSYNC protocol rpc handler
655 * @post the requested volume is marked as a child of another volume.
656 * thus, future salvage requests for this volume will result in the
657 * parent of the volume group being scheduled for salvage instead
663 SALVSYNC_com_Link(SALVSYNC_command
* com
, SALVSYNC_response
* res
)
665 afs_int32 code
= SYNC_OK
;
666 struct SalvageQueueNode
* clone
, * parent
;
668 if (SYNC_verifyProtocolString(com
->sop
->partName
, sizeof(com
->sop
->partName
))) {
670 res
->hdr
->reason
= SYNC_REASON_MALFORMED_PACKET
;
674 /* lookup clone's salvage scheduling node */
675 clone
= LookupNodeByCommand(com
->sop
, NULL
);
678 res
->hdr
->reason
= SALVSYNC_REASON_ERROR
;
682 /* lookup parent's salvage scheduling node */
683 parent
= LookupNode(com
->sop
->parent
, com
->sop
->partName
, NULL
);
684 if (parent
== NULL
) {
685 if (AllocNode(&parent
)) {
687 res
->hdr
->reason
= SYNC_REASON_NOMEM
;
690 memcpy(&parent
->command
.com
, com
->hdr
, sizeof(SYNC_command_hdr
));
691 memcpy(&parent
->command
.sop
, com
->sop
, sizeof(SALVSYNC_command_hdr
));
692 parent
->command
.sop
.volume
= parent
->command
.sop
.parent
= com
->sop
->parent
;
693 AddNodeToHash(parent
);
696 if (LinkNode(parent
, clone
)) {
706 * query the status of a volume salvage request.
708 * @param[in] com inbound command object
709 * @param[out] res outbound response object
711 * @return operation status
712 * @retval SYNC_OK success
713 * @retval SYNC_FAILED malformed command packet
715 * @note this is a SALVSYNC protocol rpc handler
720 SALVSYNC_com_Query(SALVSYNC_command
* com
, SALVSYNC_response
* res
)
722 afs_int32 code
= SYNC_OK
;
723 struct SalvageQueueNode
* node
;
725 if (SYNC_verifyProtocolString(com
->sop
->partName
, sizeof(com
->sop
->partName
))) {
727 res
->hdr
->reason
= SYNC_REASON_MALFORMED_PACKET
;
731 LookupNodeByCommand(com
->sop
, &node
);
733 /* query whether a volume is done salvaging */
735 res
->sop
->state
= SALVSYNC_STATE_UNKNOWN
;
738 res
->hdr
->flags
|= SALVSYNC_FLAG_VOL_STATS_VALID
;
739 res
->sop
->state
= node
->state
;
740 res
->sop
->prio
= node
->command
.sop
.prio
;
748 SALVSYNC_Drop(osi_socket fd
)
755 static int AcceptHandler
= -1; /* handler id for accept, if turned on */
760 if (AcceptHandler
== -1) {
761 opr_Verify(AddHandler(salvsync_server_state
.fd
,
762 SALVSYNC_newconnection
));
763 AcceptHandler
= FindHandler(salvsync_server_state
.fd
);
770 if (AcceptHandler
!= -1) {
771 opr_Verify(RemoveHandler(salvsync_server_state
.fd
));
776 /* The multiple FD handling code. */
782 ObtainWriteLock(&SALVSYNC_handler_lock
);
783 for (i
= 0; i
< MAXHANDLERS
; i
++) {
784 HandlerFD
[i
] = OSI_NULLSOCKET
;
785 HandlerProc
[i
] = NULL
;
787 ReleaseWriteLock(&SALVSYNC_handler_lock
);
791 CallHandler(fd_set
* fdsetp
)
794 ObtainReadLock(&SALVSYNC_handler_lock
);
795 for (i
= 0; i
< MAXHANDLERS
; i
++) {
796 if (HandlerFD
[i
] >= 0 && FD_ISSET(HandlerFD
[i
], fdsetp
)) {
797 ReleaseReadLock(&SALVSYNC_handler_lock
);
798 (*HandlerProc
[i
]) (HandlerFD
[i
]);
799 ObtainReadLock(&SALVSYNC_handler_lock
);
802 ReleaseReadLock(&SALVSYNC_handler_lock
);
806 AddHandler(osi_socket afd
, void (*aproc
) (int))
809 ObtainWriteLock(&SALVSYNC_handler_lock
);
810 for (i
= 0; i
< MAXHANDLERS
; i
++)
811 if (HandlerFD
[i
] == OSI_NULLSOCKET
)
813 if (i
>= MAXHANDLERS
) {
814 ReleaseWriteLock(&SALVSYNC_handler_lock
);
818 HandlerProc
[i
] = aproc
;
819 ReleaseWriteLock(&SALVSYNC_handler_lock
);
824 FindHandler(osi_socket afd
)
827 ObtainReadLock(&SALVSYNC_handler_lock
);
828 for (i
= 0; i
< MAXHANDLERS
; i
++)
829 if (HandlerFD
[i
] == afd
) {
830 ReleaseReadLock(&SALVSYNC_handler_lock
);
833 ReleaseReadLock(&SALVSYNC_handler_lock
); /* just in case */
834 osi_Panic("Failed to find handler\n");
835 return -1; /* satisfy compiler */
839 FindHandler_r(osi_socket afd
)
842 for (i
= 0; i
< MAXHANDLERS
; i
++)
843 if (HandlerFD
[i
] == afd
) {
846 osi_Panic("Failed to find handler\n");
847 return -1; /* satisfy compiler */
851 RemoveHandler(osi_socket afd
)
853 ObtainWriteLock(&SALVSYNC_handler_lock
);
854 HandlerFD
[FindHandler_r(afd
)] = OSI_NULLSOCKET
;
855 ReleaseWriteLock(&SALVSYNC_handler_lock
);
860 GetHandler(fd_set
* fdsetp
, int *maxfdp
)
865 ObtainReadLock(&SALVSYNC_handler_lock
); /* just in case */
866 for (i
= 0; i
< MAXHANDLERS
; i
++)
867 if (HandlerFD
[i
] != OSI_NULLSOCKET
) {
868 FD_SET(HandlerFD
[i
], fdsetp
);
870 /* On Windows the nfds parameter to select() is ignored */
871 if (maxfd
< HandlerFD
[i
] || maxfd
== (int)-1)
872 maxfd
= HandlerFD
[i
];
876 ReleaseReadLock(&SALVSYNC_handler_lock
); /* just in case */
880 * allocate a salvage queue node.
882 * @param[out] node_out address in which to store new node pointer
884 * @return operation status
886 * @retval 1 failed to allocate node
891 AllocNode(struct SalvageQueueNode
** node_out
)
894 struct SalvageQueueNode
* node
;
896 *node_out
= node
= calloc(1, sizeof(struct SalvageQueueNode
));
902 node
->type
= SALVSYNC_VOLGROUP_PARENT
;
903 node
->state
= SALVSYNC_STATE_UNKNOWN
;
910 * link a salvage queue node to its parent.
912 * @param[in] parent pointer to queue node for parent of volume group
913 * @param[in] clone pointer to queue node for a clone
915 * @return operation status
922 LinkNode(struct SalvageQueueNode
* parent
,
923 struct SalvageQueueNode
* clone
)
928 /* check for attaching a clone to a clone */
929 if (parent
->type
!= SALVSYNC_VOLGROUP_PARENT
) {
934 /* check for pre-existing registration and openings */
935 for (idx
= 0; idx
< VOLMAXTYPES
; idx
++) {
936 if (parent
->volgroup
.children
[idx
] == clone
) {
939 if (parent
->volgroup
.children
[idx
] == NULL
) {
943 if (idx
== VOLMAXTYPES
) {
948 /* link parent and child */
949 parent
->volgroup
.children
[idx
] = clone
;
950 clone
->type
= SALVSYNC_VOLGROUP_CLONE
;
951 clone
->volgroup
.parent
= parent
;
955 switch (clone
->state
) {
956 case SALVSYNC_STATE_QUEUED
:
957 DeleteFromSalvageQueue(clone
);
959 case SALVSYNC_STATE_SALVAGING
:
960 switch (parent
->state
) {
961 case SALVSYNC_STATE_UNKNOWN
:
962 case SALVSYNC_STATE_ERROR
:
963 case SALVSYNC_STATE_DONE
:
964 parent
->command
.sop
.prio
= clone
->command
.sop
.prio
;
965 AddToSalvageQueue(parent
);
968 case SALVSYNC_STATE_QUEUED
:
969 if (clone
->command
.sop
.prio
) {
970 parent
->command
.sop
.prio
+= clone
->command
.sop
.prio
;
971 UpdateCommandPrio(parent
);
989 HandlePrio(struct SalvageQueueNode
* clone
,
990 struct SalvageQueueNode
* node
,
995 switch (node
->state
) {
996 case SALVSYNC_STATE_ERROR
:
997 case SALVSYNC_STATE_DONE
:
998 case SALVSYNC_STATE_UNKNOWN
:
999 node
->command
.sop
.prio
= 0;
1005 if (new_prio
< clone
->command
.sop
.prio
) {
1006 /* strange. let's just set our delta to 1 */
1009 delta
= new_prio
- clone
->command
.sop
.prio
;
1012 if (clone
->type
== SALVSYNC_VOLGROUP_CLONE
) {
1013 clone
->command
.sop
.prio
= new_prio
;
1016 node
->command
.sop
.prio
+= delta
;
1020 AddToSalvageQueue(struct SalvageQueueNode
* node
)
1023 struct SalvageQueueNode
* last
= NULL
;
1025 id
= volutil_GetPartitionID(node
->command
.sop
.partName
);
1026 if (id
< 0 || id
> VOLMAXPARTS
) {
1029 if (!VGetPartitionById_r(id
, 0)) {
1030 /* don't enqueue salvage requests for unmounted partitions */
1033 if (queue_IsOnQueue(node
)) {
1037 if (queue_IsNotEmpty(&salvageQueue
.part
[id
])) {
1038 last
= queue_Last(&salvageQueue
.part
[id
], SalvageQueueNode
);
1040 queue_Append(&salvageQueue
.part
[id
], node
);
1041 salvageQueue
.len
[id
]++;
1042 salvageQueue
.total_len
++;
1043 salvageQueue
.last_insert
= id
;
1044 node
->partition_id
= id
;
1045 node
->state
= SALVSYNC_STATE_QUEUED
;
1047 /* reorder, if necessary */
1048 if (last
&& last
->command
.sop
.prio
< node
->command
.sop
.prio
) {
1049 UpdateCommandPrio(node
);
1052 CV_BROADCAST(&salvageQueue
.cv
);
1057 DeleteFromSalvageQueue(struct SalvageQueueNode
* node
)
1059 if (queue_IsOnQueue(node
)) {
1061 salvageQueue
.len
[node
->partition_id
]--;
1062 salvageQueue
.total_len
--;
1063 node
->state
= SALVSYNC_STATE_UNKNOWN
;
1064 CV_BROADCAST(&salvageQueue
.cv
);
1069 AddToPendingQueue(struct SalvageQueueNode
* node
)
1071 queue_Append(&pendingQueue
, node
);
1073 node
->state
= SALVSYNC_STATE_SALVAGING
;
1074 CV_BROADCAST(&pendingQueue
.queue_change_cv
);
1078 DeleteFromPendingQueue(struct SalvageQueueNode
* node
)
1080 if (queue_IsOnQueue(node
)) {
1083 node
->state
= SALVSYNC_STATE_UNKNOWN
;
1084 CV_BROADCAST(&pendingQueue
.queue_change_cv
);
1089 static struct SalvageQueueNode
*
1090 LookupPendingCommand(SALVSYNC_command_hdr
* qry
)
1092 struct SalvageQueueNode
* np
, * nnp
;
1094 for (queue_Scan(&pendingQueue
, np
, nnp
, SalvageQueueNode
)) {
1095 if ((np
->command
.sop
.volume
== qry
->volume
) &&
1096 !strncmp(np
->command
.sop
.partName
, qry
->partName
,
1097 sizeof(qry
->partName
)))
1101 if (queue_IsEnd(&pendingQueue
, np
))
1107 static struct SalvageQueueNode
*
1108 LookupPendingCommandByPid(int pid
)
1110 struct SalvageQueueNode
* np
, * nnp
;
1112 for (queue_Scan(&pendingQueue
, np
, nnp
, SalvageQueueNode
)) {
1117 if (queue_IsEnd(&pendingQueue
, np
))
1123 /* raise the priority of a previously scheduled salvage */
1125 UpdateCommandPrio(struct SalvageQueueNode
* node
)
1127 struct SalvageQueueNode
*np
, *nnp
;
1131 opr_Assert(queue_IsOnQueue(node
));
1133 prio
= node
->command
.sop
.prio
;
1134 id
= node
->partition_id
;
1135 if (queue_First(&salvageQueue
.part
[id
], SalvageQueueNode
)->command
.sop
.prio
< prio
) {
1137 queue_Prepend(&salvageQueue
.part
[id
], node
);
1139 for (queue_ScanBackwardsFrom(&salvageQueue
.part
[id
], node
, np
, nnp
, SalvageQueueNode
)) {
1140 if (np
->command
.sop
.prio
> prio
)
1143 if (queue_IsEnd(&salvageQueue
.part
[id
], np
)) {
1145 queue_Prepend(&salvageQueue
.part
[id
], node
);
1146 } else if (node
!= np
) {
1148 queue_InsertAfter(np
, node
);
1153 /* this will need to be rearchitected if we ever want more than one thread
1154 * to wait for new salvage nodes */
1155 struct SalvageQueueNode
*
1156 SALVSYNC_getWork(void)
1159 struct DiskPartition64
* dp
= NULL
, * fdp
;
1160 static afs_int32 next_part_sched
= 0;
1161 struct SalvageQueueNode
*node
= NULL
;
1166 * wait for work to be scheduled
1167 * if there are no disk partitions, just sit in this wait loop forever
1169 while (!salvageQueue
.total_len
|| !DiskPartitionList
) {
1170 VOL_CV_WAIT(&salvageQueue
.cv
);
1174 * short circuit for simple case where only one partition has
1175 * scheduled salvages
1177 if (salvageQueue
.last_insert
>= 0 && salvageQueue
.last_insert
<= VOLMAXPARTS
&&
1178 (salvageQueue
.total_len
== salvageQueue
.len
[salvageQueue
.last_insert
])) {
1179 node
= queue_First(&salvageQueue
.part
[salvageQueue
.last_insert
], SalvageQueueNode
);
1185 * ok, more than one partition has scheduled salvages.
1186 * now search for partitions with scheduled salvages, but no pending salvages.
1188 dp
= VGetPartitionById_r(next_part_sched
, 0);
1190 dp
= DiskPartitionList
;
1196 dp
= (dp
->next
) ? dp
->next
: DiskPartitionList
, i
++ ) {
1197 if (!partition_salvaging
[dp
->index
] && salvageQueue
.len
[dp
->index
]) {
1198 node
= queue_First(&salvageQueue
.part
[dp
->index
], SalvageQueueNode
);
1205 * all partitions with scheduled salvages have at least one pending.
1206 * now do an exhaustive search for a scheduled salvage.
1212 dp
= (dp
->next
) ? dp
->next
: DiskPartitionList
, i
++ ) {
1213 if (salvageQueue
.len
[dp
->index
]) {
1214 node
= queue_First(&salvageQueue
.part
[dp
->index
], SalvageQueueNode
);
1219 /* we should never reach this line */
1220 osi_Panic("Node not found\n");
1223 opr_Assert(node
!= NULL
);
1225 partition_salvaging
[node
->partition_id
]++;
1226 DeleteFromSalvageQueue(node
);
1227 AddToPendingQueue(node
);
1230 /* update next_part_sched field */
1232 next_part_sched
= dp
->next
->index
;
1233 } else if (DiskPartitionList
) {
1234 next_part_sched
= DiskPartitionList
->index
;
1236 next_part_sched
= -1;
1245 * update internal scheduler state to reflect completion of a work unit.
1247 * @param[in] node salvage queue node object pointer
1248 * @param[in] result worker process result code
1250 * @post scheduler state is updated.
1255 SALVSYNC_doneWork_r(struct SalvageQueueNode
* node
, int result
)
1260 DeleteFromPendingQueue(node
);
1261 partid
= node
->partition_id
;
1262 if (partid
>=0 && partid
<= VOLMAXPARTS
) {
1263 partition_salvaging
[partid
]--;
1266 node
->state
= SALVSYNC_STATE_DONE
;
1267 } else if (result
!= SALSRV_EXIT_VOLGROUP_LINK
) {
1268 node
->state
= SALVSYNC_STATE_ERROR
;
1271 if (node
->type
== SALVSYNC_VOLGROUP_PARENT
) {
1272 for (idx
= 0; idx
< VOLMAXTYPES
; idx
++) {
1273 if (node
->volgroup
.children
[idx
]) {
1274 node
->volgroup
.children
[idx
]->state
= node
->state
;
1281 * check whether worker child failed.
1283 * @param[in] status status bitfield return by wait()
1285 * @return boolean failure code
1286 * @retval 0 child succeeded
1287 * @retval 1 child failed
1292 ChildFailed(int status
)
1294 return (WCOREDUMP(status
) ||
1295 WIFSIGNALED(status
) ||
1296 ((WEXITSTATUS(status
) != 0) &&
1297 (WEXITSTATUS(status
) != SALSRV_EXIT_VOLGROUP_LINK
)));
1302 * notify salvsync scheduler of node completion, by child pid.
1304 * @param[in] pid pid of worker child
1305 * @param[in] status worker status bitfield from wait()
1307 * @post scheduler state is updated.
1308 * if status code is a failure, fileserver notification was attempted
1310 * @see SALVSYNC_doneWork_r
1313 SALVSYNC_doneWorkByPid(int pid
, int status
)
1315 struct SalvageQueueNode
* node
;
1317 VolumeId volids
[VOLMAXTYPES
+1];
1320 memset(volids
, 0, sizeof(volids
));
1323 node
= LookupPendingCommandByPid(pid
);
1325 SALVSYNC_doneWork_r(node
, status
);
1327 if (ChildFailed(status
)) {
1328 /* populate volume id list for later processing outside the glock */
1329 volids
[0] = node
->command
.sop
.volume
;
1330 strcpy(partName
, node
->command
.sop
.partName
);
1331 if (node
->type
== SALVSYNC_VOLGROUP_PARENT
) {
1332 for (idx
= 0; idx
< VOLMAXTYPES
; idx
++) {
1333 if (node
->volgroup
.children
[idx
]) {
1334 volids
[idx
+1] = node
->volgroup
.children
[idx
]->command
.sop
.volume
;
1343 * if necessary, notify fileserver of
1344 * failure to salvage volume group
1345 * [we cannot guarantee that the child made the
1346 * appropriate notifications (e.g. SIGSEGV)]
1347 * -- tkeiser 11/28/2007
1349 if (ChildFailed(status
)) {
1350 for (idx
= 0; idx
<= VOLMAXTYPES
; idx
++) {
1352 FSYNC_VolOp(volids
[idx
],
1354 FSYNC_VOL_FORCE_ERROR
,
1362 #endif /* AFS_DEMAND_ATTACH_FS */