2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
16 #ifdef AFS_PTHREAD_ENV
17 # include <opr/lock.h>
19 # include <opr/lockstub.h>
21 #include <afs/afsutil.h>
23 #define UBIK_INTERNALS
/* One slot of the in-memory page cache: chained into a phTable hash bucket and linked into the circular LRU ring. */
28 static struct buffer
{
29 struct ubik_dbase
*dbase
; /*!< dbase within which the buffer resides */
30 afs_int32 file
; /*!< file id; matched together with page and dbase on lookup */
31 afs_int32 page
; /*!< page number */
32 struct buffer
*lru_next
; /*!< next buffer in the circular LRU ring */
33 struct buffer
*lru_prev
; /*!< previous buffer in the circular LRU ring */
34 struct buffer
*hashNext
; /*!< next buffer chained in the same phTable bucket */
35 char *data
; /*!< ptr to the page data for this buffer (inside BufferData) */
36 char lockers
; /*!< usage ref count */
37 char dirty
; /*!< is buffer modified */
38 char hashIndex
; /*!< index of the phTable bucket holding this buffer */
41 #define pHash(page) ((page) & (PHSIZE-1))
43 afs_int32 ubik_nBuffers
= NBUFFERS
;
44 static struct buffer
*phTable
[PHSIZE
]; /*!< page hash table */
45 static struct buffer
*LruBuffer
;
47 static int calls
= 0, ios
= 0, lastb
= 0;
48 static char *BufferData
;
49 static struct buffer
*newslot(struct ubik_dbase
*adbase
, afs_int32 afid
,
51 #define BADFID 0xffffffff
53 static int DTrunc(struct ubik_trans
*atrans
, afs_int32 fid
, afs_int32 length
);
55 static struct ubik_trunc
*freeTruncList
= 0;
58 * \brief Remove a transaction from the database's active transaction list. Don't free it.
61 unthread(struct ubik_trans
*atrans
)
63 struct ubik_trans
**lt
, *tt
;
64 lt
= &atrans
->dbase
->activeTrans
;
65 for (tt
= *lt
; tt
; lt
= &tt
->next
, tt
= *lt
) {
72 return 2; /* no entry */
76 * \brief some debugging assistance
79 udisk_Debug(struct ubik_debug
*aparm
)
84 memcpy(&aparm
->localVersion
, &ubik_dbase
->version
,
85 sizeof(struct ubik_version
));
86 aparm
->lockedPages
= 0;
87 aparm
->writeLockedPages
= 0;
89 for (i
= 0; i
< nbuffers
; i
++, tb
++) {
93 aparm
->writeLockedPages
++;
99 * \brief Write an opcode to the log.
101 * log format is defined here, and implicitly in recovery.c
103 * 4 byte opcode, followed by parameters, each 4 bytes long. All integers
104 * are logged in network standard byte order, in case we want to move logs
105 * from machine-to-machine someday.
107 * Begin transaction: opcode \n
108 * Commit transaction: opcode, version (8 bytes) \n
109 * Truncate file: opcode, file number, length \n
110 * Abort transaction: opcode \n
111 * Write data: opcode, file, position, length, <length> data bytes \n
114 udisk_LogOpcode(struct ubik_dbase
*adbase
, afs_int32 aopcode
, int async
)
118 /* setup data and do write */
119 aopcode
= htonl(aopcode
);
120 code
= (*adbase
->buffered_append
)(adbase
, LOGFILE
, &aopcode
, sizeof(afs_int32
));
121 if (code
!= sizeof(afs_int32
))
124 /* optionally sync data */
126 code
= (*adbase
->sync
) (adbase
, LOGFILE
);
133 * \brief Log a commit, never syncing.
136 udisk_LogEnd(struct ubik_dbase
*adbase
, struct ubik_version
*aversion
)
142 data
[0] = htonl(LOGEND
);
143 data
[1] = htonl(aversion
->epoch
);
144 data
[2] = htonl(aversion
->counter
);
148 (*adbase
->buffered_append
)(adbase
, LOGFILE
, data
, 3 * sizeof(afs_int32
));
149 if (code
!= 3 * sizeof(afs_int32
))
152 /* finally sync the log */
153 code
= (*adbase
->sync
) (adbase
, LOGFILE
);
158 * \brief Log a truncate operation, never syncing.
161 udisk_LogTruncate(struct ubik_dbase
*adbase
, afs_int32 afile
,
168 data
[0] = htonl(LOGTRUNCATE
);
169 data
[1] = htonl(afile
);
170 data
[2] = htonl(alength
);
174 (*adbase
->buffered_append
)(adbase
, LOGFILE
, data
, 3 * sizeof(afs_int32
));
175 if (code
!= 3 * sizeof(afs_int32
))
181 * \brief Write some data to the log, never syncing.
184 udisk_LogWriteData(struct ubik_dbase
*adbase
, afs_int32 afile
, void *abuffer
,
185 afs_int32 apos
, afs_int32 alen
)
191 data
[0] = htonl(LOGDATA
);
192 data
[1] = htonl(afile
);
193 data
[2] = htonl(apos
);
194 data
[3] = htonl(alen
);
198 (*adbase
->buffered_append
)(adbase
, LOGFILE
, data
, 4 * sizeof(afs_int32
));
199 if (code
!= 4 * sizeof(afs_int32
))
203 code
= (*adbase
->buffered_append
)(adbase
, LOGFILE
, abuffer
, alen
);
/* Allocate abuffers cache slots plus their page storage and link the slots into the LRU ring. */
210 udisk_Init(int abuffers
)
212 /* Initialize the ubik buffer cache. */
/* One struct buffer header and one UBIK_PAGESIZE data page per slot.
 * NOTE(review): no check of the calloc/malloc results is visible in this
 * excerpt - confirm. */
215 Buffers
= calloc(abuffers
, sizeof(struct buffer
));
216 BufferData
= malloc(abuffers
* UBIK_PAGESIZE
);
218 for (i
= 0; i
< PHSIZE
; i
++)
220 for (i
= 0; i
< abuffers
; i
++) {
221 /* Fill in each buffer with an empty indication. */
/* Provisional ring links: for i == 0 lru_prev is the address of Buffers[-1],
 * and for the last slot lru_next is one past the end; both links are
 * overwritten after the loop. */
223 tb
->lru_next
= &(Buffers
[i
+ 1]);
224 tb
->lru_prev
= &(Buffers
[i
- 1]);
/* Slot i owns the i-th UBIK_PAGESIZE chunk of BufferData. */
225 tb
->data
= &BufferData
[UBIK_PAGESIZE
* i
];
/* Close the ring: the first and last slots point at each other. */
228 Buffers
[0].lru_prev
= &(Buffers
[abuffers
- 1]);
229 Buffers
[abuffers
- 1].lru_next
= &(Buffers
[0]);
/* The ring head starts at slot 0. */
230 LruBuffer
= &(Buffers
[0]);
235 * \brief Take a buffer and mark it as the least recently used buffer.
238 Dlru(struct buffer
*abuf
)
240 if (LruBuffer
== abuf
)
243 /* Unthread from where it is in the list */
244 abuf
->lru_next
->lru_prev
= abuf
->lru_prev
;
245 abuf
->lru_prev
->lru_next
= abuf
->lru_next
;
247 /* Thread onto beginning of LRU list */
248 abuf
->lru_next
= LruBuffer
;
249 abuf
->lru_prev
= LruBuffer
->lru_prev
;
251 LruBuffer
->lru_prev
->lru_next
= abuf
;
252 LruBuffer
->lru_prev
= abuf
;
257 * \brief Take a buffer and mark it as the most recently used buffer.
260 Dmru(struct buffer
*abuf
)
262 if (LruBuffer
== abuf
) {
263 LruBuffer
= LruBuffer
->lru_next
;
267 /* Unthread from where it is in the list */
268 abuf
->lru_next
->lru_prev
= abuf
->lru_prev
;
269 abuf
->lru_prev
->lru_next
= abuf
->lru_next
;
271 /* Thread onto end of LRU list - making it the MRU buffer */
272 abuf
->lru_next
= LruBuffer
;
273 abuf
->lru_prev
= LruBuffer
->lru_prev
;
274 LruBuffer
->lru_prev
->lru_next
= abuf
;
275 LruBuffer
->lru_prev
= abuf
;
/* Test whether buf can serve page `page` of file `fid` for transaction atrans.
 * The visible checks compare the page number, the file id and the dbase, and
 * single out dirty buffers when atrans is a read transaction.
 * NOTE(review): the return statements are not visible in this excerpt;
 * DRead treats a nonzero result as a match. */
279 MatchBuffer(struct buffer
*buf
, int page
, afs_int32 fid
,
280 struct ubik_trans
*atrans
)
282 if (buf
->page
!= page
) {
285 if (buf
->file
!= fid
) {
288 if (atrans
->type
== UBIK_READTRANS
&& buf
->dirty
) {
289 /* if 'buf' is dirty, it has uncommitted changes; we do not want to
290 * see uncommitted changes if we are a read transaction, so skip over
294 if (buf
->dbase
!= atrans
->dbase
) {
301 * \brief Get a pointer to a particular buffer.
304 DRead(struct ubik_trans
*atrans
, afs_int32 fid
, int page
)
306 /* Read a page from the disk. */
307 struct buffer
*tb
, *lastbuffer
, *found_tb
= NULL
;
309 struct ubik_dbase
*dbase
= atrans
->dbase
;
312 lastbuffer
= LruBuffer
->lru_prev
;
314 /* Skip for write transactions for a clean page - this may not be the right page to use */
315 if (MatchBuffer(lastbuffer
, page
, fid
, atrans
)
316 && (atrans
->type
== UBIK_READTRANS
|| lastbuffer
->dirty
)) {
322 for (tb
= phTable
[pHash(page
)]; tb
; tb
= tb
->hashNext
) {
323 if (MatchBuffer(tb
, page
, fid
, atrans
)) {
324 if (tb
->dirty
|| atrans
->type
== UBIK_READTRANS
) {
328 /* Remember this clean page - we might use it */
332 /* For a write transaction, use a matching clean page if no dirty one was found */
336 return found_tb
->data
;
340 tb
= newslot(dbase
, fid
, page
);
343 memset(tb
->data
, 0, UBIK_PAGESIZE
);
347 (*dbase
->read
) (dbase
, fid
, tb
->data
, page
* UBIK_PAGESIZE
,
353 ViceLog(0, ("Ubik: Error reading database file: errno=%d\n", errno
));
358 /* Note that newslot sets the page field in the buffer equal to
359 * what it is searching for.
365 * \brief Zap truncated pages.
368 DTrunc(struct ubik_trans
*atrans
, afs_int32 fid
, afs_int32 length
)
373 struct ubik_dbase
*dbase
= atrans
->dbase
;
375 maxPage
= (length
+ UBIK_PAGESIZE
- 1) >> UBIK_LOGPAGESIZE
; /* first invalid page now in file */
376 for (i
= 0, tb
= Buffers
; i
< nbuffers
; i
++, tb
++) {
377 if (tb
->page
>= maxPage
&& tb
->file
== fid
&& tb
->dbase
== dbase
) {
386 * \brief Allocate a truncation entry.
388 * We allocate special entries representing truncations, rather than
389 * performing them immediately, so that we can abort a transaction easily by simply purging
390 * the in-core memory buffers and discarding these truncation entries.
392 static struct ubik_trunc
*
395 struct ubik_trunc
*tt
;
396 if (!freeTruncList
) {
397 freeTruncList
= malloc(sizeof(struct ubik_trunc
));
398 freeTruncList
->next
= (struct ubik_trunc
*)0;
401 freeTruncList
= tt
->next
;
406 * \brief Free a truncation entry.
409 PutTrunc(struct ubik_trunc
*at
)
411 at
->next
= freeTruncList
;
417 * \brief Find a truncation entry for a file, if any.
419 static struct ubik_trunc
*
420 FindTrunc(struct ubik_trans
*atrans
, afs_int32 afile
)
422 struct ubik_trunc
*tt
;
423 for (tt
= atrans
->activeTruncs
; tt
; tt
= tt
->next
) {
424 if (tt
->file
== afile
)
427 return (struct ubik_trunc
*)0;
431 * \brief Do truncates associated with \p atrans, and free them.
434 DoTruncs(struct ubik_trans
*atrans
)
436 struct ubik_trunc
*tt
, *nt
;
437 int (*tproc
) (struct ubik_dbase
*, afs_int32
, afs_int32
);
438 afs_int32 rcode
= 0, code
;
440 tproc
= atrans
->dbase
->truncate
;
441 for (tt
= atrans
->activeTruncs
; tt
; tt
= nt
) {
443 DTrunc(atrans
, tt
->file
, tt
->length
); /* zap pages from buffer cache */
444 code
= (*tproc
) (atrans
->dbase
, tt
->file
, tt
->length
);
449 /* don't unthread, because we do the entire list's worth here */
450 atrans
->activeTruncs
= (struct ubik_trunc
*)0;
455 * \brief Mark an \p fid as invalid.
458 udisk_Invalidate(struct ubik_dbase
*adbase
, afs_int32 afid
)
463 for (i
= 0, tb
= Buffers
; i
< nbuffers
; i
++, tb
++) {
464 if (tb
->file
== afid
) {
473 * \brief Move this page into the correct hash bucket.
476 FixupBucket(struct buffer
*ap
)
478 struct buffer
**lp
, *tp
;
480 /* first try to get it out of its current hash bucket, in which it might not be */
483 for (tp
= *lp
; tp
; tp
= tp
->hashNext
) {
490 /* now figure the new hash bucket */
492 ap
->hashIndex
= i
; /* remember where we are for deletion */
493 ap
->hashNext
= phTable
[i
]; /* add us to the list */
499 * \brief Create a new slot for a particular dbase page.
501 static struct buffer
*
502 newslot(struct ubik_dbase
*adbase
, afs_int32 afid
, afs_int32 apage
)
504 /* Find a usable buffer slot */
506 struct buffer
*pp
, *tp
;
508 pp
= 0; /* last pure */
509 for (i
= 0, tp
= LruBuffer
; i
< nbuffers
; i
++, tp
= tp
->lru_next
) {
510 if (!tp
->lockers
&& !tp
->dirty
) {
517 /* There are no unlocked buffers that don't need to be written to the disk. */
518 ViceLog(0, ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n"));
522 /* Now fill in the header. */
527 FixupBucket(pp
); /* move to the right hash bucket */
533 * \brief Release a buffer, specifying whether or not the buffer has been modified by the locker.
/* Release the buffer whose data pointer is ap.
 * flag: udisk_write passes 1 for "buffer modified"; how flag is applied is
 * not visible in this excerpt. */
536 DRelease(char *ap
, int flag
)
/* Map the data pointer back to its slot: udisk_Init gives slot i the data
 * address BufferData + i * UBIK_PAGESIZE, so the byte offset shifted right
 * by UBIK_LOGPAGESIZE is the slot index (presumably UBIK_PAGESIZE ==
 * 1 << UBIK_LOGPAGESIZE - confirm). */
543 index
= (int)(ap
- (char *)BufferData
) >> UBIK_LOGPAGESIZE
;
544 bp
= &(Buffers
[index
]);
552 * \brief Flush all modified buffers, leaves dirty bits set (they're cleared
555 * \note Note interaction with DSync(): you call this thing first,
556 * writing the buffers to the disk. Then you call DSync() to sync all the
557 * files that were written, and to clear the dirty bits. You should
558 * always call DFlush/DSync as a pair.
561 DFlush(struct ubik_trans
*atrans
)
566 struct ubik_dbase
*adbase
= atrans
->dbase
;
569 for (i
= 0; i
< nbuffers
; i
++, tb
++) {
571 code
= tb
->page
* UBIK_PAGESIZE
; /* offset within file */
573 (*adbase
->write
) (adbase
, tb
->file
, tb
->data
, code
,
575 if (code
!= UBIK_PAGESIZE
)
583 * \brief Discard all modified (dirty) buffers; used when aborting a transaction.
586 DAbort(struct ubik_trans
*atrans
)
592 for (i
= 0; i
< nbuffers
; i
++, tb
++) {
603 * Invalidate any buffers that are duplicates of abuf. Duplicate buffers
604 * can appear if a read transaction reads a page that is dirty, then that
605 * dirty page is synced. The read transaction will skip over the dirty page,
606 * and create a new buffer, and when the dirty page is synced, it will be
607 * identical (except for contents) to the read-transaction buffer.
610 DedupBuffer(struct buffer
*abuf
)
613 for (tb
= phTable
[pHash(abuf
->page
)]; tb
; tb
= tb
->hashNext
) {
614 if (tb
->page
== abuf
->page
&& tb
!= abuf
&& tb
->file
== abuf
->file
615 && tb
->dbase
== abuf
->dbase
) {
624 * \attention DSync() must only be called after DFlush(), due to its interpretation of dirty flag.
627 DSync(struct ubik_trans
*atrans
)
634 struct ubik_dbase
*adbase
= atrans
->dbase
;
639 for (i
= 0, tb
= Buffers
; i
< nbuffers
; i
++, tb
++) {
640 if (tb
->dirty
== 1) {
643 if (file
!= BADFID
&& tb
->file
== file
) {
651 /* otherwise we have a file to sync */
652 code
= (*adbase
->sync
) (adbase
, file
);
660 * \brief Same as DRead(), only do not even try to read the page.
663 DNew(struct ubik_trans
*atrans
, afs_int32 fid
, int page
)
666 struct ubik_dbase
*dbase
= atrans
->dbase
;
668 if ((tb
= newslot(dbase
, fid
, page
)) == 0)
671 memset(tb
->data
, 0, UBIK_PAGESIZE
);
676 * \brief Read data from database.
679 udisk_read(struct ubik_trans
*atrans
, afs_int32 afile
, void *abuffer
,
680 afs_int32 apos
, afs_int32 alen
)
683 afs_int32 offset
, len
, totalLen
;
685 if (atrans
->flags
& TRDONE
)
689 bp
= DRead(atrans
, afile
, apos
>> UBIK_LOGPAGESIZE
);
692 /* otherwise, min of remaining bytes and end of buffer to user mode */
693 offset
= apos
& (UBIK_PAGESIZE
- 1);
694 len
= UBIK_PAGESIZE
- offset
;
697 memcpy(abuffer
, bp
+ offset
, len
);
698 abuffer
= (char *)abuffer
+ len
;
708 * \brief Truncate file.
711 udisk_truncate(struct ubik_trans
*atrans
, afs_int32 afile
, afs_int32 alength
)
714 struct ubik_trunc
*tt
;
716 if (atrans
->flags
& TRDONE
)
718 if (atrans
->type
!= UBIK_WRITETRANS
)
721 /* write a truncate log record */
722 code
= udisk_LogTruncate(atrans
->dbase
, afile
, alength
);
724 /* don't truncate until commit time */
725 tt
= FindTrunc(atrans
, afile
);
727 /* this file not truncated yet */
729 tt
->next
= atrans
->activeTruncs
;
730 atrans
->activeTruncs
= tt
;
732 tt
->length
= alength
;
734 /* already truncated to a certain length */
735 if (tt
->length
> alength
)
736 tt
->length
= alength
;
742 * \brief Write data to database, using logs.
745 udisk_write(struct ubik_trans
*atrans
, afs_int32 afile
, void *abuffer
,
746 afs_int32 apos
, afs_int32 alen
)
749 afs_int32 offset
, len
, totalLen
;
750 struct ubik_trunc
*tt
;
753 if (atrans
->flags
& TRDONE
)
755 if (atrans
->type
!= UBIK_WRITETRANS
)
758 /* first write the data to the log */
759 code
= udisk_LogWriteData(atrans
->dbase
, afile
, abuffer
, apos
, alen
);
763 /* expand any truncations of this file */
764 tt
= FindTrunc(atrans
, afile
);
766 if (tt
->length
< apos
+ alen
) {
767 tt
->length
= apos
+ alen
;
774 bp
= DRead(atrans
, afile
, apos
>> UBIK_LOGPAGESIZE
);
776 bp
= DNew(atrans
, afile
, apos
>> UBIK_LOGPAGESIZE
);
780 /* otherwise, min of remaining bytes and end of buffer to user mode */
781 offset
= apos
& (UBIK_PAGESIZE
- 1);
782 len
= UBIK_PAGESIZE
- offset
;
785 memcpy(bp
+ offset
, abuffer
, len
);
786 abuffer
= (char *)abuffer
+ len
;
790 DRelease(bp
, 1); /* buffer modified */
796 * \brief Begin a new local transaction.
799 udisk_begin(struct ubik_dbase
*adbase
, int atype
, struct ubik_trans
**atrans
)
802 struct ubik_trans
*tt
;
805 if (atype
== UBIK_WRITETRANS
) {
806 if (adbase
->flags
& DBWRITING
)
808 code
= udisk_LogOpcode(adbase
, LOGNEW
, 0);
812 tt
= calloc(1, sizeof(struct ubik_trans
));
814 tt
->next
= adbase
->activeTrans
;
815 adbase
->activeTrans
= tt
;
817 if (atype
== UBIK_READTRANS
)
819 else if (atype
== UBIK_WRITETRANS
) {
821 adbase
->flags
|= DBWRITING
;
829 * \brief Commit transaction.
832 udisk_commit(struct ubik_trans
*atrans
)
834 struct ubik_dbase
*dbase
;
836 struct ubik_version oldversion
, newversion
;
837 afs_int32 now
= FT_ApproxTime();
839 if (atrans
->flags
& TRDONE
)
842 if (atrans
->type
== UBIK_WRITETRANS
) {
843 dbase
= atrans
->dbase
;
845 /* On the first write to the database, we update the versions
846 if (ubeacon_AmSyncSite() && !(urecovery_state
& UBIK_RECLABELDB
)) {
848 if (version_globals
.ubik_epochTime
< UBIK_MILESTONE
849 || version_globals
.ubik_epochTime
> now
) {
851 ("Ubik: New database label %d is out of the valid range (%d - %d)\n",
852 version_globals
.ubik_epochTime
, UBIK_MILESTONE
, now
));
853 panic("Writing Ubik DB label\n");
855 oldversion
= dbase
->version
;
856 newversion
.epoch
= version_globals
.ubik_epochTime
;
857 newversion
.counter
= 1;
859 code
= (*dbase
->setlabel
) (dbase
, 0, &newversion
);
865 dbase
->version
= newversion
;
868 urecovery_state
|= UBIK_RECLABELDB
;
870 /* Ignore the error here. If the call fails, the site is
871 * marked down and when we detect it is up again, we will
872 * send the entire database to it.
874 ContactQuorum_DISK_SetVersion( atrans
, 1 /*CStampVersion */ ,
875 &oldversion
, &newversion
);
879 dbase
->version
.counter
++; /* bump commit count */
880 #ifdef AFS_PTHREAD_ENV
881 opr_cv_broadcast(&dbase
->version_cond
);
883 LWP_NoYieldSignal(&dbase
->version
);
885 code
= udisk_LogEnd(dbase
, &dbase
->version
);
887 dbase
->version
.counter
--;
893 /* If we fail anytime after this, then panic and let the
894 * recovery replay the log.
896 code
= DFlush(atrans
); /* write dirty pages to respective files */
898 panic("Writing Ubik DB modifications\n");
899 code
= DSync(atrans
); /* sync the files and mark pages not dirty */
901 panic("Synchronizing Ubik DB modifications\n");
903 code
= DoTruncs(atrans
); /* Perform requested truncations */
905 panic("Truncating Ubik DB\n");
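907 /* label the committed dbase. NOTE(review): the panic message that follows repeats "Truncating Ubik DB" from the DoTruncs step although this is the setlabel step - confirm and reword. */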
908 code
= (*dbase
->setlabel
) (dbase
, 0, &dbase
->version
);
910 panic("Truncating Ubik DB\n");
912 code
= (*dbase
->truncate
) (dbase
, LOGFILE
, 0); /* discard log (optional) */
914 panic("Truncating Ubik logfile\n");
918 /* When the transaction is marked done, it also means the logfile
919 * has been truncated.
921 atrans
->flags
|= TRDONE
;
926 * \brief Abort transaction.
929 udisk_abort(struct ubik_trans
*atrans
)
931 struct ubik_dbase
*dbase
;
934 if (atrans
->flags
& TRDONE
)
937 /* Check if we are the write trans before logging abort, lest we
938 * abort a good write trans in progress.
939 * We don't really care if the LOGABORT gets to the log because we
940 * truncate the log next. If the truncate fails, we panic, since
941 * otherwise the log entries would remain. On restart, replay of the log
942 * will do nothing because the abort is there or no LogEnd opcode.
944 dbase
= atrans
->dbase
;
945 if (atrans
->type
== UBIK_WRITETRANS
&& dbase
->flags
& DBWRITING
) {
946 udisk_LogOpcode(dbase
, LOGABORT
, 1);
947 code
= (*dbase
->truncate
) (dbase
, LOGFILE
, 0);
949 panic("Truncating Ubik logfile during an abort\n");
950 DAbort(atrans
); /* remove all dirty pages */
953 /* When the transaction is marked done, it also means the logfile
954 * has been truncated.
956 atrans
->flags
|= (TRABORT
| TRDONE
);
961 * \brief Destroy a transaction after it has been committed or aborted.
963 * If it hasn't committed before you call this routine, we'll abort the
964 * transaction for you.
967 udisk_end(struct ubik_trans
*atrans
)
969 struct ubik_dbase
*dbase
;
971 if (!(atrans
->flags
& TRDONE
))
973 dbase
= atrans
->dbase
;
975 ulock_relLock(atrans
);
978 /* check if we are the write trans before unsetting the DBWRITING bit, else
979 * we could be unsetting someone else's bit.
981 if (atrans
->type
== UBIK_WRITETRANS
&& dbase
->flags
& DBWRITING
) {
983 dbase
->flags
&= ~DBWRITING
;
988 if (atrans
->iovec_info
.iovec_wrt_val
)
989 free(atrans
->iovec_info
.iovec_wrt_val
);
990 if (atrans
->iovec_data
.iovec_buf_val
)
991 free(atrans
->iovec_data
.iovec_buf_val
);
994 /* Wakeup any writers waiting in BeginTrans() */
995 #ifdef AFS_PTHREAD_ENV
996 opr_cv_broadcast(&dbase
->flags_cond
);
998 LWP_NoYieldSignal(&dbase
->flags
);