2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
18 #ifdef AFS_PTHREAD_ENV
19 # include <opr/lock.h>
21 # include <opr/lockstub.h>
27 #include <afs/afsutil.h>
29 #define UBIK_INTERNALS
33 static void printServerInfo(void);
36 * routines for handling requests remotely-submitted by the sync site. These are
37 * only write transactions (we don't propagate read trans), and there is at most one
38 * write transaction extant at any one time.
/*
 * The remote write transaction currently in progress on this server, or
 * null when there is none.  Within this file it is filled in by
 * udisk_begin() in SDISK_Begin and reset to null in SDISK_ReleaseLocks and
 * SDISK_Abort; every other handler tests it before doing any work.
 */
41 struct ubik_trans
*ubik_currentTrans
= 0;
45 /* the rest of these guys handle remote execution of write
46 * transactions: this is the code executed on the other servers when a
47 * sync site is executing a write transaction.
/*!
 * \brief Start the remote write transaction at the sync site's request.
 *
 * Steps visible in this excerpt: authenticate the caller with
 * ubik_CheckAuth(), test urecovery_AllBetter(ubik_dbase, 0), call
 * urecovery_CheckTid(atid, 1), create the transaction with udisk_begin()
 * (result stored in ubik_currentTrans) and, when that succeeded, stamp the
 * new transaction with the caller's transaction id.
 *
 * \param rxcall  Rx call for this RPC; handed to ubik_CheckAuth().
 * \param atid    transaction id supplied by the caller; its epoch and
 *                counter are copied into the new transaction's tid.
 *
 * NOTE(review): the bodies of the error branches and the return statement
 * are not visible here -- presumably a non-zero code is returned on
 * failure; confirm against the full file.
 */
50 SDISK_Begin(struct rx_call
*rxcall
, struct ubik_tid
*atid
)
54 if ((code
= ubik_CheckAuth(rxcall
))) {
58 if (urecovery_AllBetter(ubik_dbase
, 0) == 0) {
62 urecovery_CheckTid(atid
, 1);
63 code
= udisk_begin(ubik_dbase
, UBIK_WRITETRANS
, &ubik_currentTrans
);
64 if (!code
&& ubik_currentTrans
) {
65 /* label this trans with the right trans id */
66 ubik_currentTrans
->tid
.epoch
= atid
->epoch
;
67 ubik_currentTrans
->tid
.counter
= atid
->counter
;
/*!
 * \brief Commit the current remote write transaction.
 *
 * Steps visible in this excerpt: authenticate the caller, take the write
 * lock on ubik_dbase->cache_lock, verify that ubik_currentTrans exists and
 * is of type UBIK_WRITETRANS, call urecovery_CheckTid(atid, 0), commit with
 * udisk_commit(), publish ubik_dbase->version through
 * uvote_set_dbVersion(), and release the cache lock.
 *
 * \param rxcall  Rx call for this RPC; handed to ubik_CheckAuth().
 * \param atid    transaction id supplied by the caller; checked by
 *                urecovery_CheckTid().
 *
 * NOTE(review): the bodies of the failure branches are not visible here;
 * which error codes they produce must be confirmed against the full file.
 */
76 SDISK_Commit(struct rx_call
*rxcall
, struct ubik_tid
*atid
)
80 if ((code
= ubik_CheckAuth(rxcall
))) {
83 ObtainWriteLock(&ubik_dbase
->cache_lock
);
85 if (!ubik_currentTrans
) {
90 * sanity check to make sure only write trans appear here
92 if (ubik_currentTrans
->type
!= UBIK_WRITETRANS
) {
97 urecovery_CheckTid(atid
, 0);
/* Tested again after urecovery_CheckTid(): the comment in SDISK_Lock says
 * that call may end/abort the transaction from under us. */
98 if (!ubik_currentTrans
) {
103 code
= udisk_commit(ubik_currentTrans
);
105 /* sync site should now match */
106 uvote_set_dbVersion(ubik_dbase
->version
);
110 ReleaseWriteLock(&ubik_dbase
->cache_lock
);
/*!
 * \brief Finish the current remote write transaction and forget it.
 *
 * Steps visible in this excerpt: authenticate the caller, verify that
 * ubik_currentTrans exists and is of type UBIK_WRITETRANS, call
 * urecovery_CheckTid(atid, 0) and re-test ubik_currentTrans, call
 * udisk_end() on the transaction when its locktype is not LOCKWAIT, and
 * reset ubik_currentTrans to null.
 *
 * \param rxcall  Rx call for this RPC; handed to ubik_CheckAuth().
 * \param atid    transaction id supplied by the caller; checked by
 *                urecovery_CheckTid().
 *
 * NOTE(review): the failure-branch bodies and the return statement are not
 * visible in this excerpt.
 */
115 SDISK_ReleaseLocks(struct rx_call
*rxcall
, struct ubik_tid
*atid
)
119 if ((code
= ubik_CheckAuth(rxcall
))) {
125 if (!ubik_currentTrans
) {
129 /* sanity check to make sure only write trans appear here */
130 if (ubik_currentTrans
->type
!= UBIK_WRITETRANS
) {
135 urecovery_CheckTid(atid
, 0);
136 if (!ubik_currentTrans
) {
141 /* If the thread is not waiting for lock - ok to end it */
142 if (ubik_currentTrans
->locktype
!= LOCKWAIT
) {
143 udisk_end(ubik_currentTrans
);
145 ubik_currentTrans
= (struct ubik_trans
*)0;
/*!
 * \brief Abort the current remote write transaction.
 *
 * Steps visible in this excerpt: authenticate the caller, verify that
 * ubik_currentTrans exists and is of type UBIK_WRITETRANS, call
 * urecovery_CheckTid(atid, 0) and re-test ubik_currentTrans, abort with
 * udisk_abort() (its result is kept in code), call udisk_end() when the
 * transaction's locktype is not LOCKWAIT, and reset ubik_currentTrans to
 * null.
 *
 * \param rxcall  Rx call for this RPC; handed to ubik_CheckAuth().
 * \param atid    transaction id supplied by the caller; checked by
 *                urecovery_CheckTid().
 *
 * NOTE(review): the failure-branch bodies and the return statement are not
 * visible in this excerpt.
 */
152 SDISK_Abort(struct rx_call
*rxcall
, struct ubik_tid
*atid
)
156 if ((code
= ubik_CheckAuth(rxcall
))) {
160 if (!ubik_currentTrans
) {
164 /* sanity check to make sure only write trans appear here */
165 if (ubik_currentTrans
->type
!= UBIK_WRITETRANS
) {
170 urecovery_CheckTid(atid
, 0);
171 if (!ubik_currentTrans
) {
176 code
= udisk_abort(ubik_currentTrans
);
177 /* If the thread is not waiting for lock - ok to end it */
178 if (ubik_currentTrans
->locktype
!= LOCKWAIT
) {
179 udisk_end(ubik_currentTrans
);
181 ubik_currentTrans
= (struct ubik_trans
*)0;
187 /* apos and alen are not used */
/*!
 * \brief Acquire a lock for the current remote write transaction.
 *
 * Steps visible in this excerpt: authenticate the caller, verify that
 * ubik_currentTrans exists and is of type UBIK_WRITETRANS, call
 * urecovery_CheckTid(atid, 0) and re-test ubik_currentTrans, remember the
 * transaction pointer in ubik_thisTrans, and request the lock with
 * ulock_getLock(ubik_currentTrans, atype, 1).  If the lock was obtained but
 * ubik_currentTrans no longer equals the remembered pointer, the remembered
 * transaction is ended with udisk_end().
 *
 * \param rxcall  Rx call for this RPC; handed to ubik_CheckAuth().
 * \param atid    transaction id supplied by the caller; checked by
 *                urecovery_CheckTid().
 * \param afile   not referenced in the lines visible here.
 * \param apos    not referenced in the lines visible here.
 * \param alen    not referenced in the lines visible here.
 * \param atype   lock type, passed through to ulock_getLock().
 *
 * NOTE(review): several statements of this function are missing from this
 * excerpt, so the "not referenced" remarks above only describe what can be
 * seen; confirm against the full file.
 */
189 SDISK_Lock(struct rx_call
*rxcall
, struct ubik_tid
*atid
,
190 afs_int32 afile
, afs_int32 apos
, afs_int32 alen
, afs_int32 atype
)
193 struct ubik_trans
*ubik_thisTrans
;
195 if ((code
= ubik_CheckAuth(rxcall
))) {
199 if (!ubik_currentTrans
) {
203 /* sanity check to make sure only write trans appear here */
204 if (ubik_currentTrans
->type
!= UBIK_WRITETRANS
) {
212 urecovery_CheckTid(atid
, 0);
213 if (!ubik_currentTrans
) {
218 ubik_thisTrans
= ubik_currentTrans
;
219 code
= ulock_getLock(ubik_currentTrans
, atype
, 1);
221 /* While waiting, the transaction may have been ended/
222 * aborted from under us (urecovery_CheckTid). In that
223 * case, end the transaction here.
225 if (!code
&& (ubik_currentTrans
!= ubik_thisTrans
)) {
226 udisk_end(ubik_thisTrans
);
235 * \brief Write a vector of data
/*
 * Apply a vector of writes to the current remote write transaction.
 *
 * Steps visible in this excerpt: authenticate the caller, verify that
 * ubik_currentTrans exists and is of type UBIK_WRITETRANS, call
 * urecovery_CheckTid(atid, 0) and re-test ubik_currentTrans.  Then walk the
 * io_vector->iovec_wrt_len entries of io_vector: each entry names a file,
 * a position and a length, and its data is taken from io_buffer starting
 * at a running byte offset that is advanced by the entry's length after
 * each udisk_write().  An entry whose data would extend past
 * io_buffer->iovec_buf_len is caught by the sanity check before writing.
 *
 * rxcall     Rx call for this RPC; handed to ubik_CheckAuth().
 * atid       transaction id supplied by the caller; checked by
 *            urecovery_CheckTid().
 * io_vector  array of (file, position, length) descriptors.
 * io_buffer  the concatenated data for all descriptors.
 *
 * NOTE(review): what happens when the sanity check fires, and whether the
 * result of udisk_write() is examined, is not visible in this excerpt.
 */
238 SDISK_WriteV(struct rx_call
*rxcall
, struct ubik_tid
*atid
,
239 iovec_wrt
*io_vector
, iovec_buf
*io_buffer
)
241 afs_int32 code
, i
, offset
;
242 struct ubik_iovec
*iovec
;
245 if ((code
= ubik_CheckAuth(rxcall
))) {
249 if (!ubik_currentTrans
) {
253 /* sanity check to make sure only write trans appear here */
254 if (ubik_currentTrans
->type
!= UBIK_WRITETRANS
) {
259 urecovery_CheckTid(atid
, 0);
260 if (!ubik_currentTrans
) {
265 iovec
= (struct ubik_iovec
*)io_vector
->iovec_wrt_val
;
266 iobuf
= (char *)io_buffer
->iovec_buf_val
;
267 for (i
= 0, offset
= 0; i
< io_vector
->iovec_wrt_len
; i
++) {
268 /* Sanity check for going off end of buffer */
269 if ((offset
+ iovec
[i
].length
) > io_buffer
->iovec_buf_len
) {
273 udisk_write(ubik_currentTrans
, iovec
[i
].file
, &iobuf
[offset
],
274 iovec
[i
].position
, iovec
[i
].length
);
279 offset
+= iovec
[i
].length
;
/*!
 * \brief Apply one write to the current remote write transaction.
 *
 * Steps visible in this excerpt: authenticate the caller, verify that
 * ubik_currentTrans exists and is of type UBIK_WRITETRANS, call
 * urecovery_CheckTid(atid, 0) and re-test ubik_currentTrans, then hand the
 * data to udisk_write().
 *
 * \param rxcall  Rx call for this RPC; handed to ubik_CheckAuth().
 * \param atid    transaction id supplied by the caller; checked by
 *                urecovery_CheckTid().
 * \param afile   file argument passed to udisk_write().
 * \param apos    position argument passed to udisk_write().
 * \param adata   data to write: bulkdata_val is the buffer and bulkdata_len
 *                its length.
 *
 * NOTE(review): the failure-branch bodies and the return statement are not
 * visible in this excerpt.
 */
287 SDISK_Write(struct rx_call
*rxcall
, struct ubik_tid
*atid
,
288 afs_int32 afile
, afs_int32 apos
, bulkdata
*adata
)
292 if ((code
= ubik_CheckAuth(rxcall
))) {
296 if (!ubik_currentTrans
) {
300 /* sanity check to make sure only write trans appear here */
301 if (ubik_currentTrans
->type
!= UBIK_WRITETRANS
) {
306 urecovery_CheckTid(atid
, 0);
307 if (!ubik_currentTrans
) {
312 udisk_write(ubik_currentTrans
, afile
, adata
->bulkdata_val
, apos
,
313 adata
->bulkdata_len
);
/*!
 * \brief Truncate a file within the current remote write transaction.
 *
 * Steps visible in this excerpt: authenticate the caller, verify that
 * ubik_currentTrans exists and is of type UBIK_WRITETRANS, call
 * urecovery_CheckTid(atid, 0) and re-test ubik_currentTrans, then call
 * udisk_truncate() and keep its result in code.
 *
 * \param rxcall  Rx call for this RPC; handed to ubik_CheckAuth().
 * \param atid    transaction id supplied by the caller; checked by
 *                urecovery_CheckTid().
 * \param afile   file argument passed to udisk_truncate().
 * \param alen    length argument passed to udisk_truncate().
 *
 * NOTE(review): the failure-branch bodies and the return statement are not
 * visible in this excerpt.
 */
320 SDISK_Truncate(struct rx_call
*rxcall
, struct ubik_tid
*atid
,
321 afs_int32 afile
, afs_int32 alen
)
325 if ((code
= ubik_CheckAuth(rxcall
))) {
329 if (!ubik_currentTrans
) {
333 /* sanity check to make sure only write trans appear here */
334 if (ubik_currentTrans
->type
!= UBIK_WRITETRANS
) {
339 urecovery_CheckTid(atid
, 0);
340 if (!ubik_currentTrans
) {
344 code
= udisk_truncate(ubik_currentTrans
, afile
, alen
);
/*!
 * \brief Report this server's database version label to the caller.
 *
 * Steps visible in this excerpt: authenticate the caller, test
 * ubeacon_AmSyncSite() (the in-function comment explains that the RPC is to
 * be rejected while this server is the sync site), then fetch the label
 * with the database's getlabel method for file 0 into *aversion.  A later
 * statement zeroes aversion->counter to tell the caller there is no
 * database.
 *
 * \param rxcall    Rx call for this RPC; handed to ubik_CheckAuth().
 * \param aversion  output; receives the label read by getlabel.
 *
 * NOTE(review): the condition guarding the "no dbase" assignment is not
 * visible here -- presumably it is a failed getlabel; confirm against the
 * full file.
 */
351 SDISK_GetVersion(struct rx_call
*rxcall
,
352 struct ubik_version
*aversion
)
356 if ((code
= ubik_CheckAuth(rxcall
))) {
361 * If we are the sync site, recovery shouldn't be running on any
362 * other site. We shouldn't be getting this RPC as long as we are
363 * the sync site. To prevent any unforseen activity, we should
364 * reject this RPC until we have recognized that we are not the
365 * sync site anymore, and/or if we have any pending WRITE
366 * transactions that have to complete. This way we can be assured
367 * that this RPC would not block any pending transactions that
368 * should either fail or pass. If we have recognized the fact that
369 * we are not the sync site any more, all write transactions would
370 * fail with UNOQUORUM anyway.
373 if (ubeacon_AmSyncSite()) {
378 code
= (*ubik_dbase
->getlabel
) (ubik_dbase
, 0, aversion
);
381 /* tell other side there's no dbase */
383 aversion
->counter
= 0;
/*!
 * \brief Stream a database file to the caller over the Rx call.
 *
 * Steps visible in this excerpt: authenticate the caller, obtain the file
 * size with the database's stat method, send that size first as a 4-byte
 * value converted with htonl(), then send the contents in pieces of at most
 * sizeof(tbuffer) bytes, each piece read with the database's read method
 * and written with rx_Write().  Finally the file's label is fetched with
 * getlabel into *version.  Failures of the individual steps are logged with
 * ViceLog at level 5.
 *
 * \param rxcall   Rx call for this RPC; also the stream the data is
 *                 written to.
 * \param file     file argument passed to the stat, read and getlabel
 *                 methods.
 * \param version  output; receives the label read by getlabel.
 *
 * NOTE(review): the assignment of dbase, the loop header and the error
 * returns are not visible in this excerpt.
 */
389 SDISK_GetFile(struct rx_call
*rxcall
, afs_int32 file
,
390 struct ubik_version
*version
)
393 struct ubik_dbase
*dbase
;
395 struct ubik_stat ubikstat
;
400 if ((code
= ubik_CheckAuth(rxcall
))) {
405 code
= (*dbase
->stat
) (dbase
, file
, &ubikstat
);
410 length
= ubikstat
.size
;
411 tlen
= htonl(length
);
412 code
= rx_Write(rxcall
, (char *)&tlen
, sizeof(afs_int32
));
413 if (code
!= sizeof(afs_int32
)) {
415 ViceLog(5, ("Rx-write length error=%d\n", code
));
420 tlen
= (length
> sizeof(tbuffer
) ? sizeof(tbuffer
) : length
);
421 code
= (*dbase
->read
) (dbase
, file
, tbuffer
, offset
, tlen
);
424 ViceLog(5, ("read failed error=%d\n", code
));
427 code
= rx_Write(rxcall
, tbuffer
, tlen
);
430 ViceLog(5, ("Rx-write length error=%d\n", code
));
436 code
= (*dbase
->getlabel
) (dbase
, file
, version
); /* return the dbase, too */
/*!
 * \brief Receive a complete database file from the caller and install it.
 *
 * Steps visible in this excerpt, in order:
 *  - authenticate the caller with ubik_CheckAuth();
 *  - compare the sender's primary address with uvote_GetSyncSite() and log
 *    a refusal when a sync site is known and it is a different host;
 *  - abort active transactions with urecovery_AbortAll();
 *  - label the database with epoch 0 (marking the in-transit database as
 *    invalid) via setlabel;
 *  - create "<pathName>.DB[SYS]<n>.TMP" with mode 0600, seek to HDRSIZE and
 *    copy the incoming data into it in pieces of at most sizeof(tbuffer)
 *    bytes (rx_Read() followed by write());
 *  - unlink the ".OLD" file, rename the live file to ".OLD" and rename the
 *    ".TMP" file to the live name;
 *  - reopen the file, set its label to *avers, copy *avers into
 *    ubik_dbase->version, invalidate disk buffers with udisk_Invalidate()
 *    and wake waiters on the version (condition broadcast under
 *    AFS_PTHREAD_ENV, LWP_NoYieldSignal otherwise);
 *  - on failure, write a label whose epoch is the saved `epoch` value and
 *    log the failure; on success, call uvote_set_dbVersion(*avers) and log
 *    completion.
 *
 * \param rxcall  Rx call for this RPC; also the stream the data is read
 *                from.
 * \param file    database file number; negative values select the "SYS"
 *                form of the file name with the number negated.
 * \param length  byte count used to size each transfer piece.
 * \param avers   version label to install once the transfer succeeded.
 *
 * NOTE(review): in the lines visible here `epoch` is only ever assigned 0
 * (together with tversion.epoch), so the "allow reads again" label written
 * on failure would also carry epoch 0 -- confirm against the full file
 * whether the original epoch is saved somewhere not shown.
 * NOTE(review): dbase is initialised to NULL and its later assignment is
 * not visible; the code mixes ubik_dbase and dbase (e.g. the setlabel
 * method is fetched from ubik_dbase but called on dbase), which is only
 * equivalent if both name the same object -- confirm.
 */
442 SDISK_SendFile(struct rx_call
*rxcall
, afs_int32 file
,
443 afs_int32 length
, struct ubik_version
*avers
)
446 struct ubik_dbase
*dbase
= NULL
;
449 struct ubik_version tversion
;
451 struct rx_peer
*tpeer
;
452 struct rx_connection
*tconn
;
453 afs_uint32 otherHost
= 0;
460 /* send the file back to the requester */
465 if ((code
= ubik_CheckAuth(rxcall
))) {
469 /* next, we do a sanity check to see if the guy sending us the database is
470 * the guy we think is the sync site. It turns out that we might not have
471 * decided yet that someone's the sync site, but they could have enough
472 * votes from others to be sync site anyway, and could send us the database
473 * in advance of getting our votes. This is fine, what we're really trying
474 * to check is that some authenticated bogon isn't sending a random database
475 * into another configuration. This could happen on a bad configuration
476 * screwup. Thus, we only object if we're sure we know who the sync site
477 * is, and it ain't the guy talking to us.
479 offset
= uvote_GetSyncSite();
480 tconn
= rx_ConnectionOf(rxcall
);
481 tpeer
= rx_PeerOf(tconn
);
482 otherHost
= ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer
));
483 if (offset
&& offset
!= otherHost
) {
484 /* we *know* this is the wrong guy */
485 char sync_hoststr
[16];
487 ("Ubik: Refusing synchronization with server %s since it is not the sync-site (%s).\n",
488 afs_inet_ntoa_r(otherHost
, hoststr
),
489 afs_inet_ntoa_r(offset
, sync_hoststr
)));
495 /* abort any active trans that may scribble over the database */
496 urecovery_AbortAll(dbase
);
498 ViceLog(0, ("Ubik: Synchronize database via DISK_SendFile from server %s\n",
499 afs_inet_ntoa_r(otherHost
, hoststr
)));
503 epoch
= tversion
.epoch
= 0; /* start off by labelling in-transit db as invalid */
504 (*dbase
->setlabel
) (dbase
, file
, &tversion
); /* setlabel does sync */
505 snprintf(pbuffer
, sizeof(pbuffer
), "%s.DB%s%d.TMP",
506 ubik_dbase
->pathName
, (file
<0)?"SYS":"",
507 (file
<0)?-file
:file
);
508 fd
= open(pbuffer
, O_CREAT
| O_RDWR
| O_TRUNC
, 0600);
513 code
= lseek(fd
, HDRSIZE
, 0);
514 if (code
!= HDRSIZE
) {
519 memcpy(&ubik_dbase
->version
, &tversion
, sizeof(struct ubik_version
));
522 tlen
= (length
> sizeof(tbuffer
) ? sizeof(tbuffer
) : length
);
523 #if !defined(AFS_PTHREAD_ENV)
527 code
= rx_Read(rxcall
, tbuffer
, tlen
);
529 ViceLog(5, ("Rx-read length error=%d\n", code
));
534 code
= write(fd
, tbuffer
, tlen
);
537 ViceLog(5, ("write failed error=%d\n", code
));
549 /* sync data first, then write label and resync (resync done by setlabel call).
550 * This way, good label is only on good database. */
551 snprintf(tbuffer
, sizeof(tbuffer
), "%s.DB%s%d",
552 ubik_dbase
->pathName
, (file
<0)?"SYS":"", (file
<0)?-file
:file
);
554 snprintf(pbuffer
, sizeof(pbuffer
), "%s.DB%s%d.OLD",
555 ubik_dbase
->pathName
, (file
<0)?"SYS":"", (file
<0)?-file
:file
);
556 code
= unlink(pbuffer
);
558 code
= rename(tbuffer
, pbuffer
);
559 snprintf(pbuffer
, sizeof(pbuffer
), "%s.DB%s%d.TMP",
560 ubik_dbase
->pathName
, (file
<0)?"SYS":"", (file
<0)?-file
:file
);
563 code
= rename(pbuffer
, tbuffer
);
566 (*ubik_dbase
->open
) (ubik_dbase
, file
);
567 code
= (*ubik_dbase
->setlabel
) (dbase
, file
, avers
);
570 snprintf(pbuffer
, sizeof(pbuffer
), "%s.DB%s%d.OLD",
571 ubik_dbase
->pathName
, (file
<0)?"SYS":"", (file
<0)?-file
:file
);
574 memcpy(&ubik_dbase
->version
, avers
, sizeof(struct ubik_version
));
575 udisk_Invalidate(dbase
, file
); /* new dbase, flush disk buffers */
576 #ifdef AFS_PTHREAD_ENV
577 opr_Assert(pthread_cond_broadcast(&dbase
->version_cond
) == 0);
579 LWP_NoYieldSignal(&dbase
->version
);
587 if (pbuffer
[0] != '\0')
590 /* Failed to sync. Allow reads again for now. */
593 tversion
.epoch
= epoch
;
594 (*dbase
->setlabel
) (dbase
, file
, &tversion
);
597 ViceLog(0, ("Ubik: Synchronize database with server %s failed (error = %d)\n",
598 afs_inet_ntoa_r(otherHost
, hoststr
), code
));
600 uvote_set_dbVersion(*avers
);
601 ViceLog(0, ("Ubik: Synchronize database completed\n"));
/*
 * Handler for the DISK probe RPC; its only parameter is the Rx call.
 * NOTE(review): the body is not visible in this excerpt, so nothing is
 * stated here about what it checks or returns.
 */
609 SDISK_Probe(struct rx_call
*rxcall
)
615 * \brief Update remote machines addresses in my server list
617 * Send back my addresses to caller of this RPC
618 * \return zero on success, else 1.
/*
 * Steps visible in this excerpt:
 *  - fill outAddr->hostAddr[] from ubik_host[], converting each entry with
 *    ntohl();
 *  - convert inAddr->hostAddr[0] with htonl() and search ubik_servers for
 *    the entry whose addr[0] equals it (that entry is `ts`);
 *  - for every non-zero incoming address, scan all other server entries
 *    (the entry `ts` itself is skipped) for an equal address;
 *  - when there was no probable match or a clash was found, log
 *    "Inconsistent Cell Info from server:" followed by the incoming
 *    addresses;
 *  - otherwise store the incoming addresses 1..UBIK_MAX_INTERFACE_ADDR-1
 *    into ts->addr[] (converted with htonl()), log the resulting address
 *    list, clear ts->beaconSinceDown and call urecovery_LostServer(ts).
 *
 * rxcall   Rx call for this RPC; not referenced in the lines visible here.
 * inAddr   the caller's interface addresses; converted with htonl() before
 *          being compared with or stored into ts->addr[].
 * outAddr  output; receives this server's addresses.
 *
 * NOTE(review): the statements that set probableMatch and found, and the
 * return statements, are not visible in this excerpt.
 */
621 SDISK_UpdateInterfaceAddr(struct rx_call
*rxcall
,
622 UbikInterfaceAddr
*inAddr
,
623 UbikInterfaceAddr
*outAddr
)
625 struct ubik_server
*ts
, *tmp
;
626 afs_uint32 remoteAddr
; /* in net byte order */
627 int i
, j
, found
= 0, probableMatch
= 0;
631 /* copy the output parameters */
632 for (i
= 0; i
< UBIK_MAX_INTERFACE_ADDR
; i
++)
633 outAddr
->hostAddr
[i
] = ntohl(ubik_host
[i
]);
635 remoteAddr
= htonl(inAddr
->hostAddr
[0]);
636 for (ts
= ubik_servers
; ts
; ts
= ts
->next
)
637 if (ts
->addr
[0] == remoteAddr
) { /* both in net byte order */
643 /* verify that all addresses in the incoming RPC are
644 ** not part of other server entries in my CellServDB
646 for (i
= 0; !found
&& (i
< UBIK_MAX_INTERFACE_ADDR
)
647 && inAddr
->hostAddr
[i
]; i
++) {
648 remoteAddr
= htonl(inAddr
->hostAddr
[i
]);
649 for (tmp
= ubik_servers
; (!found
&& tmp
); tmp
= tmp
->next
) {
650 if (ts
== tmp
) /* this is my server */
652 for (j
= 0; (j
< UBIK_MAX_INTERFACE_ADDR
) && tmp
->addr
[j
];
654 if (remoteAddr
== tmp
->addr
[j
]) {
662 /* if (probableMatch) */
663 /* inconsistent addresses in CellServDB */
664 if (!probableMatch
|| found
) {
665 ViceLog(0, ("Inconsistent Cell Info from server:\n"));
666 for (i
= 0; i
< UBIK_MAX_INTERFACE_ADDR
&& inAddr
->hostAddr
[i
]; i
++)
667 ViceLog(0, ("... %s\n", afs_inet_ntoa_r(htonl(inAddr
->hostAddr
[i
]), hoststr
)));
675 /* update our data structures */
676 for (i
= 1; i
< UBIK_MAX_INTERFACE_ADDR
; i
++)
677 ts
->addr
[i
] = htonl(inAddr
->hostAddr
[i
]);
679 ViceLog(0, ("ubik: A Remote Server has addresses:\n"));
680 for (i
= 0; i
< UBIK_MAX_INTERFACE_ADDR
&& ts
->addr
[i
]; i
++)
681 ViceLog(0, ("... %s\n", afs_inet_ntoa_r(ts
->addr
[i
], hoststr
)));
686 * The most likely cause of a DISK_UpdateInterfaceAddr RPC
687 * is because the server was restarted. Reset its state
688 * so that no DISK_Begin RPCs will be issued until the
689 * known database version is current.
692 ts
->beaconSinceDown
= 0;
694 urecovery_LostServer(ts
);
/*!
 * \brief Log the local server list at ViceLog level 0.
 *
 * Writes the heading "Local CellServDB:", then walks ubik_servers and, for
 * each entry, logs a numbered " Server <j>:" line followed by one line per
 * address, stopping at the first zero address or after
 * UBIK_MAX_INTERFACE_ADDR entries.  Takes no parameters.
 *
 * NOTE(review): the declarations and initial values of i, j and hoststr
 * are not visible in this excerpt.
 */
700 printServerInfo(void)
702 struct ubik_server
*ts
;
706 ViceLog(0, ("Local CellServDB:\n"));
707 for (ts
= ubik_servers
; ts
; ts
= ts
->next
, j
++) {
708 ViceLog(0, (" Server %d:\n", j
));
709 for (i
= 0; (i
< UBIK_MAX_INTERFACE_ADDR
) && ts
->addr
[i
]; i
++)
710 ViceLog(0, (" ... %s\n", afs_inet_ntoa_r(ts
->addr
[i
], hoststr
)));
/*!
 * \brief Relabel the local database with a new version inside the current
 *        remote write transaction.
 *
 * Steps visible in this excerpt: authenticate the caller, verify that
 * ubik_currentTrans exists and is of type UBIK_WRITETRANS, test
 * ubeacon_AmSyncSite() (per the in-function comment this RPC should not
 * arrive at the sync site), call urecovery_CheckTid(atid, 0) and re-test
 * ubik_currentTrans.  The label is then written with setlabel for file 0,
 * ubik_dbase->version is set to *newversionp and uvote_set_dbVersion() is
 * called, provided either uvote_eq_dbVersion(*oldversionp) holds, or
 * uvote_eq_dbVersion(*newversionp) holds and ubik_dbase->version compares
 * equal to *oldversionp under vcmp().
 *
 * \param rxcall       Rx call for this RPC; handed to ubik_CheckAuth().
 * \param atid         transaction id supplied by the caller; checked by
 *                     urecovery_CheckTid().
 * \param oldversionp  version the caller expects this server to hold.
 * \param newversionp  version to install.
 *
 * NOTE(review): the failure-branch bodies, any check of the setlabel
 * result, and the end of the function are not visible in this excerpt.
 */
715 SDISK_SetVersion(struct rx_call
*rxcall
, struct ubik_tid
*atid
,
716 struct ubik_version
*oldversionp
,
717 struct ubik_version
*newversionp
)
721 if ((code
= ubik_CheckAuth(rxcall
))) {
725 if (!ubik_currentTrans
) {
729 /* sanity check to make sure only write trans appear here */
730 if (ubik_currentTrans
->type
!= UBIK_WRITETRANS
) {
735 /* Should not get this for the sync site */
736 if (ubeacon_AmSyncSite()) {
741 urecovery_CheckTid(atid
, 0);
742 if (!ubik_currentTrans
) {
747 /* Set the label if our version matches the sync-site's. Also set the label
748 * if our on-disk version matches the old version, and our view of the
749 * sync-site's version matches the new version. This suggests that
750 * ubik_dbVersion was updated while the sync-site was setting the new
751 * version, and it already told us via VOTE_Beacon. */
752 if (uvote_eq_dbVersion(*oldversionp
)
753 || (uvote_eq_dbVersion(*newversionp
)
754 && vcmp(ubik_dbase
->version
, *oldversionp
) == 0)) {
756 code
= (*ubik_dbase
->setlabel
) (ubik_dbase
, 0, newversionp
);
758 ubik_dbase
->version
= *newversionp
;
759 uvote_set_dbVersion(*newversionp
);