Import Upstream version 1.8.5
[hcoop/debian/openafs.git] / src / ubik / remote.c
1 /*
2 * Copyright 2000, International Business Machines Corporation and others.
3 * All Rights Reserved.
4 *
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
8 */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14
15 #include <assert.h>
16
17 #include <afs/opr.h>
18 #ifdef AFS_PTHREAD_ENV
19 # include <opr/lock.h>
20 #else
21 # include <opr/lockstub.h>
22 #endif
23
24 #include <lock.h>
25 #include <rx/xdr.h>
26 #include <rx/rx.h>
27 #include <afs/afsutil.h>
28
29 #define UBIK_INTERNALS
30 #include "ubik.h"
31 #include "ubik_int.h"
32
33 static void printServerInfo(void);
34
35 /*! \file
36 * routines for handling requests remotely-submitted by the sync site. These are
37 * only write transactions (we don't propagate read trans), and there is at most one
38 * write transaction extant at any one time.
39 */
40
/*!
 * The write transaction currently being executed on this server on behalf
 * of the sync site, or NULL when there is none.
 */
struct ubik_trans *ubik_currentTrans = NULL;
42
43
44
45 /* the rest of these guys handle remote execution of write
46 * transactions: this is the code executed on the other servers when a
47 * sync site is executing a write transaction.
48 */
49 afs_int32
50 SDISK_Begin(struct rx_call *rxcall, struct ubik_tid *atid)
51 {
52 afs_int32 code;
53
54 if ((code = ubik_CheckAuth(rxcall))) {
55 return code;
56 }
57 DBHOLD(ubik_dbase);
58 if (urecovery_AllBetter(ubik_dbase, 0) == 0) {
59 code = UNOQUORUM;
60 goto out;
61 }
62 urecovery_CheckTid(atid, 1);
63 code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
64 if (!code && ubik_currentTrans) {
65 /* label this trans with the right trans id */
66 ubik_currentTrans->tid.epoch = atid->epoch;
67 ubik_currentTrans->tid.counter = atid->counter;
68 }
69 out:
70 DBRELE(ubik_dbase);
71 return code;
72 }
73
74
75 afs_int32
76 SDISK_Commit(struct rx_call *rxcall, struct ubik_tid *atid)
77 {
78 afs_int32 code;
79
80 if ((code = ubik_CheckAuth(rxcall))) {
81 return code;
82 }
83 ObtainWriteLock(&ubik_dbase->cache_lock);
84 DBHOLD(ubik_dbase);
85 if (!ubik_currentTrans) {
86 code = USYNC;
87 goto done;
88 }
89 /*
90 * sanity check to make sure only write trans appear here
91 */
92 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
93 code = UBADTYPE;
94 goto done;
95 }
96
97 urecovery_CheckTid(atid, 0);
98 if (!ubik_currentTrans) {
99 code = USYNC;
100 goto done;
101 }
102
103 code = udisk_commit(ubik_currentTrans);
104 if (code == 0) {
105 /* sync site should now match */
106 uvote_set_dbVersion(ubik_dbase->version);
107 }
108 done:
109 DBRELE(ubik_dbase);
110 ReleaseWriteLock(&ubik_dbase->cache_lock);
111 return code;
112 }
113
114 afs_int32
115 SDISK_ReleaseLocks(struct rx_call *rxcall, struct ubik_tid *atid)
116 {
117 afs_int32 code;
118
119 if ((code = ubik_CheckAuth(rxcall))) {
120 return code;
121 }
122
123 DBHOLD(ubik_dbase);
124
125 if (!ubik_currentTrans) {
126 code = USYNC;
127 goto done;
128 }
129 /* sanity check to make sure only write trans appear here */
130 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
131 code = UBADTYPE;
132 goto done;
133 }
134
135 urecovery_CheckTid(atid, 0);
136 if (!ubik_currentTrans) {
137 code = USYNC;
138 goto done;
139 }
140
141 /* If the thread is not waiting for lock - ok to end it */
142 if (ubik_currentTrans->locktype != LOCKWAIT) {
143 udisk_end(ubik_currentTrans);
144 }
145 ubik_currentTrans = (struct ubik_trans *)0;
146 done:
147 DBRELE(ubik_dbase);
148 return code;
149 }
150
151 afs_int32
152 SDISK_Abort(struct rx_call *rxcall, struct ubik_tid *atid)
153 {
154 afs_int32 code;
155
156 if ((code = ubik_CheckAuth(rxcall))) {
157 return code;
158 }
159 DBHOLD(ubik_dbase);
160 if (!ubik_currentTrans) {
161 code = USYNC;
162 goto done;
163 }
164 /* sanity check to make sure only write trans appear here */
165 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
166 code = UBADTYPE;
167 goto done;
168 }
169
170 urecovery_CheckTid(atid, 0);
171 if (!ubik_currentTrans) {
172 code = USYNC;
173 goto done;
174 }
175
176 code = udisk_abort(ubik_currentTrans);
177 /* If the thread is not waiting for lock - ok to end it */
178 if (ubik_currentTrans->locktype != LOCKWAIT) {
179 udisk_end(ubik_currentTrans);
180 }
181 ubik_currentTrans = (struct ubik_trans *)0;
182 done:
183 DBRELE(ubik_dbase);
184 return code;
185 }
186
187 /* apos and alen are not used */
188 afs_int32
189 SDISK_Lock(struct rx_call *rxcall, struct ubik_tid *atid,
190 afs_int32 afile, afs_int32 apos, afs_int32 alen, afs_int32 atype)
191 {
192 afs_int32 code;
193 struct ubik_trans *ubik_thisTrans;
194
195 if ((code = ubik_CheckAuth(rxcall))) {
196 return code;
197 }
198 DBHOLD(ubik_dbase);
199 if (!ubik_currentTrans) {
200 code = USYNC;
201 goto done;
202 }
203 /* sanity check to make sure only write trans appear here */
204 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
205 code = UBADTYPE;
206 goto done;
207 }
208 if (alen != 1) {
209 code = UBADLOCK;
210 goto done;
211 }
212 urecovery_CheckTid(atid, 0);
213 if (!ubik_currentTrans) {
214 code = USYNC;
215 goto done;
216 }
217
218 ubik_thisTrans = ubik_currentTrans;
219 code = ulock_getLock(ubik_currentTrans, atype, 1);
220
221 /* While waiting, the transaction may have been ended/
222 * aborted from under us (urecovery_CheckTid). In that
223 * case, end the transaction here.
224 */
225 if (!code && (ubik_currentTrans != ubik_thisTrans)) {
226 udisk_end(ubik_thisTrans);
227 code = USYNC;
228 }
229 done:
230 DBRELE(ubik_dbase);
231 return code;
232 }
233
234 /*!
235 * \brief Write a vector of data
236 */
237 afs_int32
238 SDISK_WriteV(struct rx_call *rxcall, struct ubik_tid *atid,
239 iovec_wrt *io_vector, iovec_buf *io_buffer)
240 {
241 afs_int32 code, i, offset;
242 struct ubik_iovec *iovec;
243 char *iobuf;
244
245 if ((code = ubik_CheckAuth(rxcall))) {
246 return code;
247 }
248 DBHOLD(ubik_dbase);
249 if (!ubik_currentTrans) {
250 code = USYNC;
251 goto done;
252 }
253 /* sanity check to make sure only write trans appear here */
254 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
255 code = UBADTYPE;
256 goto done;
257 }
258
259 urecovery_CheckTid(atid, 0);
260 if (!ubik_currentTrans) {
261 code = USYNC;
262 goto done;
263 }
264
265 iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
266 iobuf = (char *)io_buffer->iovec_buf_val;
267 for (i = 0, offset = 0; i < io_vector->iovec_wrt_len; i++) {
268 /* Sanity check for going off end of buffer */
269 if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
270 code = UINTERNAL;
271 } else {
272 code =
273 udisk_write(ubik_currentTrans, iovec[i].file, &iobuf[offset],
274 iovec[i].position, iovec[i].length);
275 }
276 if (code)
277 break;
278
279 offset += iovec[i].length;
280 }
281 done:
282 DBRELE(ubik_dbase);
283 return code;
284 }
285
286 afs_int32
287 SDISK_Write(struct rx_call *rxcall, struct ubik_tid *atid,
288 afs_int32 afile, afs_int32 apos, bulkdata *adata)
289 {
290 afs_int32 code;
291
292 if ((code = ubik_CheckAuth(rxcall))) {
293 return code;
294 }
295 DBHOLD(ubik_dbase);
296 if (!ubik_currentTrans) {
297 code = USYNC;
298 goto done;
299 }
300 /* sanity check to make sure only write trans appear here */
301 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
302 code = UBADTYPE;
303 goto done;
304 }
305
306 urecovery_CheckTid(atid, 0);
307 if (!ubik_currentTrans) {
308 code = USYNC;
309 goto done;
310 }
311 code =
312 udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos,
313 adata->bulkdata_len);
314 done:
315 DBRELE(ubik_dbase);
316 return code;
317 }
318
319 afs_int32
320 SDISK_Truncate(struct rx_call *rxcall, struct ubik_tid *atid,
321 afs_int32 afile, afs_int32 alen)
322 {
323 afs_int32 code;
324
325 if ((code = ubik_CheckAuth(rxcall))) {
326 return code;
327 }
328 DBHOLD(ubik_dbase);
329 if (!ubik_currentTrans) {
330 code = USYNC;
331 goto done;
332 }
333 /* sanity check to make sure only write trans appear here */
334 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
335 code = UBADTYPE;
336 goto done;
337 }
338
339 urecovery_CheckTid(atid, 0);
340 if (!ubik_currentTrans) {
341 code = USYNC;
342 goto done;
343 }
344 code = udisk_truncate(ubik_currentTrans, afile, alen);
345 done:
346 DBRELE(ubik_dbase);
347 return code;
348 }
349
350 afs_int32
351 SDISK_GetVersion(struct rx_call *rxcall,
352 struct ubik_version *aversion)
353 {
354 afs_int32 code;
355
356 if ((code = ubik_CheckAuth(rxcall))) {
357 return code;
358 }
359
360 /*
361 * If we are the sync site, recovery shouldn't be running on any
362 * other site. We shouldn't be getting this RPC as long as we are
363 * the sync site. To prevent any unforseen activity, we should
364 * reject this RPC until we have recognized that we are not the
365 * sync site anymore, and/or if we have any pending WRITE
366 * transactions that have to complete. This way we can be assured
367 * that this RPC would not block any pending transactions that
368 * should either fail or pass. If we have recognized the fact that
369 * we are not the sync site any more, all write transactions would
370 * fail with UNOQUORUM anyway.
371 */
372 DBHOLD(ubik_dbase);
373 if (ubeacon_AmSyncSite()) {
374 DBRELE(ubik_dbase);
375 return UDEADLOCK;
376 }
377
378 code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
379 DBRELE(ubik_dbase);
380 if (code) {
381 /* tell other side there's no dbase */
382 aversion->epoch = 0;
383 aversion->counter = 0;
384 }
385 return 0;
386 }
387
388 afs_int32
389 SDISK_GetFile(struct rx_call *rxcall, afs_int32 file,
390 struct ubik_version *version)
391 {
392 afs_int32 code;
393 struct ubik_dbase *dbase;
394 afs_int32 offset;
395 struct ubik_stat ubikstat;
396 char tbuffer[256];
397 afs_int32 tlen;
398 afs_int32 length;
399
400 if ((code = ubik_CheckAuth(rxcall))) {
401 return code;
402 }
403 dbase = ubik_dbase;
404 DBHOLD(dbase);
405 code = (*dbase->stat) (dbase, file, &ubikstat);
406 if (code < 0) {
407 DBRELE(dbase);
408 return code;
409 }
410 length = ubikstat.size;
411 tlen = htonl(length);
412 code = rx_Write(rxcall, (char *)&tlen, sizeof(afs_int32));
413 if (code != sizeof(afs_int32)) {
414 DBRELE(dbase);
415 ViceLog(5, ("Rx-write length error=%d\n", code));
416 return BULK_ERROR;
417 }
418 offset = 0;
419 while (length > 0) {
420 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
421 code = (*dbase->read) (dbase, file, tbuffer, offset, tlen);
422 if (code != tlen) {
423 DBRELE(dbase);
424 ViceLog(5, ("read failed error=%d\n", code));
425 return UIOERROR;
426 }
427 code = rx_Write(rxcall, tbuffer, tlen);
428 if (code != tlen) {
429 DBRELE(dbase);
430 ViceLog(5, ("Rx-write length error=%d\n", code));
431 return BULK_ERROR;
432 }
433 length -= tlen;
434 offset += tlen;
435 }
436 code = (*dbase->getlabel) (dbase, file, version); /* return the dbase, too */
437 DBRELE(dbase);
438 return code;
439 }
440
441 afs_int32
442 SDISK_SendFile(struct rx_call *rxcall, afs_int32 file,
443 afs_int32 length, struct ubik_version *avers)
444 {
445 afs_int32 code;
446 struct ubik_dbase *dbase = NULL;
447 char tbuffer[1024];
448 afs_int32 offset;
449 struct ubik_version tversion;
450 int tlen;
451 struct rx_peer *tpeer;
452 struct rx_connection *tconn;
453 afs_uint32 otherHost = 0;
454 char hoststr[16];
455 char pbuffer[1028];
456 int fd = -1;
457 afs_int32 epoch = 0;
458 afs_int32 pass;
459
460 /* send the file back to the requester */
461
462 dbase = ubik_dbase;
463 pbuffer[0] = '\0';
464
465 if ((code = ubik_CheckAuth(rxcall))) {
466 return code;
467 }
468
469 /* next, we do a sanity check to see if the guy sending us the database is
470 * the guy we think is the sync site. It turns out that we might not have
471 * decided yet that someone's the sync site, but they could have enough
472 * votes from others to be sync site anyway, and could send us the database
473 * in advance of getting our votes. This is fine, what we're really trying
474 * to check is that some authenticated bogon isn't sending a random database
475 * into another configuration. This could happen on a bad configuration
476 * screwup. Thus, we only object if we're sure we know who the sync site
477 * is, and it ain't the guy talking to us.
478 */
479 offset = uvote_GetSyncSite();
480 tconn = rx_ConnectionOf(rxcall);
481 tpeer = rx_PeerOf(tconn);
482 otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
483 if (offset && offset != otherHost) {
484 /* we *know* this is the wrong guy */
485 char sync_hoststr[16];
486 ViceLog(0,
487 ("Ubik: Refusing synchronization with server %s since it is not the sync-site (%s).\n",
488 afs_inet_ntoa_r(otherHost, hoststr),
489 afs_inet_ntoa_r(offset, sync_hoststr)));
490 return USYNC;
491 }
492
493 DBHOLD(dbase);
494
495 /* abort any active trans that may scribble over the database */
496 urecovery_AbortAll(dbase);
497
498 ViceLog(0, ("Ubik: Synchronize database via DISK_SendFile from server %s\n",
499 afs_inet_ntoa_r(otherHost, hoststr)));
500
501 offset = 0;
502 UBIK_VERSION_LOCK;
503 epoch = tversion.epoch = 0; /* start off by labelling in-transit db as invalid */
504 (*dbase->setlabel) (dbase, file, &tversion); /* setlabel does sync */
505 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
506 ubik_dbase->pathName, (file<0)?"SYS":"",
507 (file<0)?-file:file);
508 fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
509 if (fd < 0) {
510 code = errno;
511 goto failed_locked;
512 }
513 code = lseek(fd, HDRSIZE, 0);
514 if (code != HDRSIZE) {
515 close(fd);
516 goto failed_locked;
517 }
518 pass = 0;
519 memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version));
520 UBIK_VERSION_UNLOCK;
521 while (length > 0) {
522 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
523 #if !defined(AFS_PTHREAD_ENV)
524 if (pass % 4 == 0)
525 IOMGR_Poll();
526 #endif
527 code = rx_Read(rxcall, tbuffer, tlen);
528 if (code != tlen) {
529 ViceLog(5, ("Rx-read length error=%d\n", code));
530 code = BULK_ERROR;
531 close(fd);
532 goto failed;
533 }
534 code = write(fd, tbuffer, tlen);
535 pass++;
536 if (code != tlen) {
537 ViceLog(5, ("write failed error=%d\n", code));
538 code = UIOERROR;
539 close(fd);
540 goto failed;
541 }
542 offset += tlen;
543 length -= tlen;
544 }
545 code = close(fd);
546 if (code)
547 goto failed;
548
549 /* sync data first, then write label and resync (resync done by setlabel call).
550 * This way, good label is only on good database. */
551 snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d",
552 ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
553 #ifdef AFS_NT40_ENV
554 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
555 ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
556 code = unlink(pbuffer);
557 if (!code)
558 code = rename(tbuffer, pbuffer);
559 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
560 ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
561 #endif
562 if (!code)
563 code = rename(pbuffer, tbuffer);
564 UBIK_VERSION_LOCK;
565 if (!code) {
566 (*ubik_dbase->open) (ubik_dbase, file);
567 code = (*ubik_dbase->setlabel) (dbase, file, avers);
568 }
569 #ifdef AFS_NT40_ENV
570 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
571 ubik_dbase->pathName, (file<0)?"SYS":"", (file<0)?-file:file);
572 unlink(pbuffer);
573 #endif
574 memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
575 udisk_Invalidate(dbase, file); /* new dbase, flush disk buffers */
576 #ifdef AFS_PTHREAD_ENV
577 opr_Assert(pthread_cond_broadcast(&dbase->version_cond) == 0);
578 #else
579 LWP_NoYieldSignal(&dbase->version);
580 #endif
581
582 failed_locked:
583 UBIK_VERSION_UNLOCK;
584
585 failed:
586 if (code) {
587 if (pbuffer[0] != '\0')
588 unlink(pbuffer);
589
590 /* Failed to sync. Allow reads again for now. */
591 if (dbase != NULL) {
592 UBIK_VERSION_LOCK;
593 tversion.epoch = epoch;
594 (*dbase->setlabel) (dbase, file, &tversion);
595 UBIK_VERSION_UNLOCK;
596 }
597 ViceLog(0, ("Ubik: Synchronize database with server %s failed (error = %d)\n",
598 afs_inet_ntoa_r(otherHost, hoststr), code));
599 } else {
600 uvote_set_dbVersion(*avers);
601 ViceLog(0, ("Ubik: Synchronize database completed\n"));
602 }
603 DBRELE(dbase);
604 return code;
605 }
606
607
608 afs_int32
609 SDISK_Probe(struct rx_call *rxcall)
610 {
611 return 0;
612 }
613
614 /*!
615 * \brief Update remote machines addresses in my server list
616 *
617 * Send back my addresses to caller of this RPC
618 * \return zero on success, else 1.
619 */
620 afs_int32
621 SDISK_UpdateInterfaceAddr(struct rx_call *rxcall,
622 UbikInterfaceAddr *inAddr,
623 UbikInterfaceAddr *outAddr)
624 {
625 struct ubik_server *ts, *tmp;
626 afs_uint32 remoteAddr; /* in net byte order */
627 int i, j, found = 0, probableMatch = 0;
628 char hoststr[16];
629
630 UBIK_ADDR_LOCK;
631 /* copy the output parameters */
632 for (i = 0; i < UBIK_MAX_INTERFACE_ADDR; i++)
633 outAddr->hostAddr[i] = ntohl(ubik_host[i]);
634
635 remoteAddr = htonl(inAddr->hostAddr[0]);
636 for (ts = ubik_servers; ts; ts = ts->next)
637 if (ts->addr[0] == remoteAddr) { /* both in net byte order */
638 probableMatch = 1;
639 break;
640 }
641
642 if (probableMatch) {
643 /* verify that all addresses in the incoming RPC are
644 ** not part of other server entries in my CellServDB
645 */
646 for (i = 0; !found && (i < UBIK_MAX_INTERFACE_ADDR)
647 && inAddr->hostAddr[i]; i++) {
648 remoteAddr = htonl(inAddr->hostAddr[i]);
649 for (tmp = ubik_servers; (!found && tmp); tmp = tmp->next) {
650 if (ts == tmp) /* this is my server */
651 continue;
652 for (j = 0; (j < UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j];
653 j++)
654 if (remoteAddr == tmp->addr[j]) {
655 found = 1;
656 break;
657 }
658 }
659 }
660 }
661
662 /* if (probableMatch) */
663 /* inconsistent addresses in CellServDB */
664 if (!probableMatch || found) {
665 ViceLog(0, ("Inconsistent Cell Info from server:\n"));
666 for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
667 ViceLog(0, ("... %s\n", afs_inet_ntoa_r(htonl(inAddr->hostAddr[i]), hoststr)));
668 fflush(stdout);
669 fflush(stderr);
670 printServerInfo();
671 UBIK_ADDR_UNLOCK;
672 return UBADHOST;
673 }
674
675 /* update our data structures */
676 for (i = 1; i < UBIK_MAX_INTERFACE_ADDR; i++)
677 ts->addr[i] = htonl(inAddr->hostAddr[i]);
678
679 ViceLog(0, ("ubik: A Remote Server has addresses:\n"));
680 for (i = 0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
681 ViceLog(0, ("... %s\n", afs_inet_ntoa_r(ts->addr[i], hoststr)));
682
683 UBIK_ADDR_UNLOCK;
684
685 /*
686 * The most likely cause of a DISK_UpdateInterfaceAddr RPC
687 * is because the server was restarted. Reset its state
688 * so that no DISK_Begin RPCs will be issued until the
689 * known database version is current.
690 */
691 UBIK_BEACON_LOCK;
692 ts->beaconSinceDown = 0;
693 ts->currentDB = 0;
694 urecovery_LostServer(ts);
695 UBIK_BEACON_UNLOCK;
696 return 0;
697 }
698
699 static void
700 printServerInfo(void)
701 {
702 struct ubik_server *ts;
703 int i, j = 1;
704 char hoststr[16];
705
706 ViceLog(0, ("Local CellServDB:\n"));
707 for (ts = ubik_servers; ts; ts = ts->next, j++) {
708 ViceLog(0, (" Server %d:\n", j));
709 for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
710 ViceLog(0, (" ... %s\n", afs_inet_ntoa_r(ts->addr[i], hoststr)));
711 }
712 }
713
714 afs_int32
715 SDISK_SetVersion(struct rx_call *rxcall, struct ubik_tid *atid,
716 struct ubik_version *oldversionp,
717 struct ubik_version *newversionp)
718 {
719 afs_int32 code = 0;
720
721 if ((code = ubik_CheckAuth(rxcall))) {
722 return (code);
723 }
724 DBHOLD(ubik_dbase);
725 if (!ubik_currentTrans) {
726 code = USYNC;
727 goto done;
728 }
729 /* sanity check to make sure only write trans appear here */
730 if (ubik_currentTrans->type != UBIK_WRITETRANS) {
731 code = UBADTYPE;
732 goto done;
733 }
734
735 /* Should not get this for the sync site */
736 if (ubeacon_AmSyncSite()) {
737 code = UDEADLOCK;
738 goto done;
739 }
740
741 urecovery_CheckTid(atid, 0);
742 if (!ubik_currentTrans) {
743 code = USYNC;
744 goto done;
745 }
746
747 /* Set the label if our version matches the sync-site's. Also set the label
748 * if our on-disk version matches the old version, and our view of the
749 * sync-site's version matches the new version. This suggests that
750 * ubik_dbVersion was updated while the sync-site was setting the new
751 * version, and it already told us via VOTE_Beacon. */
752 if (uvote_eq_dbVersion(*oldversionp)
753 || (uvote_eq_dbVersion(*newversionp)
754 && vcmp(ubik_dbase->version, *oldversionp) == 0)) {
755 UBIK_VERSION_LOCK;
756 code = (*ubik_dbase->setlabel) (ubik_dbase, 0, newversionp);
757 if (!code) {
758 ubik_dbase->version = *newversionp;
759 uvote_set_dbVersion(*newversionp);
760 }
761 UBIK_VERSION_UNLOCK;
762 } else {
763 code = USYNC;
764 }
765 done:
766 DBRELE(ubik_dbase);
767 return code;
768 }