Import Upstream version 1.8.5
[hcoop/debian/openafs.git] / src / ubik / disk.c
1 /*
2 * Copyright 2000, International Business Machines Corporation and others.
3 * All Rights Reserved.
4 *
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
8 */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14 #include <afs/opr.h>
15
16 #ifdef AFS_PTHREAD_ENV
17 # include <opr/lock.h>
18 #else
19 # include <opr/lockstub.h>
20 #endif
21 #include <afs/afsutil.h>
22
23 #define UBIK_INTERNALS
24 #include "ubik.h"
25 #include "ubik_int.h"
26
/* Number of buckets in the page hash table (phTable). */
#define PHSIZE 128
/*
 * One entry per cached database page.  The entries live in the Buffers
 * array (allocated by udisk_Init) and are simultaneously linked into a
 * circular LRU list and into one phTable hash chain.
 */
static struct buffer {
    struct ubik_dbase *dbase;   /*!< dbase within which the buffer resides */
    afs_int32 file;             /*!< Unique cache key; BADFID when the slot holds no valid page */
    afs_int32 page;             /*!< page number */
    struct buffer *lru_next;    /*!< next buffer on the circular LRU list */
    struct buffer *lru_prev;    /*!< previous buffer on the circular LRU list */
    struct buffer *hashNext;    /*!< next dude in hash table */
    char *data;                 /*!< ptr to the data (one UBIK_PAGESIZE slice of BufferData) */
    char lockers;               /*!< usage ref count */
    char dirty;                 /*!< is buffer modified */
    char hashIndex;             /*!< back ptr to hash table (phTable bucket index, set by FixupBucket) */
} *Buffers;

/* Map a page number onto a phTable bucket by masking with PHSIZE-1. */
#define pHash(page) ((page) & (PHSIZE-1))

afs_int32 ubik_nBuffers = NBUFFERS;
static struct buffer *phTable[PHSIZE];  /*!< page hash table */
static struct buffer *LruBuffer;        /*!< head of the circular LRU list; newslot() scans from here */
static int nbuffers;                    /*!< number of entries in Buffers, set by udisk_Init() */
static int calls = 0, ios = 0, lastb = 0;       /*!< DRead() counters: calls made, disk reads issued, hits on the list tail */
static char *BufferData;                /*!< page storage backing every buffer's data pointer */
static struct buffer *newslot(struct ubik_dbase *adbase, afs_int32 afid,
                              afs_int32 apage);
/* File id stored in a buffer to mark it as not caching any page. */
#define BADFID 0xffffffff

static int DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length);

/* Free list of truncation entries, fed by PutTrunc() and drained by GetTrunc(). */
static struct ubik_trunc *freeTruncList = 0;
56
57 /*!
58 * \brief Remove a transaction from the database's active transaction list. Don't free it.
59 */
60 static int
61 unthread(struct ubik_trans *atrans)
62 {
63 struct ubik_trans **lt, *tt;
64 lt = &atrans->dbase->activeTrans;
65 for (tt = *lt; tt; lt = &tt->next, tt = *lt) {
66 if (tt == atrans) {
67 /* found it */
68 *lt = tt->next;
69 return 0;
70 }
71 }
72 return 2; /* no entry */
73 }
74
75 /*!
76 * \brief some debugging assistance
77 */
78 void
79 udisk_Debug(struct ubik_debug *aparm)
80 {
81 struct buffer *tb;
82 int i;
83
84 memcpy(&aparm->localVersion, &ubik_dbase->version,
85 sizeof(struct ubik_version));
86 aparm->lockedPages = 0;
87 aparm->writeLockedPages = 0;
88 tb = Buffers;
89 for (i = 0; i < nbuffers; i++, tb++) {
90 if (tb->lockers) {
91 aparm->lockedPages++;
92 if (tb->dirty)
93 aparm->writeLockedPages++;
94 }
95 }
96 }
97
98 /*!
99 * \brief Write an opcode to the log.
100 *
101 * log format is defined here, and implicitly in recovery.c
102 *
103 * 4 byte opcode, followed by parameters, each 4 bytes long. All integers
104 * are in logged in network standard byte order, in case we want to move logs
105 * from machine-to-machine someday.
106 *
107 * Begin transaction: opcode \n
108 * Commit transaction: opcode, version (8 bytes) \n
109 * Truncate file: opcode, file number, length \n
110 * Abort transaction: opcode \n
111 * Write data: opcode, file, position, length, <length> data bytes \n
112 */
113 static int
114 udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
115 {
116 afs_int32 code;
117
118 /* setup data and do write */
119 aopcode = htonl(aopcode);
120 code = (*adbase->buffered_append)(adbase, LOGFILE, &aopcode, sizeof(afs_int32));
121 if (code != sizeof(afs_int32))
122 return UIOERROR;
123
124 /* optionally sync data */
125 if (async)
126 code = (*adbase->sync) (adbase, LOGFILE);
127 else
128 code = 0;
129 return code;
130 }
131
132 /*!
133 * \brief Log a commit, never syncing.
134 */
135 static int
136 udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
137 {
138 afs_int32 code;
139 afs_int32 data[3];
140
141 /* setup data */
142 data[0] = htonl(LOGEND);
143 data[1] = htonl(aversion->epoch);
144 data[2] = htonl(aversion->counter);
145
146 /* do write */
147 code =
148 (*adbase->buffered_append)(adbase, LOGFILE, data, 3 * sizeof(afs_int32));
149 if (code != 3 * sizeof(afs_int32))
150 return UIOERROR;
151
152 /* finally sync the log */
153 code = (*adbase->sync) (adbase, LOGFILE);
154 return code;
155 }
156
157 /*!
158 * \brief Log a truncate operation, never syncing.
159 */
160 static int
161 udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
162 afs_int32 alength)
163 {
164 afs_int32 code;
165 afs_int32 data[3];
166
167 /* setup data */
168 data[0] = htonl(LOGTRUNCATE);
169 data[1] = htonl(afile);
170 data[2] = htonl(alength);
171
172 /* do write */
173 code =
174 (*adbase->buffered_append)(adbase, LOGFILE, data, 3 * sizeof(afs_int32));
175 if (code != 3 * sizeof(afs_int32))
176 return UIOERROR;
177 return 0;
178 }
179
180 /*!
181 * \brief Write some data to the log, never syncing.
182 */
183 static int
184 udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, void *abuffer,
185 afs_int32 apos, afs_int32 alen)
186 {
187 afs_int32 code;
188 afs_int32 data[4];
189
190 /* setup header */
191 data[0] = htonl(LOGDATA);
192 data[1] = htonl(afile);
193 data[2] = htonl(apos);
194 data[3] = htonl(alen);
195
196 /* write header */
197 code =
198 (*adbase->buffered_append)(adbase, LOGFILE, data, 4 * sizeof(afs_int32));
199 if (code != 4 * sizeof(afs_int32))
200 return UIOERROR;
201
202 /* write data */
203 code = (*adbase->buffered_append)(adbase, LOGFILE, abuffer, alen);
204 if (code != alen)
205 return UIOERROR;
206 return 0;
207 }
208
209 int
210 udisk_Init(int abuffers)
211 {
212 /* Initialize the venus buffer system. */
213 int i;
214 struct buffer *tb;
215 Buffers = calloc(abuffers, sizeof(struct buffer));
216 BufferData = malloc(abuffers * UBIK_PAGESIZE);
217 nbuffers = abuffers;
218 for (i = 0; i < PHSIZE; i++)
219 phTable[i] = 0;
220 for (i = 0; i < abuffers; i++) {
221 /* Fill in each buffer with an empty indication. */
222 tb = &Buffers[i];
223 tb->lru_next = &(Buffers[i + 1]);
224 tb->lru_prev = &(Buffers[i - 1]);
225 tb->data = &BufferData[UBIK_PAGESIZE * i];
226 tb->file = BADFID;
227 }
228 Buffers[0].lru_prev = &(Buffers[abuffers - 1]);
229 Buffers[abuffers - 1].lru_next = &(Buffers[0]);
230 LruBuffer = &(Buffers[0]);
231 return 0;
232 }
233
234 /*!
235 * \brief Take a buffer and mark it as the least recently used buffer.
236 */
237 static void
238 Dlru(struct buffer *abuf)
239 {
240 if (LruBuffer == abuf)
241 return;
242
243 /* Unthread from where it is in the list */
244 abuf->lru_next->lru_prev = abuf->lru_prev;
245 abuf->lru_prev->lru_next = abuf->lru_next;
246
247 /* Thread onto beginning of LRU list */
248 abuf->lru_next = LruBuffer;
249 abuf->lru_prev = LruBuffer->lru_prev;
250
251 LruBuffer->lru_prev->lru_next = abuf;
252 LruBuffer->lru_prev = abuf;
253 LruBuffer = abuf;
254 }
255
256 /*!
257 * \brief Take a buffer and mark it as the most recently used buffer.
258 */
259 static void
260 Dmru(struct buffer *abuf)
261 {
262 if (LruBuffer == abuf) {
263 LruBuffer = LruBuffer->lru_next;
264 return;
265 }
266
267 /* Unthread from where it is in the list */
268 abuf->lru_next->lru_prev = abuf->lru_prev;
269 abuf->lru_prev->lru_next = abuf->lru_next;
270
271 /* Thread onto end of LRU list - making it the MRU buffer */
272 abuf->lru_next = LruBuffer;
273 abuf->lru_prev = LruBuffer->lru_prev;
274 LruBuffer->lru_prev->lru_next = abuf;
275 LruBuffer->lru_prev = abuf;
276 }
277
278 static_inline int
279 MatchBuffer(struct buffer *buf, int page, afs_int32 fid,
280 struct ubik_trans *atrans)
281 {
282 if (buf->page != page) {
283 return 0;
284 }
285 if (buf->file != fid) {
286 return 0;
287 }
288 if (atrans->type == UBIK_READTRANS && buf->dirty) {
289 /* if 'buf' is dirty, it has uncommitted changes; we do not want to
290 * see uncommitted changes if we are a read transaction, so skip over
291 * it. */
292 return 0;
293 }
294 if (buf->dbase != atrans->dbase) {
295 return 0;
296 }
297 return 1;
298 }
299
300 /*!
301 * \brief Get a pointer to a particular buffer.
302 */
303 static char *
304 DRead(struct ubik_trans *atrans, afs_int32 fid, int page)
305 {
306 /* Read a page from the disk. */
307 struct buffer *tb, *lastbuffer, *found_tb = NULL;
308 afs_int32 code;
309 struct ubik_dbase *dbase = atrans->dbase;
310
311 calls++;
312 lastbuffer = LruBuffer->lru_prev;
313
314 /* Skip for write transactions for a clean page - this may not be the right page to use */
315 if (MatchBuffer(lastbuffer, page, fid, atrans)
316 && (atrans->type == UBIK_READTRANS || lastbuffer->dirty)) {
317 tb = lastbuffer;
318 tb->lockers++;
319 lastb++;
320 return tb->data;
321 }
322 for (tb = phTable[pHash(page)]; tb; tb = tb->hashNext) {
323 if (MatchBuffer(tb, page, fid, atrans)) {
324 if (tb->dirty || atrans->type == UBIK_READTRANS) {
325 found_tb = tb;
326 break;
327 }
328 /* Remember this clean page - we might use it */
329 found_tb = tb;
330 }
331 }
332 /* For a write transaction, use a matching clean page if no dirty one was found */
333 if (found_tb) {
334 Dmru(found_tb);
335 found_tb->lockers++;
336 return found_tb->data;
337 }
338
339 /* can't find it */
340 tb = newslot(dbase, fid, page);
341 if (!tb)
342 return 0;
343 memset(tb->data, 0, UBIK_PAGESIZE);
344
345 tb->lockers++;
346 code =
347 (*dbase->read) (dbase, fid, tb->data, page * UBIK_PAGESIZE,
348 UBIK_PAGESIZE);
349 if (code < 0) {
350 tb->file = BADFID;
351 Dlru(tb);
352 tb->lockers--;
353 ViceLog(0, ("Ubik: Error reading database file: errno=%d\n", errno));
354 return 0;
355 }
356 ios++;
357
358 /* Note that findslot sets the page field in the buffer equal to
359 * what it is searching for.
360 */
361 return tb->data;
362 }
363
364 /*!
365 * \brief Zap truncated pages.
366 */
367 static int
368 DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length)
369 {
370 afs_int32 maxPage;
371 struct buffer *tb;
372 int i;
373 struct ubik_dbase *dbase = atrans->dbase;
374
375 maxPage = (length + UBIK_PAGESIZE - 1) >> UBIK_LOGPAGESIZE; /* first invalid page now in file */
376 for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
377 if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
378 tb->file = BADFID;
379 Dlru(tb);
380 }
381 }
382 return 0;
383 }
384
385 /*!
386 * \brief Allocate a truncation entry.
387 *
388 * We allocate special entries representing truncations, rather than
389 * performing them immediately, so that we can abort a transaction easily by simply purging
390 * the in-core memory buffers and discarding these truncation entries.
391 */
392 static struct ubik_trunc *
393 GetTrunc(void)
394 {
395 struct ubik_trunc *tt;
396 if (!freeTruncList) {
397 freeTruncList = malloc(sizeof(struct ubik_trunc));
398 freeTruncList->next = (struct ubik_trunc *)0;
399 }
400 tt = freeTruncList;
401 freeTruncList = tt->next;
402 return tt;
403 }
404
405 /*!
406 * \brief Free a truncation entry.
407 */
408 static int
409 PutTrunc(struct ubik_trunc *at)
410 {
411 at->next = freeTruncList;
412 freeTruncList = at;
413 return 0;
414 }
415
416 /*!
417 * \brief Find a truncation entry for a file, if any.
418 */
419 static struct ubik_trunc *
420 FindTrunc(struct ubik_trans *atrans, afs_int32 afile)
421 {
422 struct ubik_trunc *tt;
423 for (tt = atrans->activeTruncs; tt; tt = tt->next) {
424 if (tt->file == afile)
425 return tt;
426 }
427 return (struct ubik_trunc *)0;
428 }
429
430 /*!
431 * \brief Do truncates associated with \p atrans, and free them.
432 */
433 static int
434 DoTruncs(struct ubik_trans *atrans)
435 {
436 struct ubik_trunc *tt, *nt;
437 int (*tproc) (struct ubik_dbase *, afs_int32, afs_int32);
438 afs_int32 rcode = 0, code;
439
440 tproc = atrans->dbase->truncate;
441 for (tt = atrans->activeTruncs; tt; tt = nt) {
442 nt = tt->next;
443 DTrunc(atrans, tt->file, tt->length); /* zap pages from buffer cache */
444 code = (*tproc) (atrans->dbase, tt->file, tt->length);
445 if (code)
446 rcode = code;
447 PutTrunc(tt);
448 }
449 /* don't unthread, because we do the entire list's worth here */
450 atrans->activeTruncs = (struct ubik_trunc *)0;
451 return (rcode);
452 }
453
454 /*!
455 * \brief Mark an \p fid as invalid.
456 */
457 int
458 udisk_Invalidate(struct ubik_dbase *adbase, afs_int32 afid)
459 {
460 struct buffer *tb;
461 int i;
462
463 for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
464 if (tb->file == afid) {
465 tb->file = BADFID;
466 Dlru(tb);
467 }
468 }
469 return 0;
470 }
471
472 /*!
473 * \brief Move this page into the correct hash bucket.
474 */
475 static int
476 FixupBucket(struct buffer *ap)
477 {
478 struct buffer **lp, *tp;
479 int i;
480 /* first try to get it out of its current hash bucket, in which it might not be */
481 i = ap->hashIndex;
482 lp = &phTable[i];
483 for (tp = *lp; tp; tp = tp->hashNext) {
484 if (tp == ap) {
485 *lp = tp->hashNext;
486 break;
487 }
488 lp = &tp->hashNext;
489 }
490 /* now figure the new hash bucket */
491 i = pHash(ap->page);
492 ap->hashIndex = i; /* remember where we are for deletion */
493 ap->hashNext = phTable[i]; /* add us to the list */
494 phTable[i] = ap;
495 return 0;
496 }
497
498 /*!
499 * \brief Create a new slot for a particular dbase page.
500 */
501 static struct buffer *
502 newslot(struct ubik_dbase *adbase, afs_int32 afid, afs_int32 apage)
503 {
504 /* Find a usable buffer slot */
505 afs_int32 i;
506 struct buffer *pp, *tp;
507
508 pp = 0; /* last pure */
509 for (i = 0, tp = LruBuffer; i < nbuffers; i++, tp = tp->lru_next) {
510 if (!tp->lockers && !tp->dirty) {
511 pp = tp;
512 break;
513 }
514 }
515
516 if (pp == 0) {
517 /* There are no unlocked buffers that don't need to be written to the disk. */
518 ViceLog(0, ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n"));
519 return NULL;
520 }
521
522 /* Now fill in the header. */
523 pp->dbase = adbase;
524 pp->file = afid;
525 pp->page = apage;
526
527 FixupBucket(pp); /* move to the right hash bucket */
528 Dmru(pp);
529 return pp;
530 }
531
532 /*!
533 * \brief Release a buffer, specifying whether or not the buffer has been modified by the locker.
534 */
535 static void
536 DRelease(char *ap, int flag)
537 {
538 int index;
539 struct buffer *bp;
540
541 if (!ap)
542 return;
543 index = (int)(ap - (char *)BufferData) >> UBIK_LOGPAGESIZE;
544 bp = &(Buffers[index]);
545 bp->lockers--;
546 if (flag)
547 bp->dirty = 1;
548 return;
549 }
550
551 /*!
552 * \brief Flush all modified buffers, leaves dirty bits set (they're cleared
553 * by DSync()).
554 *
555 * \note Note interaction with DSync(): you call this thing first,
556 * writing the buffers to the disk. Then you call DSync() to sync all the
557 * files that were written, and to clear the dirty bits. You should
558 * always call DFlush/DSync as a pair.
559 */
560 static int
561 DFlush(struct ubik_trans *atrans)
562 {
563 int i;
564 afs_int32 code;
565 struct buffer *tb;
566 struct ubik_dbase *adbase = atrans->dbase;
567
568 tb = Buffers;
569 for (i = 0; i < nbuffers; i++, tb++) {
570 if (tb->dirty) {
571 code = tb->page * UBIK_PAGESIZE; /* offset within file */
572 code =
573 (*adbase->write) (adbase, tb->file, tb->data, code,
574 UBIK_PAGESIZE);
575 if (code != UBIK_PAGESIZE)
576 return UIOERROR;
577 }
578 }
579 return 0;
580 }
581
582 /*!
583 * \brief Flush all modified buffers.
584 */
585 static int
586 DAbort(struct ubik_trans *atrans)
587 {
588 int i;
589 struct buffer *tb;
590
591 tb = Buffers;
592 for (i = 0; i < nbuffers; i++, tb++) {
593 if (tb->dirty) {
594 tb->dirty = 0;
595 tb->file = BADFID;
596 Dlru(tb);
597 }
598 }
599 return 0;
600 }
601
602 /**
603 * Invalidate any buffers that are duplicates of abuf. Duplicate buffers
604 * can appear if a read transaction reads a page that is dirty, then that
605 * dirty page is synced. The read transaction will skip over the dirty page,
606 * and create a new buffer, and when the dirty page is synced, it will be
607 * identical (except for contents) to the read-transaction buffer.
608 */
609 static void
610 DedupBuffer(struct buffer *abuf)
611 {
612 struct buffer *tb;
613 for (tb = phTable[pHash(abuf->page)]; tb; tb = tb->hashNext) {
614 if (tb->page == abuf->page && tb != abuf && tb->file == abuf->file
615 && tb->dbase == abuf->dbase) {
616
617 tb->file = BADFID;
618 Dlru(tb);
619 }
620 }
621 }
622
623 /*!
624 * \attention DSync() must only be called after DFlush(), due to its interpretation of dirty flag.
625 */
626 static int
627 DSync(struct ubik_trans *atrans)
628 {
629 int i;
630 afs_int32 code;
631 struct buffer *tb;
632 afs_int32 file;
633 afs_int32 rCode;
634 struct ubik_dbase *adbase = atrans->dbase;
635
636 rCode = 0;
637 while (1) {
638 file = BADFID;
639 for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
640 if (tb->dirty == 1) {
641 if (file == BADFID)
642 file = tb->file;
643 if (file != BADFID && tb->file == file) {
644 tb->dirty = 0;
645 DedupBuffer(tb);
646 }
647 }
648 }
649 if (file == BADFID)
650 break;
651 /* otherwise we have a file to sync */
652 code = (*adbase->sync) (adbase, file);
653 if (code)
654 rCode = code;
655 }
656 return rCode;
657 }
658
659 /*!
660 * \brief Same as DRead(), only do not even try to read the page.
661 */
662 static char *
663 DNew(struct ubik_trans *atrans, afs_int32 fid, int page)
664 {
665 struct buffer *tb;
666 struct ubik_dbase *dbase = atrans->dbase;
667
668 if ((tb = newslot(dbase, fid, page)) == 0)
669 return NULL;
670 tb->lockers++;
671 memset(tb->data, 0, UBIK_PAGESIZE);
672 return tb->data;
673 }
674
675 /*!
676 * \brief Read data from database.
677 */
678 int
679 udisk_read(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
680 afs_int32 apos, afs_int32 alen)
681 {
682 char *bp;
683 afs_int32 offset, len, totalLen;
684
685 if (atrans->flags & TRDONE)
686 return UDONE;
687 totalLen = 0;
688 while (alen > 0) {
689 bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
690 if (!bp)
691 return UEOF;
692 /* otherwise, min of remaining bytes and end of buffer to user mode */
693 offset = apos & (UBIK_PAGESIZE - 1);
694 len = UBIK_PAGESIZE - offset;
695 if (len > alen)
696 len = alen;
697 memcpy(abuffer, bp + offset, len);
698 abuffer = (char *)abuffer + len;
699 apos += len;
700 alen -= len;
701 totalLen += len;
702 DRelease(bp, 0);
703 }
704 return 0;
705 }
706
707 /*!
708 * \brief Truncate file.
709 */
710 int
711 udisk_truncate(struct ubik_trans *atrans, afs_int32 afile, afs_int32 alength)
712 {
713 afs_int32 code;
714 struct ubik_trunc *tt;
715
716 if (atrans->flags & TRDONE)
717 return UDONE;
718 if (atrans->type != UBIK_WRITETRANS)
719 return UBADTYPE;
720
721 /* write a truncate log record */
722 code = udisk_LogTruncate(atrans->dbase, afile, alength);
723
724 /* don't truncate until commit time */
725 tt = FindTrunc(atrans, afile);
726 if (!tt) {
727 /* this file not truncated yet */
728 tt = GetTrunc();
729 tt->next = atrans->activeTruncs;
730 atrans->activeTruncs = tt;
731 tt->file = afile;
732 tt->length = alength;
733 } else {
734 /* already truncated to a certain length */
735 if (tt->length > alength)
736 tt->length = alength;
737 }
738 return code;
739 }
740
741 /*!
742 * \brief Write data to database, using logs.
743 */
744 int
745 udisk_write(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
746 afs_int32 apos, afs_int32 alen)
747 {
748 char *bp;
749 afs_int32 offset, len, totalLen;
750 struct ubik_trunc *tt;
751 afs_int32 code;
752
753 if (atrans->flags & TRDONE)
754 return UDONE;
755 if (atrans->type != UBIK_WRITETRANS)
756 return UBADTYPE;
757
758 /* first write the data to the log */
759 code = udisk_LogWriteData(atrans->dbase, afile, abuffer, apos, alen);
760 if (code)
761 return code;
762
763 /* expand any truncations of this file */
764 tt = FindTrunc(atrans, afile);
765 if (tt) {
766 if (tt->length < apos + alen) {
767 tt->length = apos + alen;
768 }
769 }
770
771 /* now update vm */
772 totalLen = 0;
773 while (alen > 0) {
774 bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
775 if (!bp) {
776 bp = DNew(atrans, afile, apos >> UBIK_LOGPAGESIZE);
777 if (!bp)
778 return UIOERROR;
779 }
780 /* otherwise, min of remaining bytes and end of buffer to user mode */
781 offset = apos & (UBIK_PAGESIZE - 1);
782 len = UBIK_PAGESIZE - offset;
783 if (len > alen)
784 len = alen;
785 memcpy(bp + offset, abuffer, len);
786 abuffer = (char *)abuffer + len;
787 apos += len;
788 alen -= len;
789 totalLen += len;
790 DRelease(bp, 1); /* buffer modified */
791 }
792 return 0;
793 }
794
795 /*!
796 * \brief Begin a new local transaction.
797 */
798 int
799 udisk_begin(struct ubik_dbase *adbase, int atype, struct ubik_trans **atrans)
800 {
801 afs_int32 code;
802 struct ubik_trans *tt;
803
804 *atrans = NULL;
805 if (atype == UBIK_WRITETRANS) {
806 if (adbase->flags & DBWRITING)
807 return USYNC;
808 code = udisk_LogOpcode(adbase, LOGNEW, 0);
809 if (code)
810 return code;
811 }
812 tt = calloc(1, sizeof(struct ubik_trans));
813 tt->dbase = adbase;
814 tt->next = adbase->activeTrans;
815 adbase->activeTrans = tt;
816 tt->type = atype;
817 if (atype == UBIK_READTRANS)
818 adbase->readers++;
819 else if (atype == UBIK_WRITETRANS) {
820 UBIK_VERSION_LOCK;
821 adbase->flags |= DBWRITING;
822 UBIK_VERSION_UNLOCK;
823 }
824 *atrans = tt;
825 return 0;
826 }
827
828 /*!
829 * \brief Commit transaction.
830 */
831 int
832 udisk_commit(struct ubik_trans *atrans)
833 {
834 struct ubik_dbase *dbase;
835 afs_int32 code = 0;
836 struct ubik_version oldversion, newversion;
837 afs_int32 now = FT_ApproxTime();
838
839 if (atrans->flags & TRDONE)
840 return (UTWOENDS);
841
842 if (atrans->type == UBIK_WRITETRANS) {
843 dbase = atrans->dbase;
844
845 /* On the first write to the database. We update the versions */
846 if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
847 UBIK_VERSION_LOCK;
848 if (version_globals.ubik_epochTime < UBIK_MILESTONE
849 || version_globals.ubik_epochTime > now) {
850 ViceLog(0,
851 ("Ubik: New database label %d is out of the valid range (%d - %d)\n",
852 version_globals.ubik_epochTime, UBIK_MILESTONE, now));
853 panic("Writing Ubik DB label\n");
854 }
855 oldversion = dbase->version;
856 newversion.epoch = version_globals.ubik_epochTime;
857 newversion.counter = 1;
858
859 code = (*dbase->setlabel) (dbase, 0, &newversion);
860 if (code) {
861 UBIK_VERSION_UNLOCK;
862 return code;
863 }
864
865 dbase->version = newversion;
866 UBIK_VERSION_UNLOCK;
867
868 urecovery_state |= UBIK_RECLABELDB;
869
870 /* Ignore the error here. If the call fails, the site is
871 * marked down and when we detect it is up again, we will
872 * send the entire database to it.
873 */
874 ContactQuorum_DISK_SetVersion( atrans, 1 /*CStampVersion */ ,
875 &oldversion, &newversion);
876 }
877
878 UBIK_VERSION_LOCK;
879 dbase->version.counter++; /* bump commit count */
880 #ifdef AFS_PTHREAD_ENV
881 opr_cv_broadcast(&dbase->version_cond);
882 #else
883 LWP_NoYieldSignal(&dbase->version);
884 #endif
885 code = udisk_LogEnd(dbase, &dbase->version);
886 if (code) {
887 dbase->version.counter--;
888 UBIK_VERSION_UNLOCK;
889 return code;
890 }
891 UBIK_VERSION_UNLOCK;
892
893 /* If we fail anytime after this, then panic and let the
894 * recovery replay the log.
895 */
896 code = DFlush(atrans); /* write dirty pages to respective files */
897 if (code)
898 panic("Writing Ubik DB modifications\n");
899 code = DSync(atrans); /* sync the files and mark pages not dirty */
900 if (code)
901 panic("Synchronizing Ubik DB modifications\n");
902
903 code = DoTruncs(atrans); /* Perform requested truncations */
904 if (code)
905 panic("Truncating Ubik DB\n");
906
907 /* label the committed dbase */
908 code = (*dbase->setlabel) (dbase, 0, &dbase->version);
909 if (code)
910 panic("Truncating Ubik DB\n");
911
912 code = (*dbase->truncate) (dbase, LOGFILE, 0); /* discard log (optional) */
913 if (code)
914 panic("Truncating Ubik logfile\n");
915
916 }
917
918 /* When the transaction is marked done, it also means the logfile
919 * has been truncated.
920 */
921 atrans->flags |= TRDONE;
922 return code;
923 }
924
925 /*!
926 * \brief Abort transaction.
927 */
928 int
929 udisk_abort(struct ubik_trans *atrans)
930 {
931 struct ubik_dbase *dbase;
932 afs_int32 code;
933
934 if (atrans->flags & TRDONE)
935 return UTWOENDS;
936
937 /* Check if we are the write trans before logging abort, lest we
938 * abort a good write trans in progress.
939 * We don't really care if the LOGABORT gets to the log because we
940 * truncate the log next. If the truncate fails, we panic; for
941 * otherwise, the log entries remain. On restart, replay of the log
942 * will do nothing because the abort is there or no LogEnd opcode.
943 */
944 dbase = atrans->dbase;
945 if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
946 udisk_LogOpcode(dbase, LOGABORT, 1);
947 code = (*dbase->truncate) (dbase, LOGFILE, 0);
948 if (code)
949 panic("Truncating Ubik logfile during an abort\n");
950 DAbort(atrans); /* remove all dirty pages */
951 }
952
953 /* When the transaction is marked done, it also means the logfile
954 * has been truncated.
955 */
956 atrans->flags |= (TRABORT | TRDONE);
957 return 0;
958 }
959
960 /*!
961 * \brief Destroy a transaction after it has been committed or aborted.
962 *
963 * If it hasn't committed before you call this routine, we'll abort the
964 * transaction for you.
965 */
966 int
967 udisk_end(struct ubik_trans *atrans)
968 {
969 struct ubik_dbase *dbase;
970
971 if (!(atrans->flags & TRDONE))
972 udisk_abort(atrans);
973 dbase = atrans->dbase;
974
975 ulock_relLock(atrans);
976 unthread(atrans);
977
978 /* check if we are the write trans before unsetting the DBWRITING bit, else
979 * we could be unsetting someone else's bit.
980 */
981 if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
982 UBIK_VERSION_LOCK;
983 dbase->flags &= ~DBWRITING;
984 UBIK_VERSION_UNLOCK;
985 } else {
986 dbase->readers--;
987 }
988 if (atrans->iovec_info.iovec_wrt_val)
989 free(atrans->iovec_info.iovec_wrt_val);
990 if (atrans->iovec_data.iovec_buf_val)
991 free(atrans->iovec_data.iovec_buf_val);
992 free(atrans);
993
994 /* Wakeup any writers waiting in BeginTrans() */
995 #ifdef AFS_PTHREAD_ENV
996 opr_cv_broadcast(&dbase->flags_cond);
997 #else
998 LWP_NoYieldSignal(&dbase->flags);
999 #endif
1000 return 0;
1001 }