/*
 * Copyright 2000, International Business Machines Corporation and others.
 * All Rights Reserved.
 *
 * This software has been released under the terms of the IBM Public
 * License.  For details, see the LICENSE file in the top-level source
 * directory or online at http://www.openafs.org/dl/license10.html
 */

#include <afsconfig.h>
#include <afs/param.h>

#ifdef KERNEL
# ifndef UKERNEL
#  ifdef RX_KERNEL_TRACE
#   include "rx_kcommon.h"
#  endif
#  if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
#   include "afs/sysincludes.h"
#  else
#   include "h/types.h"
#   include "h/time.h"
#   include "h/stat.h"
#   if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV)
#    include "h/systm.h"
#   endif
#   ifdef AFS_OSF_ENV
#    include <net/net_globals.h>
#   endif /* AFS_OSF_ENV */
#   ifdef AFS_LINUX20_ENV
#    include "h/socket.h"
#   endif
#   include "netinet/in.h"
#   if defined(AFS_SGI_ENV)
#    include "afs/sysincludes.h"
#   endif
#  endif
#  include "afs/afs_args.h"
#  if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
#   include "h/systm.h"
#  endif
# else /* !UKERNEL */
#  include "afs/sysincludes.h"
# endif /* !UKERNEL */

# ifdef RXDEBUG
#  undef RXDEBUG		/* turn off debugging */
# endif /* RXDEBUG */

# include "afs/afs_osi.h"
# include "rx_kmutex.h"
# include "rx/rx_kernel.h"
# include "afs/lock.h"
#else /* KERNEL */
# include <roken.h>
# include <afs/opr.h>
#endif /* KERNEL */

#include "rx.h"
#include "rx_clock.h"
#include "rx_globals.h"
#include "rx_atomic.h"
#include "rx_internal.h"
#include "rx_conn.h"
#include "rx_call.h"
#include "rx_packet.h"

#ifdef RX_LOCKS_DB
/* rxdb_fileID is used to identify the lock location, along with line#. */
static int rxdb_fileID = RXDB_FILE_RX_RDWR;
#endif /* RX_LOCKS_DB */

/* Get the next packet in the receive queue
 *
 * Dispose of the call's currentPacket, and move the next packet in the
 * receive queue into the currentPacket field.  If the next packet isn't
 * available, then currentPacket is left NULL.
 *
 * @param call
 *	The RX call to manipulate
 * @returns
 *	0 on success, an error code on failure
 *
 * @note
 *	Must be called with the call locked.  Unlocks the call if returning
 *	with an error.
 */
static int
rxi_GetNextPacket(struct rx_call *call)
{
    struct rx_packet *rp;
    int error;

    if (call->app.currentPacket != NULL) {
#ifdef RX_TRACK_PACKETS
	/* The packet is leaving the currentPacket slot; clear its flag. */
	call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
	rxi_FreePacket(call->app.currentPacket);
	call->app.currentPacket = NULL;
    }

    if (opr_queue_IsEmpty(&call->rq))
	return 0;

    /* Check that the next packet available is the next one in sequence */
    rp = opr_queue_First(&call->rq, struct rx_packet, entry);
    if (rp->header.seq != call->rnext)
	return 0;

    opr_queue_Remove(&rp->entry);
#ifdef RX_TRACK_PACKETS
    rp->flags &= ~RX_PKTFLAG_RQ;
#endif
#ifdef RXDEBUG_PACKET
    call->rqc--;
#endif /* RXDEBUG_PACKET */

    /* RXS_CheckPacket is called to undo RXS_PreparePacket's work.  It may
     * reduce the length of the packet by up to conn->maxTrailerSize, to
     * reflect the length of the data plus the header. */
    if ((error = RXS_CheckPacket(call->conn->securityObject, call, rp))) {
	/* This used to merely shut down the call, but now we shut down the
	 * whole connection, since a check failure may indicate an attempt
	 * to hijack it */

	MUTEX_EXIT(&call->lock);
	rxi_ConnectionError(call->conn, error);
	MUTEX_ENTER(&call->conn->conn_data_lock);
	rp = rxi_SendConnectionAbort(call->conn, rp, 0, 0);
	MUTEX_EXIT(&call->conn->conn_data_lock);
	rxi_FreePacket(rp);

	return error;
    }

    call->rnext++;
    call->app.currentPacket = rp;
#ifdef RX_TRACK_PACKETS
    call->app.currentPacket->flags |= RX_PKTFLAG_CP;
#endif
    call->app.curvec = 1;	/* 0th vec is always the header */

    /* begin at the beginning [ more or less ], continue on until the end,
     * then stop. */
    call->app.curpos = (char *)call->app.currentPacket->wirevec[1].iov_base
	+ call->conn->securityHeaderSize;
    call->app.curlen = call->app.currentPacket->wirevec[1].iov_len
	- call->conn->securityHeaderSize;

    call->app.nLeft = call->app.currentPacket->length;
    call->app.bytesRcvd += call->app.currentPacket->length;

    call->nHardAcks++;

    return 0;
}

/* rxi_ReadProc -- internal version.
 *
 * LOCKS USED -- called at netpri
 */
int
rxi_ReadProc(struct rx_call *call, char *buf, int nbytes)
{
    int requestCount;
    int code;
    unsigned int t;

    /* XXXX took out clock_NewTime from here.  Was it needed? */
    requestCount = nbytes;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    if (!opr_queue_IsEmpty(&call->app.iovq)) {
#ifdef RXDEBUG_PACKET
	call->iovqc -=
#endif /* RXDEBUG_PACKET */
	    rxi_FreePackets(0, &call->app.iovq);
    }

    do {
	if (call->app.nLeft == 0) {
	    /* Get next packet */
	    MUTEX_ENTER(&call->lock);
	    for (;;) {
		if (call->error || (call->app.mode != RX_MODE_RECEIVING)) {
		    if (call->error) {
			call->app.mode = RX_MODE_ERROR;
			MUTEX_EXIT(&call->lock);
			return 0;
		    }
		    if (call->app.mode == RX_MODE_SENDING) {
			rxi_FlushWriteLocked(call);
			continue;
		    }
		}

		code = rxi_GetNextPacket(call);
		if (code)
		    /* rxi_GetNextPacket has already unlocked the call */
		    return 0;

		if (call->app.currentPacket) {
		    if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
			if (call->nHardAcks > (u_short) rxi_HardAckRate) {
			    rxi_CancelDelayedAckEvent(call);
			    rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
			} else {
			    /* Delay to consolidate ack packets */
			    rxi_PostDelayedAckEvent(call, &rx_hardAckDelay);
			}
		    }
		    break;
		}

		/*
		 * If we reach this point either we have no packets in the
		 * receive queue or the next packet in the queue is not the
		 * one we are looking for.  There is nothing else for us to
		 * do but wait for another packet to arrive.
		 */

		/* Are there ever going to be any more packets? */
		if (call->flags & RX_CALL_RECEIVE_DONE) {
		    MUTEX_EXIT(&call->lock);
		    return requestCount - nbytes;
		}
		/* Wait for in-sequence packet */
		call->flags |= RX_CALL_READER_WAIT;
		clock_NewTime();
		call->startWait = clock_Sec();
		while (call->flags & RX_CALL_READER_WAIT) {
#ifdef RX_ENABLE_LOCKS
		    CV_WAIT(&call->cv_rq, &call->lock);
#else
		    osi_rxSleep(&call->rq);
#endif
		}

		call->startWait = 0;
#ifdef RX_ENABLE_LOCKS
		if (call->error) {
		    MUTEX_EXIT(&call->lock);
		    return 0;
		}
#endif /* RX_ENABLE_LOCKS */
	    }
	    MUTEX_EXIT(&call->lock);
	} else
	    /* osi_Assert(cp); */
	    /* MTUXXX this should be replaced by some error-recovery code before shipping */
	    /* yes, the following block is allowed to be the ELSE clause (or not) */
	    /* It's possible for call->app.nLeft to be smaller than any particular
	     * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
	     * reflects the size of the buffer.  We have to keep track of the
	     * number of bytes read in the length field of the packet struct.  On
	     * the final portion of a received packet, it's almost certain that
	     * call->app.nLeft will be smaller than the final buffer. */
	    while (nbytes && call->app.currentPacket) {
		t = MIN((int)call->app.curlen, nbytes);
		t = MIN(t, (int)call->app.nLeft);
		memcpy(buf, call->app.curpos, t);
		buf += t;
		nbytes -= t;
		call->app.curpos += t;
		call->app.curlen -= t;
		call->app.nLeft -= t;

		if (!call->app.nLeft) {
		    /* out of packet.  Get another one. */
#ifdef RX_TRACK_PACKETS
		    call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
		    rxi_FreePacket(call->app.currentPacket);
		    call->app.currentPacket = NULL;
		} else if (!call->app.curlen) {
		    /* need to get another struct iov */
		    if (++call->app.curvec >= call->app.currentPacket->niovecs) {
			/* current packet is exhausted, get ready for another */
			/* don't worry about curvec and stuff, they get set somewhere else */
#ifdef RX_TRACK_PACKETS
			call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
			rxi_FreePacket(call->app.currentPacket);
			call->app.currentPacket = NULL;
			call->app.nLeft = 0;
		    } else {
			call->app.curpos =
			    call->app.currentPacket->wirevec[call->app.curvec].iov_base;
			call->app.curlen =
			    call->app.currentPacket->wirevec[call->app.curvec].iov_len;
		    }
		}
	    }
	if (!nbytes) {
	    /* user buffer is full, return */
	    return requestCount;
	}

    } while (nbytes);

    return requestCount;
}

int
rx_ReadProc(struct rx_call *call, char *buf, int nbytes)
{
    int bytes;
    SPLVAR;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    if (!opr_queue_IsEmpty(&call->app.iovq)) {
#ifdef RXDEBUG_PACKET
	call->iovqc -=
#endif /* RXDEBUG_PACKET */
	    rxi_FreePackets(0, &call->app.iovq);
    }

    /*
     * Most common case, all of the data is in the current iovec.
     * We are relying on nLeft being zero unless the call is in receive mode.
     */
    if (!call->error && call->app.curlen > nbytes && call->app.nLeft > nbytes) {
	memcpy(buf, call->app.curpos, nbytes);

	call->app.curpos += nbytes;
	call->app.curlen -= nbytes;
	call->app.nLeft -= nbytes;

	if (!call->app.nLeft && call->app.currentPacket != NULL) {
	    /* out of packet.  Get another one. */
	    rxi_FreePacket(call->app.currentPacket);
	    call->app.currentPacket = NULL;
	}
	return nbytes;
    }

    NETPRI;
    bytes = rxi_ReadProc(call, buf, nbytes);
    USERPRI;
    return bytes;
}
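
/* Illustrative sketch (not part of the library): a typical application
 * drains a call's input stream with rx_ReadProc in a loop, treating a
 * short read as end-of-stream or error.  The buffer size and the
 * consume() routine below are arbitrary choices for the example.
 *
 *     char buffer[4096];
 *     int nread;
 *
 *     while ((nread = rx_ReadProc(call, buffer, sizeof(buffer))) > 0) {
 *         consume(buffer, nread);        // hypothetical application routine
 *         if (nread < (int)sizeof(buffer))
 *             break;                     // short read: stream is exhausted
 *     }
 */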

/* Optimization for unmarshalling 32 bit integers */
int
rx_ReadProc32(struct rx_call *call, afs_int32 * value)
{
    int bytes;
    SPLVAR;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    if (!opr_queue_IsEmpty(&call->app.iovq)) {
#ifdef RXDEBUG_PACKET
	call->iovqc -=
#endif /* RXDEBUG_PACKET */
	    rxi_FreePackets(0, &call->app.iovq);
    }

    /*
     * Most common case, all of the data is in the current iovec.
     * We are relying on nLeft being zero unless the call is in receive mode.
     */
    if (!call->error && call->app.curlen >= sizeof(afs_int32)
	&& call->app.nLeft >= sizeof(afs_int32)) {

	memcpy((char *)value, call->app.curpos, sizeof(afs_int32));

	call->app.curpos += sizeof(afs_int32);
	call->app.curlen -= sizeof(afs_int32);
	call->app.nLeft -= sizeof(afs_int32);

	if (!call->app.nLeft && call->app.currentPacket != NULL) {
	    /* out of packet.  Get another one. */
	    rxi_FreePacket(call->app.currentPacket);
	    call->app.currentPacket = NULL;
	}
	return sizeof(afs_int32);
    }

    NETPRI;
    bytes = rxi_ReadProc(call, (char *)value, sizeof(afs_int32));
    USERPRI;

    return bytes;
}
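
/* Illustrative sketch (not part of the library): rx_ReadProc32 copies four
 * raw bytes from the stream and performs no byte-order conversion, so any
 * ntohl()-style conversion a protocol needs is the caller's (usually the
 * XDR layer's) responsibility.  A hedged example of reading a 32-bit
 * length followed by that many bytes of payload:
 *
 *     afs_int32 len;
 *     char payload[1024];               // arbitrary example bound
 *
 *     if (rx_ReadProc32(call, &len) == sizeof(afs_int32)
 *         && len <= (afs_int32)sizeof(payload)
 *         && rx_ReadProc(call, payload, len) == len) {
 *         ... payload now holds len bytes ...
 *     }
 */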

/* rxi_FillReadVec
 *
 * Uses packets in the receive queue to fill in as much of the
 * current iovec as possible.  Does not block if it runs out
 * of packets to complete the iovec.  Returns true if an ack packet
 * was sent, otherwise returns false. */
int
rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
{
    int didConsume = 0;
    int didHardAck = 0;
    int code;
    unsigned int t;
    struct iovec *call_iov;
    struct iovec *cur_iov = NULL;

    if (call->app.currentPacket) {
	cur_iov = &call->app.currentPacket->wirevec[call->app.curvec];
    }
    call_iov = &call->iov[call->iovNext];

    while (!call->error && call->iovNBytes && call->iovNext < call->iovMax) {
	if (call->app.nLeft == 0) {
	    /* Get next packet */
	    code = rxi_GetNextPacket(call);
	    if (code) {
		/* rxi_GetNextPacket unlocked the call on error; our caller
		 * expects the call to still be locked, so retake the lock. */
		MUTEX_ENTER(&call->lock);
		return 1;
	    }

	    if (call->app.currentPacket) {
		cur_iov = &call->app.currentPacket->wirevec[1];
		didConsume = 1;
		continue;
	    } else {
		break;
	    }
	}

	/* It's possible for call->app.nLeft to be smaller than any particular
	 * iov_len.  Usually, recvmsg doesn't change the iov_len, since it
	 * reflects the size of the buffer.  We have to keep track of the
	 * number of bytes read in the length field of the packet struct.  On
	 * the final portion of a received packet, it's almost certain that
	 * call->app.nLeft will be smaller than the final buffer. */
	while (call->iovNBytes
	       && call->iovNext < call->iovMax
	       && call->app.currentPacket) {

	    t = MIN((int)call->app.curlen, call->iovNBytes);
	    t = MIN(t, (int)call->app.nLeft);
	    call_iov->iov_base = call->app.curpos;
	    call_iov->iov_len = t;
	    call_iov++;
	    call->iovNext++;
	    call->iovNBytes -= t;
	    call->app.curpos += t;
	    call->app.curlen -= t;
	    call->app.nLeft -= t;

	    if (!call->app.nLeft) {
		/* out of packet.  Get another one. */
#ifdef RX_TRACK_PACKETS
		call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
		call->app.currentPacket->flags |= RX_PKTFLAG_IOVQ;
#endif
		opr_queue_Append(&call->app.iovq,
				 &call->app.currentPacket->entry);
#ifdef RXDEBUG_PACKET
		call->iovqc++;
#endif /* RXDEBUG_PACKET */
		call->app.currentPacket = NULL;
	    } else if (!call->app.curlen) {
		/* need to get another struct iov */
		if (++call->app.curvec >= call->app.currentPacket->niovecs) {
		    /* current packet is exhausted, get ready for another */
		    /* don't worry about curvec and stuff, they get set somewhere else */
#ifdef RX_TRACK_PACKETS
		    call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
		    call->app.currentPacket->flags |= RX_PKTFLAG_IOVQ;
#endif
		    opr_queue_Append(&call->app.iovq,
				     &call->app.currentPacket->entry);
#ifdef RXDEBUG_PACKET
		    call->iovqc++;
#endif /* RXDEBUG_PACKET */
		    call->app.currentPacket = NULL;
		    call->app.nLeft = 0;
		} else {
		    cur_iov++;
		    call->app.curpos = (char *)cur_iov->iov_base;
		    call->app.curlen = cur_iov->iov_len;
		}
	    }
	}
    }

    /* If we consumed any packets then check whether we need to
     * send a hard ack. */
    if (didConsume && (!(call->flags & RX_CALL_RECEIVE_DONE))) {
	if (call->nHardAcks > (u_short) rxi_HardAckRate) {
	    rxi_CancelDelayedAckEvent(call);
	    rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
	    didHardAck = 1;
	} else {
	    /* Delay to consolidate ack packets */
	    rxi_PostDelayedAckEvent(call, &rx_hardAckDelay);
	}
    }
    return didHardAck;
}

/* rxi_ReadvProc -- internal version.
 *
 * Fills in an iovec with pointers to the packet buffers.  All packets
 * except the last packet (new current packet) are moved to the iovq
 * while the application is processing the data.
 *
 * LOCKS USED -- called at netpri.
 */
int
rxi_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
	      int nbytes)
{
    int bytes;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    if (!opr_queue_IsEmpty(&call->app.iovq)) {
#ifdef RXDEBUG_PACKET
	call->iovqc -=
#endif /* RXDEBUG_PACKET */
	    rxi_FreePackets(0, &call->app.iovq);
    }

    if (call->app.mode == RX_MODE_SENDING) {
	rxi_FlushWrite(call);
    }

    MUTEX_ENTER(&call->lock);
    if (call->error)
	goto error;

    /* Get whatever data is currently available in the receive queue.
     * If rxi_FillReadVec sends an ack packet then it is possible
     * that we will receive more data while we drop the call lock
     * to send the packet.  Set the RX_CALL_IOVEC_WAIT flag
     * here to avoid a race with the receive thread if we send
     * hard acks in rxi_FillReadVec. */
    call->flags |= RX_CALL_IOVEC_WAIT;
    call->iovNBytes = nbytes;
    call->iovMax = maxio;
    call->iovNext = 0;
    call->iov = iov;
    rxi_FillReadVec(call, 0);

    /* if we need more data then sleep until the receive thread has
     * filled in the rest. */
    if (!call->error && call->iovNBytes && call->iovNext < call->iovMax
	&& !(call->flags & RX_CALL_RECEIVE_DONE)) {
	call->flags |= RX_CALL_READER_WAIT;
	clock_NewTime();
	call->startWait = clock_Sec();
	while (call->flags & RX_CALL_READER_WAIT) {
#ifdef RX_ENABLE_LOCKS
	    CV_WAIT(&call->cv_rq, &call->lock);
#else
	    osi_rxSleep(&call->rq);
#endif
	}
	call->startWait = 0;
    }
    call->flags &= ~RX_CALL_IOVEC_WAIT;

    if (call->error)
	goto error;

    call->iov = NULL;
    *nio = call->iovNext;
    bytes = nbytes - call->iovNBytes;
    MUTEX_EXIT(&call->lock);
    return bytes;

  error:
    MUTEX_EXIT(&call->lock);
    call->app.mode = RX_MODE_ERROR;
    return 0;
}

int
rx_ReadvProc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
	     int nbytes)
{
    int bytes;
    SPLVAR;

    NETPRI;
    bytes = rxi_ReadvProc(call, iov, nio, maxio, nbytes);
    USERPRI;
    return bytes;
}
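
/* Illustrative sketch (not part of the library): rx_ReadvProc fills the
 * caller's iovec with pointers directly into Rx packet buffers, avoiding a
 * copy.  Those buffers remain valid only until the next read or write call
 * on this rx_call, which recycles the iovq.  Assuming RX_MAXIOVECS-sized
 * arrays and a hypothetical consume() routine:
 *
 *     struct iovec iov[RX_MAXIOVECS];
 *     int nio, i, bytes;
 *
 *     bytes = rx_ReadvProc(call, iov, &nio, RX_MAXIOVECS, nbytes);
 *     for (i = 0; i < nio; i++)
 *         consume(iov[i].iov_base, iov[i].iov_len);
 */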

/* rxi_WriteProc -- internal version.
 *
 * LOCKS USED -- called at netpri
 */
int
rxi_WriteProc(struct rx_call *call, char *buf, int nbytes)
{
    struct rx_connection *conn = call->conn;
    unsigned int t;
    int requestCount = nbytes;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    if (!opr_queue_IsEmpty(&call->app.iovq)) {
#ifdef RXDEBUG_PACKET
	call->iovqc -=
#endif /* RXDEBUG_PACKET */
	    rxi_FreePackets(0, &call->app.iovq);
    }

    if (call->app.mode != RX_MODE_SENDING) {
	if ((conn->type == RX_SERVER_CONNECTION)
	    && (call->app.mode == RX_MODE_RECEIVING)) {
	    call->app.mode = RX_MODE_SENDING;
	    if (call->app.currentPacket) {
#ifdef RX_TRACK_PACKETS
		call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
		rxi_FreePacket(call->app.currentPacket);
		call->app.currentPacket = NULL;
		call->app.nLeft = 0;
		call->app.nFree = 0;
	    }
	} else {
	    return 0;
	}
    }

    /* Loop condition is checked at the end, so that a write of 0 bytes
     * will force a packet to be created--especially for the case where
     * there are 0 bytes on the stream, but we must send a packet
     * anyway. */
    do {
	if (call->app.nFree == 0) {
	    MUTEX_ENTER(&call->lock);
	    if (call->error)
		call->app.mode = RX_MODE_ERROR;
	    if (!call->error && call->app.currentPacket) {
		clock_NewTime();	/* Bogus:  need new time package */
		/* The 0, below, specifies that it is not the last packet:
		 * there will be others.  PrepareSendPacket may
		 * alter the packet length by up to
		 * conn->securityMaxTrailerSize */
		call->app.bytesSent += call->app.currentPacket->length;
		rxi_PrepareSendPacket(call, call->app.currentPacket, 0);
		/* PrepareSendPacket drops the call lock */
		rxi_WaitforTQBusy(call);
#ifdef RX_TRACK_PACKETS
		call->app.currentPacket->flags |= RX_PKTFLAG_TQ;
#endif
		opr_queue_Append(&call->tq,
				 &call->app.currentPacket->entry);
#ifdef RXDEBUG_PACKET
		call->tqc++;
#endif /* RXDEBUG_PACKET */
#ifdef RX_TRACK_PACKETS
		call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
		call->app.currentPacket = NULL;

		/* If the call is in recovery, let it exhaust its current
		 * retransmit queue before forcing it to send new packets
		 */
		if (!(call->flags & (RX_CALL_FAST_RECOVER))) {
		    rxi_Start(call, 0);
		}
	    } else if (call->app.currentPacket) {
#ifdef RX_TRACK_PACKETS
		call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
		rxi_FreePacket(call->app.currentPacket);
		call->app.currentPacket = NULL;
	    }
	    /* Wait for transmit window to open up */
	    while (!call->error
		   && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
		clock_NewTime();
		call->startWait = clock_Sec();

#ifdef RX_ENABLE_LOCKS
		CV_WAIT(&call->cv_twind, &call->lock);
#else
		call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
		osi_rxSleep(&call->twind);
#endif

		call->startWait = 0;
#ifdef RX_ENABLE_LOCKS
		if (call->error) {
		    call->app.mode = RX_MODE_ERROR;
		    MUTEX_EXIT(&call->lock);
		    return 0;
		}
#endif /* RX_ENABLE_LOCKS */
	    }
	    if ((call->app.currentPacket = rxi_AllocSendPacket(call, nbytes))) {
#ifdef RX_TRACK_PACKETS
		call->app.currentPacket->flags |= RX_PKTFLAG_CP;
#endif
		call->app.nFree = call->app.currentPacket->length;
		call->app.curvec = 1;	/* 0th vec is always the header */
		/* begin at the beginning [ more or less ], continue
		 * on until the end, then stop. */
		call->app.curpos =
		    (char *)call->app.currentPacket->wirevec[1].iov_base +
		    call->conn->securityHeaderSize;
		call->app.curlen =
		    call->app.currentPacket->wirevec[1].iov_len -
		    call->conn->securityHeaderSize;
	    }
	    if (call->error) {
		call->app.mode = RX_MODE_ERROR;
		if (call->app.currentPacket) {
#ifdef RX_TRACK_PACKETS
		    call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
		    rxi_FreePacket(call->app.currentPacket);
		    call->app.currentPacket = NULL;
		}
		MUTEX_EXIT(&call->lock);
		return 0;
	    }
	    MUTEX_EXIT(&call->lock);
	}

	if (call->app.currentPacket && (int)call->app.nFree < nbytes) {
	    /* Try to extend the current buffer */
	    int len, mud;
	    len = call->app.currentPacket->length;
	    mud = rx_MaxUserDataSize(call);
	    if (mud > len) {
		int want;
		want = MIN(nbytes - (int)call->app.nFree, mud - len);
		rxi_AllocDataBuf(call->app.currentPacket, want,
				 RX_PACKET_CLASS_SEND_CBUF);
		if (call->app.currentPacket->length > (unsigned)mud)
		    call->app.currentPacket->length = mud;
		call->app.nFree += (call->app.currentPacket->length - len);
	    }
	}

	/* If the remaining bytes fit in the buffer, then store them
	 * and return.  Don't ship a buffer that's full immediately to
	 * the peer--we don't know if it's the last buffer yet */

	if (!call->app.currentPacket) {
	    call->app.nFree = 0;
	}

	while (nbytes && call->app.nFree) {

	    t = MIN((int)call->app.curlen, nbytes);
	    t = MIN((int)call->app.nFree, t);
	    memcpy(call->app.curpos, buf, t);
	    buf += t;
	    nbytes -= t;
	    call->app.curpos += t;
	    call->app.curlen -= (u_short)t;
	    call->app.nFree -= (u_short)t;

	    if (!call->app.curlen) {
		/* need to get another struct iov */
		if (++call->app.curvec >= call->app.currentPacket->niovecs) {
		    /* current packet is full, extend or send it */
		    call->app.nFree = 0;
		} else {
		    call->app.curpos =
			call->app.currentPacket->wirevec[call->app.curvec].iov_base;
		    call->app.curlen =
			call->app.currentPacket->wirevec[call->app.curvec].iov_len;
		}
	    }
	}			/* while bytes to send and room to send them */

	/* might be out of space now */
	if (!nbytes) {
	    return requestCount;
	} else {
	    /* more data to send, so get another packet and keep going */
	}
    } while (nbytes);

    return requestCount - nbytes;
}

int
rx_WriteProc(struct rx_call *call, char *buf, int nbytes)
{
    int bytes;
    int tcurlen;
    int tnFree;
    char *tcurpos;
    SPLVAR;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    if (!opr_queue_IsEmpty(&call->app.iovq)) {
#ifdef RXDEBUG_PACKET
	call->iovqc -=
#endif /* RXDEBUG_PACKET */
	    rxi_FreePackets(0, &call->app.iovq);
    }

    /*
     * Most common case: all of the data fits in the current iovec.
     * We are relying on nFree being zero unless the call is in send mode.
     */
    tcurlen = (int)call->app.curlen;
    tnFree = (int)call->app.nFree;
    if (!call->error && tcurlen >= nbytes && tnFree >= nbytes) {
	tcurpos = call->app.curpos;

	memcpy(tcurpos, buf, nbytes);
	call->app.curpos = tcurpos + nbytes;
	call->app.curlen = (u_short)(tcurlen - nbytes);
	call->app.nFree = (u_short)(tnFree - nbytes);
	return nbytes;
    }

    NETPRI;
    bytes = rxi_WriteProc(call, buf, nbytes);
    USERPRI;
    return bytes;
}
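
/* Illustrative sketch (not part of the library): the fast path above makes
 * small sequential writes cheap, so a sender can simply loop, checking for
 * short writes, and rely on rx_FlushWrite (below) to push the final,
 * possibly partial, packet:
 *
 *     const char *p = data;
 *     int left = len, sent;
 *
 *     while (left > 0) {
 *         sent = rx_WriteProc(call, (char *)p, left);
 *         if (sent <= 0)
 *             break;                 // call has gone into an error state
 *         p += sent;
 *         left -= sent;
 *     }
 *     rx_FlushWrite(call);
 */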

/* Optimization for marshalling 32 bit arguments */
int
rx_WriteProc32(struct rx_call *call, afs_int32 * value)
{
    int bytes;
    int tcurlen;
    int tnFree;
    char *tcurpos;
    SPLVAR;

    if (!opr_queue_IsEmpty(&call->app.iovq)) {
#ifdef RXDEBUG_PACKET
	call->iovqc -=
#endif /* RXDEBUG_PACKET */
	    rxi_FreePackets(0, &call->app.iovq);
    }

    /*
     * Most common case: all of the data fits in the current iovec.
     * We are relying on nFree being zero unless the call is in send mode.
     */
    tcurlen = call->app.curlen;
    tnFree = call->app.nFree;
    if (!call->error && tcurlen >= sizeof(afs_int32)
	&& tnFree >= sizeof(afs_int32)) {
	tcurpos = call->app.curpos;

	if (!((size_t)tcurpos & (sizeof(afs_int32) - 1))) {
	    /* destination is suitably aligned; store directly */
	    *((afs_int32 *) (tcurpos)) = *value;
	} else {
	    memcpy(tcurpos, (char *)value, sizeof(afs_int32));
	}
	call->app.curpos = tcurpos + sizeof(afs_int32);
	call->app.curlen = (u_short)(tcurlen - sizeof(afs_int32));
	call->app.nFree = (u_short)(tnFree - sizeof(afs_int32));
	return sizeof(afs_int32);
    }

    NETPRI;
    bytes = rxi_WriteProc(call, (char *)value, sizeof(afs_int32));
    USERPRI;
    return bytes;
}

/* rxi_WritevAlloc -- internal version.
 *
 * Fill in an iovec to point to data in packet buffers.  The application
 * calls rxi_WritevProc when the buffers are full.
 *
 * LOCKS USED -- called at netpri.
 */
static int
rxi_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
		int nbytes)
{
    struct rx_connection *conn = call->conn;
    struct rx_packet *cp;
    int requestCount;
    int nextio;
    /* Temporary values, real work is done in rxi_WritevProc */
    int tnFree;
    unsigned int tcurvec;
    char *tcurpos;
    int tcurlen;

    requestCount = nbytes;
    nextio = 0;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    if (!opr_queue_IsEmpty(&call->app.iovq)) {
#ifdef RXDEBUG_PACKET
	call->iovqc -=
#endif /* RXDEBUG_PACKET */
	    rxi_FreePackets(0, &call->app.iovq);
    }

    if (call->app.mode != RX_MODE_SENDING) {
	if ((conn->type == RX_SERVER_CONNECTION)
	    && (call->app.mode == RX_MODE_RECEIVING)) {
	    call->app.mode = RX_MODE_SENDING;
	    if (call->app.currentPacket) {
#ifdef RX_TRACK_PACKETS
		call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
		rxi_FreePacket(call->app.currentPacket);
		call->app.currentPacket = NULL;
		call->app.nLeft = 0;
		call->app.nFree = 0;
	    }
	} else {
	    return 0;
	}
    }

    /* Set up the iovec to point to data in packet buffers. */
    tnFree = call->app.nFree;
    tcurvec = call->app.curvec;
    tcurpos = call->app.curpos;
    tcurlen = call->app.curlen;
    cp = call->app.currentPacket;
    do {
	int t;

	if (tnFree == 0) {
	    /* current packet is full, allocate a new one */
	    MUTEX_ENTER(&call->lock);
	    cp = rxi_AllocSendPacket(call, nbytes);
	    MUTEX_EXIT(&call->lock);
	    if (cp == NULL) {
		/* out of space, return what we have */
		*nio = nextio;
		return requestCount - nbytes;
	    }
#ifdef RX_TRACK_PACKETS
	    cp->flags |= RX_PKTFLAG_IOVQ;
#endif
	    opr_queue_Append(&call->app.iovq, &cp->entry);
#ifdef RXDEBUG_PACKET
	    call->iovqc++;
#endif /* RXDEBUG_PACKET */
	    tnFree = cp->length;
	    tcurvec = 1;
	    tcurpos =
		(char *)cp->wirevec[1].iov_base +
		call->conn->securityHeaderSize;
	    tcurlen = cp->wirevec[1].iov_len - call->conn->securityHeaderSize;
	}

	if (tnFree < nbytes) {
	    /* try to extend the current packet */
	    int len, mud;
	    len = cp->length;
	    mud = rx_MaxUserDataSize(call);
	    if (mud > len) {
		int want;
		want = MIN(nbytes - tnFree, mud - len);
		rxi_AllocDataBuf(cp, want, RX_PACKET_CLASS_SEND_CBUF);
		if (cp->length > (unsigned)mud)
		    cp->length = mud;
		tnFree += (cp->length - len);
		if (cp == call->app.currentPacket) {
		    call->app.nFree += (cp->length - len);
		}
	    }
	}

	/* fill in the next entry in the iovec */
	t = MIN(tcurlen, nbytes);
	t = MIN(tnFree, t);
	iov[nextio].iov_base = tcurpos;
	iov[nextio].iov_len = t;
	nbytes -= t;
	tcurpos += t;
	tcurlen -= t;
	tnFree -= t;
	nextio++;

	if (!tcurlen) {
	    /* need to get another struct iov */
	    if (++tcurvec >= cp->niovecs) {
		/* current packet is full, extend it or move on to the next packet */
		tnFree = 0;
	    } else {
		tcurpos = (char *)cp->wirevec[tcurvec].iov_base;
		tcurlen = cp->wirevec[tcurvec].iov_len;
	    }
	}
    } while (nbytes && nextio < maxio);
    *nio = nextio;
    return requestCount - nbytes;
}

int
rx_WritevAlloc(struct rx_call *call, struct iovec *iov, int *nio, int maxio,
	       int nbytes)
{
    int bytes;
    SPLVAR;

    NETPRI;
    bytes = rxi_WritevAlloc(call, iov, nio, maxio, nbytes);
    USERPRI;
    return bytes;
}

/* rxi_WritevProc -- internal version.
 *
 * Send buffers allocated in rxi_WritevAlloc.
 *
 * LOCKS USED -- called at netpri.
 */
int
rxi_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
{
#ifdef RX_TRACK_PACKETS
    struct opr_queue *cursor;
#endif
    int nextio = 0;
    int requestCount;
    struct opr_queue tmpq;
#ifdef RXDEBUG_PACKET
    u_short tmpqc;
#endif

    requestCount = nbytes;

    MUTEX_ENTER(&call->lock);
    if (call->error) {
	call->app.mode = RX_MODE_ERROR;
    } else if (call->app.mode != RX_MODE_SENDING) {
	call->error = RX_PROTOCOL_ERROR;
    }
    rxi_WaitforTQBusy(call);

    if (call->error) {
	call->app.mode = RX_MODE_ERROR;
	MUTEX_EXIT(&call->lock);
	if (call->app.currentPacket) {
#ifdef RX_TRACK_PACKETS
	    call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
	    call->app.currentPacket->flags |= RX_PKTFLAG_IOVQ;
#endif
	    opr_queue_Prepend(&call->app.iovq,
			      &call->app.currentPacket->entry);
#ifdef RXDEBUG_PACKET
	    call->iovqc++;
#endif /* RXDEBUG_PACKET */
	    call->app.currentPacket = NULL;
	}
#ifdef RXDEBUG_PACKET
	call->iovqc -=
#endif /* RXDEBUG_PACKET */
	    rxi_FreePackets(0, &call->app.iovq);
	return 0;
    }

    /* Loop through the I/O vector adjusting packet pointers.
     * Place full packets back onto the iovq once they are ready
     * to send.  Set RX_PROTOCOL_ERROR if any problems are found in
     * the iovec.  We put the loop condition at the end to ensure that
     * a zero length write will push a short packet. */
    opr_queue_Init(&tmpq);
#ifdef RXDEBUG_PACKET
    tmpqc = 0;
#endif /* RXDEBUG_PACKET */
    do {
	if (call->app.nFree == 0 && call->app.currentPacket) {
	    clock_NewTime();	/* Bogus:  need new time package */
	    /* The 0, below, specifies that it is not the last packet:
	     * there will be others.  PrepareSendPacket may
	     * alter the packet length by up to
	     * conn->securityMaxTrailerSize */
	    call->app.bytesSent += call->app.currentPacket->length;
	    rxi_PrepareSendPacket(call, call->app.currentPacket, 0);
	    /* PrepareSendPacket drops the call lock */
	    rxi_WaitforTQBusy(call);
	    opr_queue_Append(&tmpq, &call->app.currentPacket->entry);
#ifdef RXDEBUG_PACKET
	    tmpqc++;
#endif /* RXDEBUG_PACKET */
	    call->app.currentPacket = NULL;

	    /* The head of the iovq is now the current packet */
	    if (nbytes) {
		if (opr_queue_IsEmpty(&call->app.iovq)) {
		    MUTEX_EXIT(&call->lock);
		    call->error = RX_PROTOCOL_ERROR;
#ifdef RXDEBUG_PACKET
		    tmpqc -=
#endif /* RXDEBUG_PACKET */
			rxi_FreePackets(0, &tmpq);
		    return 0;
		}
		call->app.currentPacket
		    = opr_queue_First(&call->app.iovq, struct rx_packet,
				      entry);
		opr_queue_Remove(&call->app.currentPacket->entry);
#ifdef RX_TRACK_PACKETS
		call->app.currentPacket->flags &= ~RX_PKTFLAG_IOVQ;
		call->app.currentPacket->flags |= RX_PKTFLAG_CP;
#endif
#ifdef RXDEBUG_PACKET
		call->iovqc--;
#endif /* RXDEBUG_PACKET */
		call->app.nFree = call->app.currentPacket->length;
		call->app.curvec = 1;
		call->app.curpos =
		    (char *)call->app.currentPacket->wirevec[1].iov_base +
		    call->conn->securityHeaderSize;
		call->app.curlen =
		    call->app.currentPacket->wirevec[1].iov_len -
		    call->conn->securityHeaderSize;
	    }
	}

	if (nbytes) {
	    /* The next iovec should point to the current position */
	    if (iov[nextio].iov_base != call->app.curpos
		|| iov[nextio].iov_len > (int)call->app.curlen) {
		call->error = RX_PROTOCOL_ERROR;
		MUTEX_EXIT(&call->lock);
		if (call->app.currentPacket) {
#ifdef RX_TRACK_PACKETS
		    call->app.currentPacket->flags &= ~RX_PKTFLAG_CP;
#endif
		    opr_queue_Prepend(&tmpq,
				      &call->app.currentPacket->entry);
#ifdef RXDEBUG_PACKET
		    tmpqc++;
#endif /* RXDEBUG_PACKET */
		    call->app.currentPacket = NULL;
		}
#ifdef RXDEBUG_PACKET
		tmpqc -=
#endif /* RXDEBUG_PACKET */
		    rxi_FreePackets(0, &tmpq);
		return 0;
	    }
	    nbytes -= iov[nextio].iov_len;
	    call->app.curpos += iov[nextio].iov_len;
	    call->app.curlen -= iov[nextio].iov_len;
	    call->app.nFree -= iov[nextio].iov_len;
	    nextio++;
	    if (call->app.curlen == 0) {
		/* wirevec[] has niovecs valid entries, so never index
		 * past the last one */
		if (++call->app.curvec >= call->app.currentPacket->niovecs) {
		    call->app.nFree = 0;
		} else {
		    call->app.curpos =
			call->app.currentPacket->wirevec[call->app.curvec].iov_base;
		    call->app.curlen =
			call->app.currentPacket->wirevec[call->app.curvec].iov_len;
		}
	    }
	}
    } while (nbytes && nextio < nio);

    /* Move the packets from the temporary queue onto the transmit queue.
     * We may end up with more than call->twind packets on the queue. */

#ifdef RX_TRACK_PACKETS
    for (opr_queue_Scan(&tmpq, cursor)) {
	struct rx_packet *p = opr_queue_Entry(cursor, struct rx_packet, entry);
	p->flags |= RX_PKTFLAG_TQ;
    }
#endif
    if (call->error)
	call->app.mode = RX_MODE_ERROR;

    opr_queue_SpliceAppend(&call->tq, &tmpq);

    /* If the call is in recovery, let it exhaust its current retransmit
     * queue before forcing it to send new packets
     */
    if (!(call->flags & RX_CALL_FAST_RECOVER)) {
	rxi_Start(call, 0);
    }

    /* Wait for the length of the transmit queue to fall below call->twind */
    while (!call->error && call->tnext + 1 > call->tfirst + (2 * call->twind)) {
	clock_NewTime();
	call->startWait = clock_Sec();
#ifdef RX_ENABLE_LOCKS
	CV_WAIT(&call->cv_twind, &call->lock);
#else
	call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
	osi_rxSleep(&call->twind);
#endif
	call->startWait = 0;
    }

    if (call->error) {
	/* Snapshot currentPacket before clearing the field and dropping
	 * the lock, so the packet can still be freed below. */
	struct rx_packet *cp = call->app.currentPacket;

	call->app.mode = RX_MODE_ERROR;
	call->app.currentPacket = NULL;
	MUTEX_EXIT(&call->lock);
	if (cp) {
#ifdef RX_TRACK_PACKETS
	    cp->flags &= ~RX_PKTFLAG_CP;
#endif
	    rxi_FreePacket(cp);
	}
	return 0;
    }
    MUTEX_EXIT(&call->lock);

    return requestCount - nbytes;
}

int
rx_WritevProc(struct rx_call *call, struct iovec *iov, int nio, int nbytes)
{
    int bytes;
    SPLVAR;

    NETPRI;
    bytes = rxi_WritevProc(call, iov, nio, nbytes);
    USERPRI;
    return bytes;
}
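
/* Illustrative sketch (not part of the library): the writev path is a
 * two-step, zero-copy protocol.  rx_WritevAlloc hands out iovec entries
 * pointing into freshly allocated packet buffers; the application fills
 * exactly those bytes in place and then commits them with rx_WritevProc,
 * which must be passed the same iovec unmodified.  RX_MAXIOVECS and the
 * fill() routine are assumptions for the example:
 *
 *     struct iovec iov[RX_MAXIOVECS];
 *     int got, nio, i;
 *
 *     got = rx_WritevAlloc(call, iov, &nio, RX_MAXIOVECS, nbytes);
 *     for (i = 0; i < nio; i++)
 *         fill(iov[i].iov_base, iov[i].iov_len);   // hypothetical
 *     rx_WritevProc(call, iov, nio, got);
 */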

/* Flush any buffered data to the stream, switch to read mode
 * (clients) or to EOF mode (servers).  If 'locked' is nonzero, call->lock
 * must already be held.
 *
 * LOCKS HELD: called at netpri.
 */
static void
FlushWrite(struct rx_call *call, int locked)
{
    struct rx_packet *cp = NULL;

    /* Free any packets from the last call to ReadvProc/WritevProc */
    if (!opr_queue_IsEmpty(&call->app.iovq)) {
#ifdef RXDEBUG_PACKET
	call->iovqc -=
#endif /* RXDEBUG_PACKET */
	    rxi_FreePackets(0, &call->app.iovq);
    }

    if (call->app.mode == RX_MODE_SENDING) {

	call->app.mode =
	    (call->conn->type ==
	     RX_CLIENT_CONNECTION ? RX_MODE_RECEIVING : RX_MODE_EOF);

#ifdef RX_KERNEL_TRACE
	{
	    int glockOwner = ISAFS_GLOCK();
	    if (!glockOwner)
		AFS_GLOCK();
	    afs_Trace3(afs_iclSetp, CM_TRACE_WASHERE, ICL_TYPE_STRING,
		       __FILE__, ICL_TYPE_INT32, __LINE__, ICL_TYPE_POINTER,
		       call);
	    if (!glockOwner)
		AFS_GUNLOCK();
	}
#endif

	if (!locked) {
	    MUTEX_ENTER(&call->lock);
	}

	if (call->error)
	    call->app.mode = RX_MODE_ERROR;

	call->flags |= RX_CALL_FLUSH;

	cp = call->app.currentPacket;

	if (cp) {
	    /* cp->length is only supposed to be the user's data */
	    /* cp->length was already set to (then-current)
	     * MaxUserDataSize or less. */
#ifdef RX_TRACK_PACKETS
	    cp->flags &= ~RX_PKTFLAG_CP;
#endif
	    cp->length -= call->app.nFree;
	    call->app.currentPacket = NULL;
	    call->app.nFree = 0;
	} else {
	    cp = rxi_AllocSendPacket(call, 0);
	    if (!cp) {
		/* Mode can no longer be MODE_SENDING; drop the lock if we
		 * acquired it above. */
		if (!locked)
		    MUTEX_EXIT(&call->lock);
		return;
	    }
	    cp->length = 0;
	    cp->niovecs = 2;	/* header + space for rxkad stuff */
	    call->app.nFree = 0;
	}

	/* The 1 specifies that this is the last packet */
	call->app.bytesSent += cp->length;
	rxi_PrepareSendPacket(call, cp, 1);
	/* PrepareSendPacket drops the call lock */
	rxi_WaitforTQBusy(call);
#ifdef RX_TRACK_PACKETS
	cp->flags |= RX_PKTFLAG_TQ;
#endif
	opr_queue_Append(&call->tq, &cp->entry);
#ifdef RXDEBUG_PACKET
	call->tqc++;
#endif /* RXDEBUG_PACKET */

	/* If the call is in recovery, let it exhaust its current retransmit
	 * queue before forcing it to send new packets
	 */
	if (!(call->flags & RX_CALL_FAST_RECOVER)) {
	    rxi_Start(call, 0);
	}
	if (!locked) {
	    MUTEX_EXIT(&call->lock);
	}
    }
}

void
rxi_FlushWrite(struct rx_call *call)
{
    FlushWrite(call, 0);
}

void
rxi_FlushWriteLocked(struct rx_call *call)
{
    FlushWrite(call, 1);
}

/* Flush any buffered data to the stream, switch to read mode
 * (clients) or to EOF mode (servers) */
void
rx_FlushWrite(struct rx_call *call)
{
    SPLVAR;
    NETPRI;
    FlushWrite(call, 0);
    USERPRI;
}
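
/* Illustrative sketch (not part of the library): putting the pieces
 * together, a minimal client RPC over an established connection writes
 * its request, flushes to switch the call into receive mode, and reads
 * the reply.  rx_NewCall/rx_EndCall bracket the exchange in real code;
 * request/reply buffers are assumptions for the example:
 *
 *     struct rx_call *call = rx_NewCall(conn);
 *
 *     rx_WriteProc(call, request, reqlen);
 *     rx_FlushWrite(call);                 // client -> RX_MODE_RECEIVING
 *     while (rx_ReadProc(call, reply, sizeof(reply)) == sizeof(reply))
 *         ;                                // consume the reply stream
 *     rx_EndCall(call, 0);
 */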