Import Upstream version 1.8.5
[hcoop/debian/openafs.git] / src / ubik / recovery.c
1 /*
2 * Copyright 2000, International Business Machines Corporation and others.
3 * All Rights Reserved.
4 *
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
8 */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14
15 #include <afs/opr.h>
16
17 #ifdef AFS_PTHREAD_ENV
18 # include <opr/lock.h>
19 #else
20 # include <opr/lockstub.h>
21 #endif
22
23 #include <rx/rx.h>
24 #include <afs/afsutil.h>
25 #include <afs/cellconfig.h>
26
27
28 #define UBIK_INTERNALS
29 #include "ubik.h"
30 #include "ubik_int.h"
31
32 /*! \file
33 * This module is responsible for determining when the system has
34 * recovered to the point that it can handle new transactions. It
35 * replays logs, polls to determine the current dbase after a crash,
36 * and distributes the new database to the others.
37 *
38 * The sync site associates a version number with each database. It
39 * broadcasts the version associated with its current dbase in every
40 * one of its beacon messages. When the sync site sends a dbase to a
41 * server, it also sends the db's version. A non-sync site server can
42 * tell if it has the right dbase version by simply comparing the
43 * version from the beacon message \p uvote_dbVersion with the version
44 * associated with the database \p ubik_dbase->version. The sync site
45 * itself simply has one counter to keep track of all of this (again
46 * \p ubik_dbase->version).
47 *
48 * sync site: routine called when the sync site loses its quorum; this
49 * procedure is called "up" from the beacon package. It resyncs the
50 * dbase and nudges the recovery daemon to try to propagate out the
51 * changes. It also resets the recovery daemon's state, since
52 * recovery must potentially find a new dbase to propagate out. This
53 * routine should not do anything with variables used by non-sync site
54 * servers.
55 */
56
/*!
 * When this flag is nonzero, ubik contacts other ubik servers only
 * through their primary address (the address specified in the
 * CellServDB); ubik recovery will not try opening connections to the
 * alternate interface addresses.
 */
int ubikPrimaryAddrOnly;
64
65 int
66 urecovery_ResetState(void)
67 {
68 urecovery_state = 0;
69 return 0;
70 }
71
/*!
 * \brief sync site: react to a non-sync site server going down
 *
 * Called when a non-sync site server goes down. The server is handed to
 * ubeacon_ReinitServer(), which restarts the recovery process so that
 * the missing server is sent the new db once it comes back up.
 *
 * \note This routine should not do anything with variables used by
 *       non-sync site servers.
 *
 * \param ts  the server that was lost
 *
 * \return always 0
 */
int
urecovery_LostServer(struct ubik_server *ts)
{
    ubeacon_ReinitServer(ts);

    return 0;
}
87
88 /*!
89 * return true iff we have a current database (called by both sync
90 * sites and non-sync sites) How do we determine this? If we're the
91 * sync site, we wait until recovery has finished fetching and
92 * re-labelling its dbase (it may still be trying to propagate it out
93 * to everyone else; that's THEIR problem). If we're not the sync
94 * site, then we must have a dbase labelled with the right version,
95 * and we must have a currently-good sync site.
96 */
97 int
98 urecovery_AllBetter(struct ubik_dbase *adbase, int areadAny)
99 {
100 afs_int32 rcode;
101
102 ViceLog(25, ("allbetter checking\n"));
103 rcode = 0;
104
105
106 if (areadAny) {
107 if (ubik_dbase->version.epoch > 1)
108 rcode = 1; /* Happy with any good version of database */
109 }
110
111 /* Check if we're sync site and we've got the right data */
112 else if (ubeacon_AmSyncSite() && (urecovery_state & UBIK_RECHAVEDB)) {
113 rcode = 1;
114 }
115
116 /* next, check if we're aux site, and we've ever been sent the
117 * right data (note that if a dbase update fails, we won't think
118 * that the sync site is still the sync site, 'cause it won't talk
119 * to us until a timeout period has gone by. When we recover, we
120 * leave this clear until we get a new dbase */
121 else if (uvote_HaveSyncAndVersion(ubik_dbase->version)) {
122 rcode = 1;
123 }
124
125 ViceLog(25, ("allbetter: returning %d\n", rcode));
126 return rcode;
127 }
128
129 /*!
130 * \brief abort all transactions on this database
131 */
132 int
133 urecovery_AbortAll(struct ubik_dbase *adbase)
134 {
135 struct ubik_trans *tt;
136 for (tt = adbase->activeTrans; tt; tt = tt->next) {
137 udisk_abort(tt);
138 }
139 return 0;
140 }
141
142 /*!
143 * \brief this routine aborts the current remote transaction, if any, if the tid is wrong
144 */
145 int
146 urecovery_CheckTid(struct ubik_tid *atid, int abortalways)
147 {
148 if (ubik_currentTrans) {
149 /* there is remote write trans, see if we match, see if this
150 * is a new transaction */
151 if (atid->epoch != ubik_currentTrans->tid.epoch
152 || atid->counter > ubik_currentTrans->tid.counter || abortalways) {
153 /* don't match, abort it */
154 /* If the thread is not waiting for lock - ok to end it */
155 if (ubik_currentTrans->locktype != LOCKWAIT) {
156 udisk_end(ubik_currentTrans);
157 }
158 ubik_currentTrans = (struct ubik_trans *)0;
159 }
160 }
161 return 0;
162 }
163
164 /*!
165 * \brief replay logs
166 *
167 * log format is defined here, and implicitly in disk.c
168 *
169 * 4 byte opcode, followed by parameters, each 4 bytes long. All integers
170 * are in logged in network standard byte order, in case we want to move logs
171 * from machine-to-machine someday.
172 *
173 * Begin transaction: opcode \n
174 * Commit transaction: opcode, version (8 bytes) \n
175 * Truncate file: opcode, file number, length \n
176 * Abort transaction: opcode \n
177 * Write data: opcode, file, position, length, <length> data bytes \n
178 *
179 * A very simple routine, it just replays the log. Note that this is a new-value only log, which
180 * implies that no uncommitted data is written to the dbase: one writes data to the log, including
181 * the commit record, then we allow data to be written through to the dbase. In our particular
182 * implementation, once a transaction is done, we write out the pages to the database, so that
183 * our buffer package doesn't have to know about stable and uncommitted data in the memory buffers:
184 * any changed data while there is an uncommitted write transaction can be zapped during an
185 * abort and the remaining dbase on the disk is exactly the right dbase, without having to read
186 * the log.
187 */
188 static int
189 ReplayLog(struct ubik_dbase *adbase)
190 {
191 afs_int32 opcode;
192 afs_int32 code, tpos;
193 int logIsGood;
194 afs_int32 len, thisSize, tfile, filePos;
195 afs_int32 buffer[4];
196 afs_int32 syncFile = -1;
197 afs_int32 data[1024];
198
199 /* read the lock twice, once to see whether we have a transaction to deal
200 * with that committed, (theoretically, we should support more than one
201 * trans in the log at once, but not yet), and once replaying the
202 * transactions. */
203 tpos = 0;
204 logIsGood = 0;
205 /* for now, assume that all ops in log pertain to one transaction; see if there's a commit */
206 while (1) {
207 code =
208 (*adbase->read) (adbase, LOGFILE, (char *)&opcode, tpos,
209 sizeof(afs_int32));
210 if (code != sizeof(afs_int32))
211 break;
212 opcode = ntohl(opcode);
213 if (opcode == LOGNEW) {
214 /* handle begin trans */
215 tpos += sizeof(afs_int32);
216 } else if (opcode == LOGABORT)
217 break;
218 else if (opcode == LOGEND) {
219 logIsGood = 1;
220 break;
221 } else if (opcode == LOGTRUNCATE) {
222 tpos += 4;
223 code =
224 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
225 2 * sizeof(afs_int32));
226 if (code != 2 * sizeof(afs_int32))
227 break; /* premature eof or io error */
228 tpos += 2 * sizeof(afs_int32);
229 } else if (opcode == LOGDATA) {
230 tpos += 4;
231 code =
232 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
233 3 * sizeof(afs_int32));
234 if (code != 3 * sizeof(afs_int32))
235 break;
236 /* otherwise, skip over the data bytes, too */
237 tpos += ntohl(buffer[2]) + 3 * sizeof(afs_int32);
238 } else {
239 ViceLog(0, ("corrupt log opcode (%d) at position %d\n", opcode,
240 tpos));
241 break; /* corrupt log! */
242 }
243 }
244 if (logIsGood) {
245 /* actually do the replay; log should go all the way through the commit record, since
246 * we just read it above. */
247 tpos = 0;
248 logIsGood = 0;
249 syncFile = -1;
250 while (1) {
251 code =
252 (*adbase->read) (adbase, LOGFILE, (char *)&opcode, tpos,
253 sizeof(afs_int32));
254 if (code != sizeof(afs_int32))
255 break;
256 opcode = ntohl(opcode);
257 if (opcode == LOGNEW) {
258 /* handle begin trans */
259 tpos += sizeof(afs_int32);
260 } else if (opcode == LOGABORT)
261 panic("log abort\n");
262 else if (opcode == LOGEND) {
263 struct ubik_version version;
264 tpos += 4;
265 code =
266 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
267 2 * sizeof(afs_int32));
268 if (code != 2 * sizeof(afs_int32))
269 return UBADLOG;
270 version.epoch = ntohl(buffer[0]);
271 version.counter = ntohl(buffer[1]);
272 code = (*adbase->setlabel) (adbase, 0, &version);
273 if (code)
274 return code;
275 ViceLog(0, ("Successfully replayed log for interrupted "
276 "transaction; db version is now %ld.%ld\n",
277 (long) version.epoch, (long) version.counter));
278 logIsGood = 1;
279 break; /* all done now */
280 } else if (opcode == LOGTRUNCATE) {
281 tpos += 4;
282 code =
283 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
284 2 * sizeof(afs_int32));
285 if (code != 2 * sizeof(afs_int32))
286 break; /* premature eof or io error */
287 tpos += 2 * sizeof(afs_int32);
288 code =
289 (*adbase->truncate) (adbase, ntohl(buffer[0]),
290 ntohl(buffer[1]));
291 if (code)
292 return code;
293 } else if (opcode == LOGDATA) {
294 tpos += 4;
295 code =
296 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
297 3 * sizeof(afs_int32));
298 if (code != 3 * sizeof(afs_int32))
299 break;
300 tpos += 3 * sizeof(afs_int32);
301 /* otherwise, skip over the data bytes, too */
302 len = ntohl(buffer[2]); /* total number of bytes to copy */
303 filePos = ntohl(buffer[1]);
304 tfile = ntohl(buffer[0]);
305 /* try to minimize file syncs */
306 if (syncFile != tfile) {
307 if (syncFile >= 0)
308 code = (*adbase->sync) (adbase, syncFile);
309 else
310 code = 0;
311 syncFile = tfile;
312 if (code)
313 return code;
314 }
315 while (len > 0) {
316 thisSize = (len > sizeof(data) ? sizeof(data) : len);
317 /* copy sizeof(data) buffer bytes at a time */
318 code =
319 (*adbase->read) (adbase, LOGFILE, (char *)data, tpos,
320 thisSize);
321 if (code != thisSize)
322 return UBADLOG;
323 code =
324 (*adbase->write) (adbase, tfile, (char *)data, filePos,
325 thisSize);
326 if (code != thisSize)
327 return UBADLOG;
328 filePos += thisSize;
329 tpos += thisSize;
330 len -= thisSize;
331 }
332 } else {
333 ViceLog(0, ("corrupt log opcode (%d) at position %d\n",
334 opcode, tpos));
335 break; /* corrupt log! */
336 }
337 }
338 if (logIsGood) {
339 if (syncFile >= 0)
340 code = (*adbase->sync) (adbase, syncFile);
341 if (code)
342 return code;
343 } else {
344 ViceLog(0, ("Log read error on pass 2\n"));
345 return UBADLOG;
346 }
347 }
348
349 /* now truncate the log, we're done with it */
350 code = (*adbase->truncate) (adbase, LOGFILE, 0);
351 return code;
352 }
353
354 /*! \brief
355 * Called at initialization to figure out version of the dbase we really have.
356 *
357 * This routine is called after replaying the log; it reads the restored labels.
358 */
359 static int
360 InitializeDB(struct ubik_dbase *adbase)
361 {
362 afs_int32 code;
363
364 code = (*adbase->getlabel) (adbase, 0, &adbase->version);
365 if (code) {
366 /* try setting the label to a new value */
367 UBIK_VERSION_LOCK;
368 adbase->version.epoch = 1; /* value for newly-initialized db */
369 adbase->version.counter = 1;
370 code = (*adbase->setlabel) (adbase, 0, &adbase->version);
371 if (code) {
372 /* failed, try to set it back */
373 adbase->version.epoch = 0;
374 adbase->version.counter = 0;
375 (*adbase->setlabel) (adbase, 0, &adbase->version);
376 }
377 #ifdef AFS_PTHREAD_ENV
378 opr_cv_broadcast(&adbase->version_cond);
379 #else
380 LWP_NoYieldSignal(&adbase->version);
381 #endif
382 UBIK_VERSION_UNLOCK;
383 }
384 return 0;
385 }
386
387 /*!
388 * \brief initialize the local ubik_dbase
389 *
390 * We replay the logs and then read the resulting file to figure out what version we've really got.
391 */
392 int
393 urecovery_Initialize(struct ubik_dbase *adbase)
394 {
395 afs_int32 code;
396
397 DBHOLD(adbase);
398 code = ReplayLog(adbase);
399 if (code)
400 goto done;
401 code = InitializeDB(adbase);
402 done:
403 DBRELE(adbase);
404 return code;
405 }
406
407 /*!
408 * \brief Main interaction loop for the recovery manager
409 *
410 * The recovery light-weight process only runs when you're the
411 * synchronization site. It performs the following tasks, if and only
412 * if the prerequisite tasks have been performed successfully (it
413 * keeps track of which ones have been performed in its bit map,
414 * \p urecovery_state).
415 *
416 * First, it is responsible for probing that all servers are up. This
417 * is the only operation that must be performed even if this is not
418 * yet the sync site, since otherwise this site may not notice that
419 * enough other machines are running to even elect this guy to be the
420 * sync site.
421 *
422 * After that, the recovery process does nothing until the beacon and
423 * voting modules manage to get this site elected sync site.
424 *
425 * After becoming sync site, recovery first attempts to find the best
426 * database available in the network (it must do this in order to
427 * ensure finding the latest committed data). After finding the right
428 * database, it must fetch this dbase to the sync site.
429 *
430 * After fetching the dbase, it relabels it with a new version number,
431 * to ensure that everyone recognizes this dbase as the most recent
432 * dbase.
433 *
434 * One the dbase has been relabelled, this machine can start handling
435 * requests. However, the recovery module still has one more task:
436 * propagating the dbase out to everyone who is up in the network.
437 */
438 void *
439 urecovery_Interact(void *dummy)
440 {
441 afs_int32 code;
442 struct ubik_server *bestServer = NULL;
443 struct ubik_server *ts;
444 int dbok, doingRPC, now;
445 afs_int32 lastProbeTime;
446 /* if we're the sync site, the best db version we've found yet */
447 static struct ubik_version bestDBVersion;
448 struct ubik_version tversion;
449 struct timeval tv;
450 int length, tlen, offset, file, nbytes;
451 struct rx_call *rxcall;
452 char tbuffer[1024];
453 struct ubik_stat ubikstat;
454 struct in_addr inAddr;
455 char hoststr[16];
456 char pbuffer[1028];
457 int fd = -1;
458 afs_int32 pass;
459
460 afs_pthread_setname_self("recovery");
461
462 /* otherwise, begin interaction */
463 urecovery_state = 0;
464 lastProbeTime = 0;
465 while (1) {
466 /* Run through this loop every 4 seconds */
467 tv.tv_sec = 4;
468 tv.tv_usec = 0;
469 #ifdef AFS_PTHREAD_ENV
470 select(0, 0, 0, 0, &tv);
471 #else
472 IOMGR_Select(0, 0, 0, 0, &tv);
473 #endif
474
475 ViceLog(5, ("recovery running in state %x\n", urecovery_state));
476
477 /* Every 30 seconds, check all the down servers and mark them
478 * as up if they respond. When a server comes up or found to
479 * not be current, then re-find the the best database and
480 * propogate it.
481 */
482 if ((now = FT_ApproxTime()) > 30 + lastProbeTime) {
483
484 for (ts = ubik_servers, doingRPC = 0; ts; ts = ts->next) {
485 UBIK_BEACON_LOCK;
486 if (!ts->up) {
487 UBIK_BEACON_UNLOCK;
488 doingRPC = 1;
489 code = DoProbe(ts);
490 if (code == 0) {
491 UBIK_BEACON_LOCK;
492 ts->up = 1;
493 UBIK_BEACON_UNLOCK;
494 DBHOLD(ubik_dbase);
495 urecovery_state &= ~UBIK_RECFOUNDDB;
496 DBRELE(ubik_dbase);
497 }
498 } else {
499 UBIK_BEACON_UNLOCK;
500 DBHOLD(ubik_dbase);
501 if (!ts->currentDB)
502 urecovery_state &= ~UBIK_RECFOUNDDB;
503 DBRELE(ubik_dbase);
504 }
505 }
506
507 if (doingRPC)
508 now = FT_ApproxTime();
509 lastProbeTime = now;
510 }
511
512 /* Mark whether we are the sync site */
513 DBHOLD(ubik_dbase);
514 if (!ubeacon_AmSyncSite()) {
515 urecovery_state &= ~UBIK_RECSYNCSITE;
516 DBRELE(ubik_dbase);
517 continue; /* nothing to do */
518 }
519 urecovery_state |= UBIK_RECSYNCSITE;
520
521 /* If a server has just come up or if we have not found the
522 * most current database, then go find the most current db.
523 */
524 if (!(urecovery_state & UBIK_RECFOUNDDB)) {
525 int okcalls = 0;
526 DBRELE(ubik_dbase);
527 bestServer = (struct ubik_server *)0;
528 bestDBVersion.epoch = 0;
529 bestDBVersion.counter = 0;
530 for (ts = ubik_servers; ts; ts = ts->next) {
531 UBIK_BEACON_LOCK;
532 if (!ts->up) {
533 UBIK_BEACON_UNLOCK;
534 continue; /* don't bother with these guys */
535 }
536 UBIK_BEACON_UNLOCK;
537 if (ts->isClone)
538 continue;
539 UBIK_ADDR_LOCK;
540 code = DISK_GetVersion(ts->disk_rxcid, &ts->version);
541 UBIK_ADDR_UNLOCK;
542 if (code == 0) {
543 okcalls++;
544 /* perhaps this is the best version */
545 if (vcmp(ts->version, bestDBVersion) > 0) {
546 /* new best version */
547 bestDBVersion = ts->version;
548 bestServer = ts;
549 }
550 }
551 }
552
553 DBHOLD(ubik_dbase);
554
555 if (okcalls + 1 >= ubik_quorum) {
556 /* If we've asked a majority of sites about their db version,
557 * then we can say with confidence that we've found the best db
558 * version. If we haven't contacted most sites (because
559 * GetVersion failed or because we already know the server is
560 * down), then we don't really know if we know about the best
561 * db version. So we can only proceed in here if 'okcalls'
562 * indicates we managed to contact a majority of sites. */
563
564 /* take into consideration our version. Remember if we,
565 * the sync site, have the best version. Also note that
566 * we may need to send the best version out.
567 */
568 if (vcmp(ubik_dbase->version, bestDBVersion) >= 0) {
569 bestDBVersion = ubik_dbase->version;
570 bestServer = (struct ubik_server *)0;
571 urecovery_state |= UBIK_RECHAVEDB;
572 } else {
573 /* Clear the flag only when we know we have to retrieve
574 * the db. Because urecovery_AllBetter() looks at it.
575 */
576 urecovery_state &= ~UBIK_RECHAVEDB;
577 }
578 urecovery_state |= UBIK_RECFOUNDDB;
579 urecovery_state &= ~UBIK_RECSENTDB;
580 }
581 }
582 if (!(urecovery_state & UBIK_RECFOUNDDB)) {
583 DBRELE(ubik_dbase);
584 continue; /* not ready */
585 }
586
587 /* If we, the sync site, do not have the best db version, then
588 * go and get it from the server that does.
589 */
590 if ((urecovery_state & UBIK_RECHAVEDB) || !bestServer) {
591 urecovery_state |= UBIK_RECHAVEDB;
592 } else {
593 /* we don't have the best version; we should fetch it. */
594 urecovery_AbortAll(ubik_dbase);
595
596 /* Rx code to do the Bulk fetch */
597 file = 0;
598 offset = 0;
599 UBIK_ADDR_LOCK;
600 rxcall = rx_NewCall(bestServer->disk_rxcid);
601
602 ViceLog(0, ("Ubik: Synchronize database via DISK_GetFile to server %s\n",
603 afs_inet_ntoa_r(bestServer->addr[0], hoststr)));
604 UBIK_ADDR_UNLOCK;
605
606 code = StartDISK_GetFile(rxcall, file);
607 if (code) {
608 ViceLog(5, ("StartDiskGetFile failed=%d\n", code));
609 goto FetchEndCall;
610 }
611 nbytes = rx_Read(rxcall, (char *)&length, sizeof(afs_int32));
612 length = ntohl(length);
613 if (nbytes != sizeof(afs_int32)) {
614 ViceLog(5, ("Rx-read length error=%d\n", BULK_ERROR));
615 code = EIO;
616 goto FetchEndCall;
617 }
618
619 /* give invalid label during file transit */
620 UBIK_VERSION_LOCK;
621 tversion.epoch = 0;
622 code = (*ubik_dbase->setlabel) (ubik_dbase, file, &tversion);
623 UBIK_VERSION_UNLOCK;
624 if (code) {
625 ViceLog(5, ("setlabel io error=%d\n", code));
626 goto FetchEndCall;
627 }
628 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
629 ubik_dbase->pathName, (file<0)?"SYS":"",
630 (file<0)?-file:file);
631 fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
632 if (fd < 0) {
633 code = errno;
634 goto FetchEndCall;
635 }
636 code = lseek(fd, HDRSIZE, 0);
637 if (code != HDRSIZE) {
638 close(fd);
639 goto FetchEndCall;
640 }
641
642 pass = 0;
643 while (length > 0) {
644 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
645 #ifndef AFS_PTHREAD_ENV
646 if (pass % 4 == 0)
647 IOMGR_Poll();
648 #endif
649 nbytes = rx_Read(rxcall, tbuffer, tlen);
650 if (nbytes != tlen) {
651 ViceLog(5, ("Rx-read bulk error=%d\n", BULK_ERROR));
652 code = EIO;
653 close(fd);
654 goto FetchEndCall;
655 }
656 nbytes = write(fd, tbuffer, tlen);
657 pass++;
658 if (nbytes != tlen) {
659 code = UIOERROR;
660 close(fd);
661 goto FetchEndCall;
662 }
663 offset += tlen;
664 length -= tlen;
665 }
666 code = close(fd);
667 if (code)
668 goto FetchEndCall;
669 code = EndDISK_GetFile(rxcall, &tversion);
670 FetchEndCall:
671 code = rx_EndCall(rxcall, code);
672 if (!code) {
673 /* we got a new file, set up its header */
674 urecovery_state |= UBIK_RECHAVEDB;
675 UBIK_VERSION_LOCK;
676 memcpy(&ubik_dbase->version, &tversion,
677 sizeof(struct ubik_version));
678 snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d",
679 ubik_dbase->pathName, (file<0)?"SYS":"",
680 (file<0)?-file:file);
681 #ifdef AFS_NT40_ENV
682 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
683 ubik_dbase->pathName, (file<0)?"SYS":"",
684 (file<0)?-file:file);
685 code = unlink(pbuffer);
686 if (!code)
687 code = rename(tbuffer, pbuffer);
688 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
689 ubik_dbase->pathName, (file<0)?"SYS":"",
690 (file<0)?-file:file);
691 #endif
692 if (!code)
693 code = rename(pbuffer, tbuffer);
694 if (!code) {
695 (*ubik_dbase->open) (ubik_dbase, file);
696 /* after data is good, sync disk with correct label */
697 code =
698 (*ubik_dbase->setlabel) (ubik_dbase, 0,
699 &ubik_dbase->version);
700 }
701 UBIK_VERSION_UNLOCK;
702 #ifdef AFS_NT40_ENV
703 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
704 ubik_dbase->pathName, (file<0)?"SYS":"",
705 (file<0)?-file:file);
706 unlink(pbuffer);
707 #endif
708 }
709 if (code) {
710 unlink(pbuffer);
711 /*
712 * We will effectively invalidate the old data forever now.
713 * Unclear if we *should* but we do.
714 */
715 UBIK_VERSION_LOCK;
716 ubik_dbase->version.epoch = 0;
717 ubik_dbase->version.counter = 0;
718 UBIK_VERSION_UNLOCK;
719 ViceLog(0, ("Ubik: Synchronize database failed (error = %d)\n",
720 code));
721 } else {
722 ViceLog(0, ("Ubik: Synchronize database completed\n"));
723 urecovery_state |= UBIK_RECHAVEDB;
724 }
725 udisk_Invalidate(ubik_dbase, 0); /* data has changed */
726 #ifdef AFS_PTHREAD_ENV
727 opr_cv_broadcast(&ubik_dbase->version_cond);
728 #else
729 LWP_NoYieldSignal(&ubik_dbase->version);
730 #endif
731 }
732 if (!(urecovery_state & UBIK_RECHAVEDB)) {
733 DBRELE(ubik_dbase);
734 continue; /* not ready */
735 }
736
737 /* If the database was newly initialized, then when we establish quorum, write
738 * a new label. This allows urecovery_AllBetter() to allow access for reads.
739 * Setting it to 2 also allows another site to come along with a newer
740 * database and overwrite this one.
741 */
742 if (ubik_dbase->version.epoch == 1) {
743 urecovery_AbortAll(ubik_dbase);
744 UBIK_VERSION_LOCK;
745 ubik_dbase->version.epoch = 2;
746 ubik_dbase->version.counter = 1;
747 code =
748 (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
749 UBIK_VERSION_UNLOCK;
750 udisk_Invalidate(ubik_dbase, 0); /* data may have changed */
751 #ifdef AFS_PTHREAD_ENV
752 opr_cv_broadcast(&ubik_dbase->version_cond);
753 #else
754 LWP_NoYieldSignal(&ubik_dbase->version);
755 #endif
756 }
757
758 /* Check the other sites and send the database to them if they
759 * do not have the current db.
760 */
761 if (!(urecovery_state & UBIK_RECSENTDB)) {
762 /* now propagate out new version to everyone else */
763 dbok = 1; /* start off assuming they all worked */
764
765 /*
766 * Check if a write transaction is in progress. We can't send the
767 * db when a write is in progress here because the db would be
768 * obsolete as soon as it goes there. Also, ops after the begin
769 * trans would reach the recepient and wouldn't find a transaction
770 * pending there. Frankly, I don't think it's possible to get past
771 * the write-lock above if there is a write transaction in progress,
772 * but then, it won't hurt to check, will it?
773 */
774 if (ubik_dbase->flags & DBWRITING) {
775 struct timeval tv;
776 int safety = 0;
777 long cur_usec = 50000;
778 while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
779 DBRELE(ubik_dbase);
780 /* sleep for a little while */
781 tv.tv_sec = 0;
782 tv.tv_usec = cur_usec;
783 #ifdef AFS_PTHREAD_ENV
784 select(0, 0, 0, 0, &tv);
785 #else
786 IOMGR_Select(0, 0, 0, 0, &tv);
787 #endif
788 cur_usec += 10000;
789 safety++;
790 DBHOLD(ubik_dbase);
791 }
792 }
793
794 for (ts = ubik_servers; ts; ts = ts->next) {
795 UBIK_ADDR_LOCK;
796 inAddr.s_addr = ts->addr[0];
797 UBIK_ADDR_UNLOCK;
798 UBIK_BEACON_LOCK;
799 if (!ts->up) {
800 UBIK_BEACON_UNLOCK;
801 ViceLog(5, ("recovery cannot send version to %s\n",
802 afs_inet_ntoa_r(inAddr.s_addr, hoststr)));
803 dbok = 0;
804 continue;
805 }
806 UBIK_BEACON_UNLOCK;
807 ViceLog(5, ("recovery sending version to %s\n",
808 afs_inet_ntoa_r(inAddr.s_addr, hoststr)));
809 if (vcmp(ts->version, ubik_dbase->version) != 0) {
810 ViceLog(5, ("recovery stating local database\n"));
811
812 /* Rx code to do the Bulk Store */
813 code = (*ubik_dbase->stat) (ubik_dbase, 0, &ubikstat);
814 if (!code) {
815 length = ubikstat.size;
816 file = offset = 0;
817 UBIK_ADDR_LOCK;
818 rxcall = rx_NewCall(ts->disk_rxcid);
819 UBIK_ADDR_UNLOCK;
820 code =
821 StartDISK_SendFile(rxcall, file, length,
822 &ubik_dbase->version);
823 if (code) {
824 ViceLog(5, ("StartDiskSendFile failed=%d\n",
825 code));
826 goto StoreEndCall;
827 }
828 while (length > 0) {
829 tlen =
830 (length >
831 sizeof(tbuffer) ? sizeof(tbuffer) : length);
832 nbytes =
833 (*ubik_dbase->read) (ubik_dbase, file,
834 tbuffer, offset, tlen);
835 if (nbytes != tlen) {
836 code = UIOERROR;
837 ViceLog(5, ("Local disk read error=%d\n", code));
838 goto StoreEndCall;
839 }
840 nbytes = rx_Write(rxcall, tbuffer, tlen);
841 if (nbytes != tlen) {
842 code = BULK_ERROR;
843 ViceLog(5, ("Rx-write bulk error=%d\n", code));
844 goto StoreEndCall;
845 }
846 offset += tlen;
847 length -= tlen;
848 }
849 code = EndDISK_SendFile(rxcall);
850 StoreEndCall:
851 code = rx_EndCall(rxcall, code);
852 }
853 if (code == 0) {
854 /* we set a new file, process its header */
855 ts->version = ubik_dbase->version;
856 ts->currentDB = 1;
857 } else
858 dbok = 0;
859 } else {
860 /* mark file up to date */
861 ts->currentDB = 1;
862 }
863 }
864 if (dbok)
865 urecovery_state |= UBIK_RECSENTDB;
866 }
867 DBRELE(ubik_dbase);
868 }
869 AFS_UNREACHED(return(NULL));
870 }
871
872 /*!
873 * \brief send a Probe to all the network address of this server
874 *
875 * \return 0 if success, else return 1
876 */
877 int
878 DoProbe(struct ubik_server *server)
879 {
880 struct rx_connection *conns[UBIK_MAX_INTERFACE_ADDR];
881 struct rx_connection *connSuccess = 0;
882 int i, nconns, success_i = -1;
883 afs_uint32 addr;
884 char buffer[32];
885 char hoststr[16];
886
887 UBIK_ADDR_LOCK;
888 for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && (addr = server->addr[i]);
889 i++) {
890 conns[i] =
891 rx_NewConnection(addr, ubik_callPortal, DISK_SERVICE_ID,
892 addr_globals.ubikSecClass, addr_globals.ubikSecIndex);
893
894 /* user requirement to use only the primary interface */
895 if (ubikPrimaryAddrOnly) {
896 i = 1;
897 break;
898 }
899 }
900 UBIK_ADDR_UNLOCK;
901 nconns = i;
902 opr_Assert(nconns); /* at least one interface address for this server */
903
904 multi_Rx(conns, nconns) {
905 multi_DISK_Probe();
906 if (!multi_error) { /* first success */
907 success_i = multi_i;
908
909 multi_Abort;
910 }
911 } multi_End_Ignore;
912
913 if (success_i >= 0) {
914 UBIK_ADDR_LOCK;
915 addr = server->addr[success_i]; /* successful interface addr */
916
917 if (server->disk_rxcid) /* destroy existing conn */
918 rx_DestroyConnection(server->disk_rxcid);
919 if (server->vote_rxcid)
920 rx_DestroyConnection(server->vote_rxcid);
921
922 /* make new connections */
923 server->disk_rxcid = conns[success_i];
924 server->vote_rxcid = rx_NewConnection(addr, ubik_callPortal,
925 VOTE_SERVICE_ID, addr_globals.ubikSecClass,
926 addr_globals.ubikSecIndex);
927
928 connSuccess = conns[success_i];
929 strcpy(buffer, afs_inet_ntoa_r(server->addr[0], hoststr));
930
931 ViceLog(0, ("ubik:server %s is back up: will be contacted through %s\n",
932 buffer, afs_inet_ntoa_r(addr, hoststr)));
933 UBIK_ADDR_UNLOCK;
934 }
935
936 /* Destroy all connections except the one on which we succeeded */
937 for (i = 0; i < nconns; i++)
938 if (conns[i] != connSuccess)
939 rx_DestroyConnection(conns[i]);
940
941 if (!connSuccess)
942 ViceLog(5, ("ubik:server %s still down\n",
943 afs_inet_ntoa_r(server->addr[0], hoststr)));
944
945 if (connSuccess)
946 return 0; /* success */
947 else
948 return 1; /* failure */
949 }