Import Upstream version 1.8.5
[hcoop/debian/openafs.git] / src / bucoord / bc_status.c
1 /*
2 * Copyright 2000, International Business Machines Corporation and others.
3 * All Rights Reserved.
4 *
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
8 */
9
10 #include <afsconfig.h>
11 #include <afs/stds.h>
12
13 #include <roken.h>
14
15 #include <afs/com_err.h>
16 #include <afs/bubasics.h>
17 #include <lock.h>
18 #include <afs/tcdata.h>
19 #include <afs/cmd.h>
20
21 #include "bc.h"
22 #include "error_macros.h"
23 #include "bucoord_internal.h"
24 #include "bucoord_prototypes.h"
25
/*
 * SET_FLAG
 *	OR the bits in `set` into curPollPtr->flags, bracketed by
 *	lock_Status()/unlock_Status().  A variable named curPollPtr must be
 *	in scope where the macro is used.
 *
 *	The body is wrapped in do { } while (0) so the macro expands to
 *	exactly one statement; it is therefore safe under an unbraced
 *	if/else.  Callers supply the terminating semicolon.
 */
#define SET_FLAG(set) \
    do { \
        lock_Status(); \
        curPollPtr->flags |= (set); \
        unlock_Status(); \
    } while (0)
30
/*
 * CLEAR_FLAG
 *	Clear the bits in `clear` from curPollPtr->flags, bracketed by
 *	lock_Status()/unlock_Status().  A variable named curPollPtr must be
 *	in scope where the macro is used.
 *
 *	The body is wrapped in do { } while (0) so the macro expands to
 *	exactly one statement; it is therefore safe under an unbraced
 *	if/else.  Callers supply the terminating semicolon.
 */
#define CLEAR_FLAG(clear) \
    do { \
        lock_Status(); \
        curPollPtr->flags &= ~(clear); \
        unlock_Status(); \
    } while (0)
35
36 extern struct bc_config *bc_globalConfig;
37 extern afs_int32 bc_GetConn(struct bc_config *aconfig, afs_int32 aport, struct rx_connection **tconn);
38
39 /* globals for backup coordinator status management */
40
41 dlqlinkT statusHead; /* chain of status blocks */
42 struct Lock statusQueueLock; /* access control for status chain */
43 struct Lock cmdLineLock; /* lock on the cmdLine */
44
45 afs_int32 lastTaskCode; /* Error code from task that last finished */
46
47 /* nextItem
48 * get next item for status interrogation, if any.
49 */
50 static statusP
51 nextItem(statusP linkPtr)
52 {
53 dlqlinkP ptr;
54
55 ptr = (dlqlinkP) linkPtr;
56
57 /* if last known item has terminated, reset ptr */
58 if (ptr == 0) {
59 ptr = &statusHead;
60 if (dlqEmpty(ptr))
61 return (0);
62 }
63
64 ptr = ptr->dlq_next;
65
66 /* if we're back at the head again */
67 if (ptr == &statusHead)
68 return (0);
69 return ((statusP) ptr);
70 }
71
72 #ifdef notdef
73 static statusP
74 nextItem(linkPtr)
75 statusP linkPtr;
76 {
77 dlqlinkP ptr;
78
79 ptr = (dlqlinkP) linkPtr;
80
81 /* if last known item has terminated, reset ptr */
82 if (ptr == 0) {
83 ptr = &statusHead;
84 if (dlqEmpty(ptr))
85 return (0);
86 }
87
88 ptr = ptr->dlq_next;
89
90 /* if we're back at the head again */
91 if (ptr == &statusHead) {
92 ptr = ptr->dlq_next;
93 }
94 return ((statusP) ptr);
95 }
96 #endif /* notdef */
97
98 char *cmdLine;
99
100 void *
101 cmdDispatch(void *unused)
102 {
103 #define MAXV 100
104 char *targv[MAXV]; /*Ptr to parsed argv stuff */
105 afs_int32 targc; /*Num parsed arguments */
106 afs_int32 code;
107 char *internalCmdLine;
108
109 internalCmdLine = cmdLine;
110 unlock_cmdLine();
111
112 code = cmd_ParseLine(internalCmdLine, targv, &targc, MAXV);
113 if (code) {
114 printf("Couldn't parse line: '%s'", afs_error_message(code));
115 return (void *)(1);
116 }
117 free(internalCmdLine);
118
119 /*
120 * Because the "-at" option cannot be wildcarded, we cannot fall
121 * into recusive loop here by setting dispatchCount to 1.
122 */
123 doDispatch(targc, targv, 1);
124 cmd_FreeArgv(targv);
125 return(void *)(0);
126 }
127
128 void *
129 statusWatcher(void *unused)
130 {
131 struct rx_connection *tconn = NULL;
132 statusP curPollPtr = 0;
133
134 struct tciStatusS statusPtr;
135
136 /* task information */
137 afs_uint32 taskFlags;
138 afs_uint32 localTaskFlags;
139 afs_uint32 temp; /* for flag manipulation */
140 afs_int32 jobNumber;
141 afs_int32 taskId;
142 afs_int32 port;
143 afs_int32 atTime;
144 PROCESS dispatchPid;
145
146 afs_int32 code = 0;
147
148 lastTaskCode = 0;
149
150 while (1) { /*w */
151 if (tconn)
152 rx_DestroyConnection(tconn);
153 tconn = NULL;
154
155 lock_Status();
156 curPollPtr = nextItem(curPollPtr);
157
158 if (curPollPtr == 0) {
159 #ifdef AFS_PTHREAD_ENV
160 struct timespec delaytime;
161 unlock_Status();
162 delayTime.tv_sec = 5;
163 delayTime.tv_nsec = 0;
164 pthread_delay_np(&delayTime);
165 #else
166 unlock_Status();
167 IOMGR_Sleep(5); /* wait a while */
168 #endif /*else AFS_PTHREAD_ENV */
169 continue;
170 }
171
172 /* save useful information */
173 localTaskFlags = curPollPtr->flags;
174 taskId = curPollPtr->taskId;
175 port = curPollPtr->port;
176 atTime = curPollPtr->scheduledDump;
177 jobNumber = curPollPtr->jobNumber;
178 unlock_Status();
179
180 /* reset certain flags; local kill; */
181 CLEAR_FLAG(ABORT_LOCAL);
182
183 /* An abort request before the command even started */
184 if (atTime && (localTaskFlags & ABORT_REQUEST)) {
185 if (localTaskFlags & NOREMOVE) {
186 curPollPtr->flags |= (STARTING | ABORT_DONE); /* Will ignore on other passes */
187 curPollPtr->scheduledDump = 0;
188 } else {
189 deleteStatusNode(curPollPtr);
190 }
191 curPollPtr = 0;
192 continue;
193 }
194
195 /* A task not started yet - check its start time */
196 if (localTaskFlags & STARTING || atTime) {
197 /*
198 * Start a timed dump if its time has come. When the job is
199 * started, it will allocate its own status structure so this
200 * one is no longer needed: delete it.
201 *
202 * Avoid multiple processes trouncing the cmdLine by placing
203 * lock around it.
204 */
205 if (atTime && (atTime <= time(0))) {
206 lock_cmdLine(); /* Will unlock in cmdDispatch */
207
208 cmdLine = curPollPtr->cmdLine;
209 lock_Status();
210 curPollPtr->cmdLine = 0;
211 unlock_Status();
212
213 printf("Starting scheduled dump: job %d\n", jobNumber);
214 printf("schedD> %s\n", cmdLine);
215
216 code =
217 LWP_CreateProcess(cmdDispatch, 16384, LWP_NORMAL_PRIORITY,
218 (void *)2, "cmdDispatch", &dispatchPid);
219 if (code) {
220 if (cmdLine)
221 free(cmdLine);
222 unlock_cmdLine();
223 printf("Couldn't create cmdDispatch task\n");
224 }
225
226 if (localTaskFlags & NOREMOVE) {
227 curPollPtr->flags |= STARTING; /* Will ignore on other passes */
228 curPollPtr->flags |= (code ? TASK_ERROR : TASK_DONE);
229 curPollPtr->scheduledDump = 0;
230 } else {
231 deleteStatusNode(curPollPtr);
232 }
233 curPollPtr = 0;
234 }
235 continue;
236 }
237
238 if (localTaskFlags & ABORT_LOCAL) {
239 /* kill the local task */
240 if ((localTaskFlags & CONTACT_LOST) != 0) {
241 printf("Job %d: in contact with butc at port %d\n", jobNumber,
242 port);
243 printf("Job %d cont: Local kill ignored - use normal kill\n",
244 jobNumber);
245 }
246 }
247
248 code = (afs_int32) bc_GetConn(bc_globalConfig, port, &tconn);
249 if (code) {
250 SET_FLAG(CONTACT_LOST);
251 continue;
252 }
253
254 if (CheckTCVersion(tconn)) {
255 SET_FLAG(CONTACT_LOST);
256 continue;
257 }
258
259 /* Send abort to TC requst if we have to */
260 if (localTaskFlags & ABORT_REQUEST) {
261 code = TC_RequestAbort(tconn, taskId);
262 if (code) {
263 afs_com_err("statusWatcher", code, "; Can't post abort request");
264 afs_com_err("statusWatcher", 0, "...Deleting job");
265 if (localTaskFlags & NOREMOVE) {
266 curPollPtr->flags |= (STARTING | TASK_ERROR);
267 curPollPtr->scheduledDump = 0;
268 } else {
269 deleteStatusNode(curPollPtr);
270 }
271 curPollPtr = 0;
272 continue;
273 } else {
274 lock_Status();
275 curPollPtr->flags &= ~ABORT_REQUEST;
276 curPollPtr->flags |= ABORT_SENT;
277 unlock_Status();
278 }
279 }
280
281 /* otherwise just get the status */
282 code = TC_GetStatus(tconn, taskId, &statusPtr);
283 if (code) {
284 if (code == TC_NODENOTFOUND) {
285 printf("Job %d: %s - no such task on port %d, deleting\n",
286 jobNumber, curPollPtr->taskName, port);
287
288 if (localTaskFlags & NOREMOVE) {
289 curPollPtr->flags |= (STARTING | TASK_ERROR);
290 curPollPtr->scheduledDump = 0;
291 } else {
292 deleteStatusNode(curPollPtr); /* delete this status node */
293 }
294 curPollPtr = 0;
295 continue;
296 }
297
298 SET_FLAG(CONTACT_LOST);
299 continue;
300 }
301
302 /* in case we previously lost contact or couldn't find */
303 CLEAR_FLAG(CONTACT_LOST);
304
305 /* extract useful status */
306 taskFlags = statusPtr.flags;
307
308 /* update local status */
309 lock_Status();
310
311 /* remember some status flags in local struct */
312 temp =
313 (DRIVE_WAIT | OPR_WAIT | CALL_WAIT | TASK_DONE | ABORT_DONE |
314 TASK_ERROR);
315 curPollPtr->flags &= ~temp; /* clear */
316 curPollPtr->flags |= (taskFlags & temp); /* update */
317
318 curPollPtr->dbDumpId = statusPtr.dbDumpId;
319 curPollPtr->nKBytes = statusPtr.nKBytes;
320 strcpy(curPollPtr->volumeName, statusPtr.volumeName);
321 curPollPtr->volsFailed = statusPtr.volsFailed;
322 curPollPtr->lastPolled = statusPtr.lastPolled;
323 unlock_Status();
324
325 /* Are we done */
326 if (taskFlags & TASK_DONE) { /*done */
327 if (taskFlags & ABORT_DONE) {
328 if (curPollPtr->dbDumpId)
329 printf("Job %d: %s: DumpID %u Aborted", jobNumber,
330 curPollPtr->taskName, curPollPtr->dbDumpId);
331 else
332 printf("Job %d: %s Aborted", jobNumber,
333 curPollPtr->taskName);
334
335 if (taskFlags & TASK_ERROR)
336 printf(" with errors\n");
337 else
338 printf("\n");
339
340 lastTaskCode = 1;
341 }
342
343 else if (taskFlags & TASK_ERROR) {
344 if (!(localTaskFlags & SILENT)) {
345 if (curPollPtr->dbDumpId)
346 printf("Job %d: DumpID %u Failed with errors\n",
347 jobNumber, curPollPtr->dbDumpId);
348 else
349 printf("Job %d Failed with errors\n", jobNumber);
350 }
351 lastTaskCode = 2;
352 }
353
354 else {
355 if (!(localTaskFlags & SILENT)) {
356 if (curPollPtr->dbDumpId)
357 printf("Job %d: %s: DumpID %u finished", jobNumber,
358 curPollPtr->taskName, curPollPtr->dbDumpId);
359 else
360 printf("Job %d: %s finished", jobNumber,
361 curPollPtr->taskName);
362
363 if (curPollPtr->volsTotal) {
364 printf(". %d volumes dumped",
365 (curPollPtr->volsTotal -
366 curPollPtr->volsFailed));
367 if (curPollPtr->volsFailed)
368 printf(", %d failed", curPollPtr->volsFailed);
369 }
370
371 printf("\n");
372 }
373 lastTaskCode = 0;
374 }
375
376 /* make call to destroy task on server */
377 code = TC_EndStatus(tconn, taskId);
378 if (code)
379 printf("Job %d: %s, error in job termination cleanup\n",
380 jobNumber, curPollPtr->taskName);
381
382 if (localTaskFlags & NOREMOVE) {
383 curPollPtr->flags |= STARTING;
384 curPollPtr->scheduledDump = 0;
385 } else {
386 deleteStatusNode(curPollPtr); /* unlink and destroy local task */
387 }
388 curPollPtr = 0;
389 } /*done */
390 } /*w */
391 AFS_UNREACHED(return(NULL));
392 }
393
394 /* bc_jobNumber
395 * Allocate a job number. Computes the maximum of all the job numbers
396 * and then returns the maximum+1.
397 * If no jobs are found, returns 1.
398 */
399
400 afs_int32
401 bc_jobNumber(void)
402 {
403 afs_int32 retval = 0;
404 dlqlinkP ptr;
405
406 ptr = statusHead.dlq_next;
407 while (ptr != &statusHead) {
408 /* compute max of all job numbers */
409 if (((statusP) ptr)->jobNumber > retval)
410 retval = ((statusP) ptr)->jobNumber;
411
412 ptr = ptr->dlq_next;
413 }
414 retval++;
415 return (retval);
416 }
417
418 /* waitForTask
419 * Wait for a specific task to finish and then return.
420 * Return the task's flags when it's done. If the job
421 * had been cleaned up, then just return 0.
422 */
423 int
424 waitForTask(afs_uint32 taskId)
425 {
426 statusP ptr;
427 afs_int32 done = 0, rcode = 0, t;
428
429 t = (TASK_DONE | ABORT_DONE | TASK_ERROR);
430 while (!done) {
431 /* Sleep 2 seconds */
432 #ifdef AFS_PTHREAD_ENV
433 struct timespec delaytime;
434 delayTime.tv_sec = 2;
435 delayTime.tv_nsec = 0;
436 pthread_delay_np(&delayTime);
437 #else
438 IOMGR_Sleep(2);
439 #endif /*else AFS_PTHREAD_ENV */
440
441 /* Check if we are done */
442 lock_Status();
443 ptr = findStatus(taskId);
444 if (!ptr || (ptr->flags & t)) {
445 rcode = (ptr ? ptr->flags : 0);
446 done = 1;
447 }
448 unlock_Status();
449 }
450 return rcode;
451 }