2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
10 #include <afsconfig.h>
11 #include <afs/param.h>
15 # ifdef RX_KERNEL_TRACE
16 # include "rx_kcommon.h"
18 # if defined(AFS_DARWIN_ENV) || defined(AFS_XBSD_ENV)
19 # include "afs/sysincludes.h"
24 # if defined(AFS_AIX_ENV) || defined(AFS_AUX_ENV) || defined(AFS_SUN5_ENV)
28 # include <net/net_globals.h>
29 # endif /* AFS_OSF_ENV */
30 # ifdef AFS_LINUX20_ENV
31 # include "h/socket.h"
33 # include "netinet/in.h"
34 # if defined(AFS_SGI_ENV)
35 # include "afs/sysincludes.h"
38 # include "afs/afs_args.h"
39 # if (defined(AFS_AUX_ENV) || defined(AFS_AIX_ENV))
43 # include "afs/sysincludes.h"
44 # endif /* !UKERNEL */
47 # undef RXDEBUG /* turn off debugging */
50 # include "afs/afs_osi.h"
51 # include "rx_kmutex.h"
52 # include "rx/rx_kernel.h"
53 # include "afs/lock.h"
61 #include "rx_globals.h"
62 #include "rx_atomic.h"
63 #include "rx_internal.h"
66 #include "rx_packet.h"
/*
 * NOTE(review): corrupted extraction — original lines are split and some are
 * missing (the matching "#ifdef RX_LOCKS_DB" opener for the #endif below is
 * not present in this view). Text preserved byte-for-byte.
 * File identifier used by the RX_LOCKS_DB lock-debugging machinery, paired
 * with __LINE__ at lock sites.
 */
69 /* rxdb_fileID is used to identify the lock location, along with line#. */
70 static int rxdb_fileID
= RXDB_FILE_RX_RDWR
;
71 #endif /* RX_LOCKS_DB */
/*
 * NOTE(review): corrupted extraction — each original source line is split
 * across several physical lines and many interior lines are missing (gaps in
 * the embedded original numbering: return type, local declarations, braces,
 * early-return statements). Text preserved byte-for-byte; restore from
 * upstream OpenAFS src/rx/rx_rdwr.c before functional changes.
 *
 * Visible behavior: frees the call's old currentPacket; if the receive queue
 * head is the next in-sequence packet (header.seq == call->rnext) it is
 * removed, security-checked via RXS_CheckPacket (failure aborts the whole
 * connection), and installed as currentPacket with curvec/curpos/curlen/nLeft
 * initialized past securityHeaderSize. Per the fragmentary doc comment, must
 * be called with the call locked and unlocks it on (error) return — TODO
 * confirm against upstream.
 */
73 /* Get the next packet in the receive queue
75 * Dispose of the call's currentPacket, and move the next packet in the
76 * receive queue into the currentPacket field. If the next packet isn't
77 * available, then currentPacket is left NULL.
80 * The RX call to manipulate
82 * 0 on success, an error code on failure
85 * Must be called with the call locked. Unlocks the call if returning
90 rxi_GetNextPacket(struct rx_call
*call
) {
94 if (call
->app
.currentPacket
!= NULL
) {
95 #ifdef RX_TRACK_PACKETS
96 call
->app
.currentPacket
->flags
|= RX_PKTFLAG_CP
;
98 rxi_FreePacket(call
->app
.currentPacket
);
99 call
->app
.currentPacket
= NULL
;
102 if (opr_queue_IsEmpty(&call
->rq
))
105 /* Check that next packet available is next in sequence */
106 rp
= opr_queue_First(&call
->rq
, struct rx_packet
, entry
);
107 if (rp
->header
.seq
!= call
->rnext
)
110 opr_queue_Remove(&rp
->entry
);
111 #ifdef RX_TRACK_PACKETS
112 rp
->flags
&= ~RX_PKTFLAG_RQ
;
114 #ifdef RXDEBUG_PACKET
116 #endif /* RXDEBUG_PACKET */
118 /* RXS_CheckPacket called to undo RXS_PreparePacket's work. It may
119 * reduce the length of the packet by up to conn->maxTrailerSize,
120 * to reflect the length of the data + the header. */
121 if ((error
= RXS_CheckPacket(call
->conn
->securityObject
, call
, rp
))) {
122 /* Used to merely shut down the call, but now we shut down the whole
123 * connection since this may indicate an attempt to hijack it */
125 MUTEX_EXIT(&call
->lock
);
126 rxi_ConnectionError(call
->conn
, error
);
127 MUTEX_ENTER(&call
->conn
->conn_data_lock
);
128 rp
= rxi_SendConnectionAbort(call
->conn
, rp
, 0, 0);
129 MUTEX_EXIT(&call
->conn
->conn_data_lock
);
136 call
->app
.currentPacket
= rp
;
137 #ifdef RX_TRACK_PACKETS
138 call
->app
.currentPacket
->flags
|= RX_PKTFLAG_CP
;
140 call
->app
.curvec
= 1; /* 0th vec is always header */
142 /* begin at the beginning [ more or less ], continue on until the end,
144 call
->app
.curpos
= (char *)call
->app
.currentPacket
->wirevec
[1].iov_base
+
145 call
->conn
->securityHeaderSize
;
146 call
->app
.curlen
= call
->app
.currentPacket
->wirevec
[1].iov_len
-
147 call
->conn
->securityHeaderSize
;
149 call
->app
.nLeft
= call
->app
.currentPacket
->length
;
150 call
->app
.bytesRcvd
+= call
->app
.currentPacket
->length
;
/*
 * NOTE(review): corrupted extraction — lines split and many interior lines
 * missing (declarations, braces, returns). Text preserved byte-for-byte;
 * restore from upstream OpenAFS src/rx/rx_rdwr.c before functional changes.
 *
 * Visible behavior: internal blocking read. Drains leftover iovq packets,
 * then loops: when nLeft == 0 it takes the call lock, validates mode/error,
 * pulls the next packet via rxi_GetNextPacket, sends/schedules hard acks,
 * and if no in-sequence packet is available waits on cv_rq (RX_ENABLE_LOCKS)
 * or osi_rxSleep until RX_CALL_READER_WAIT clears; it then memcpy()s from
 * curpos into buf, advancing curpos/curlen/nLeft and stepping wirevec entries
 * via curvec. Returns requestCount - nbytes (bytes actually read) on the
 * visible RECEIVE_DONE path. Called at netpri per the fragmentary header.
 */
157 /* rxi_ReadProc -- internal version.
159 * LOCKS USED -- called at netpri
162 rxi_ReadProc(struct rx_call
*call
, char *buf
,
169 /* XXXX took out clock_NewTime from here. Was it needed? */
170 requestCount
= nbytes
;
172 /* Free any packets from the last call to ReadvProc/WritevProc */
173 if (!opr_queue_IsEmpty(&call
->app
.iovq
)) {
174 #ifdef RXDEBUG_PACKET
176 #endif /* RXDEBUG_PACKET */
177 rxi_FreePackets(0, &call
->app
.iovq
);
181 if (call
->app
.nLeft
== 0) {
182 /* Get next packet */
183 MUTEX_ENTER(&call
->lock
);
185 if (call
->error
|| (call
->app
.mode
!= RX_MODE_RECEIVING
)) {
187 call
->app
.mode
= RX_MODE_ERROR
;
188 MUTEX_EXIT(&call
->lock
);
191 if (call
->app
.mode
== RX_MODE_SENDING
) {
192 rxi_FlushWriteLocked(call
);
197 code
= rxi_GetNextPacket(call
);
201 if (call
->app
.currentPacket
) {
202 if (!(call
->flags
& RX_CALL_RECEIVE_DONE
)) {
203 if (call
->nHardAcks
> (u_short
) rxi_HardAckRate
) {
204 rxi_CancelDelayedAckEvent(call
);
205 rxi_SendAck(call
, 0, 0, RX_ACK_DELAY
, 0);
207 /* Delay to consolidate ack packets */
208 rxi_PostDelayedAckEvent(call
, &rx_hardAckDelay
);
215 * If we reach this point either we have no packets in the
216 * receive queue or the next packet in the queue is not the
217 * one we are looking for. There is nothing else for us to
218 * do but wait for another packet to arrive.
221 /* Are there ever going to be any more packets? */
222 if (call
->flags
& RX_CALL_RECEIVE_DONE
) {
223 MUTEX_EXIT(&call
->lock
);
224 return requestCount
- nbytes
;
226 /* Wait for in-sequence packet */
227 call
->flags
|= RX_CALL_READER_WAIT
;
229 call
->startWait
= clock_Sec();
230 while (call
->flags
& RX_CALL_READER_WAIT
) {
231 #ifdef RX_ENABLE_LOCKS
232 CV_WAIT(&call
->cv_rq
, &call
->lock
);
234 osi_rxSleep(&call
->rq
);
239 #ifdef RX_ENABLE_LOCKS
241 MUTEX_EXIT(&call
->lock
);
244 #endif /* RX_ENABLE_LOCKS */
246 MUTEX_EXIT(&call
->lock
);
248 /* osi_Assert(cp); */
249 /* MTUXXX this should be replaced by some error-recovery code before shipping */
250 /* yes, the following block is allowed to be the ELSE clause (or not) */
251 /* It's possible for call->app.nLeft to be smaller than any particular
252 * iov_len. Usually, recvmsg doesn't change the iov_len, since it
253 * reflects the size of the buffer. We have to keep track of the
254 * number of bytes read in the length field of the packet struct. On
255 * the final portion of a received packet, it's almost certain that
256 * call->app.nLeft will be smaller than the final buffer. */
257 while (nbytes
&& call
->app
.currentPacket
) {
258 t
= MIN((int)call
->app
.curlen
, nbytes
);
259 t
= MIN(t
, (int)call
->app
.nLeft
);
260 memcpy(buf
, call
->app
.curpos
, t
);
263 call
->app
.curpos
+= t
;
264 call
->app
.curlen
-= t
;
265 call
->app
.nLeft
-= t
;
267 if (!call
->app
.nLeft
) {
268 /* out of packet. Get another one. */
269 #ifdef RX_TRACK_PACKETS
270 call
->app
.currentPacket
->flags
&= ~RX_PKTFLAG_CP
;
272 rxi_FreePacket(call
->app
.currentPacket
);
273 call
->app
.currentPacket
= NULL
;
274 } else if (!call
->app
.curlen
) {
275 /* need to get another struct iov */
276 if (++call
->app
.curvec
>= call
->app
.currentPacket
->niovecs
) {
277 /* current packet is exhausted, get ready for another */
278 /* don't worry about curvec and stuff, they get set somewhere else */
279 #ifdef RX_TRACK_PACKETS
280 call
->app
.currentPacket
->flags
&= ~RX_PKTFLAG_CP
;
282 rxi_FreePacket(call
->app
.currentPacket
);
283 call
->app
.currentPacket
= NULL
;
287 call
->app
.currentPacket
->wirevec
[call
->app
.curvec
].iov_base
;
289 call
->app
.currentPacket
->wirevec
[call
->app
.curvec
].iov_len
;
294 /* user buffer is full, return */
/*
 * NOTE(review): corrupted extraction — lines split and interior lines missing
 * (return type, braces, NETPRI wrappers presumably dropped). Text preserved
 * byte-for-byte; restore from upstream before functional changes.
 *
 * Visible behavior: public read wrapper. Frees leftover iovq packets, then
 * fast path: if no call error and both curlen and nLeft exceed nbytes, copies
 * straight from curpos and adjusts curpos/curlen/nLeft, freeing the exhausted
 * currentPacket; otherwise falls back to rxi_ReadProc.
 */
304 rx_ReadProc(struct rx_call
*call
, char *buf
, int nbytes
)
309 /* Free any packets from the last call to ReadvProc/WritevProc */
310 if (!opr_queue_IsEmpty(&call
->app
.iovq
)) {
311 #ifdef RXDEBUG_PACKET
313 #endif /* RXDEBUG_PACKET */
314 rxi_FreePackets(0, &call
->app
.iovq
);
318 * Most common case, all of the data is in the current iovec.
319 * We are relying on nLeft being zero unless the call is in receive mode.
321 if (!call
->error
&& call
->app
.curlen
> nbytes
&& call
->app
.nLeft
> nbytes
) {
322 memcpy(buf
, call
->app
.curpos
, nbytes
);
324 call
->app
.curpos
+= nbytes
;
325 call
->app
.curlen
-= nbytes
;
326 call
->app
.nLeft
-= nbytes
;
328 if (!call
->app
.nLeft
&& call
->app
.currentPacket
!= NULL
) {
329 /* out of packet. Get another one. */
330 rxi_FreePacket(call
->app
.currentPacket
);
331 call
->app
.currentPacket
= NULL
;
337 bytes
= rxi_ReadProc(call
, buf
, nbytes
);
/*
 * NOTE(review): corrupted extraction — lines split, interior lines missing.
 * Text preserved byte-for-byte; restore from upstream before changes.
 *
 * Visible behavior: 32-bit unmarshalling fast path. Same shape as
 * rx_ReadProc but for exactly sizeof(afs_int32) bytes: copies via memcpy
 * (unaligned-safe), advances cursors, frees an exhausted currentPacket, and
 * returns sizeof(afs_int32); slow path delegates to rxi_ReadProc.
 */
342 /* Optimization for unmarshalling 32 bit integers */
344 rx_ReadProc32(struct rx_call
*call
, afs_int32
* value
)
349 /* Free any packets from the last call to ReadvProc/WritevProc */
350 if (!opr_queue_IsEmpty(&call
->app
.iovq
)) {
351 #ifdef RXDEBUG_PACKET
353 #endif /* RXDEBUG_PACKET */
354 rxi_FreePackets(0, &call
->app
.iovq
);
358 * Most common case, all of the data is in the current iovec.
359 * We are relying on nLeft being zero unless the call is in receive mode.
361 if (!call
->error
&& call
->app
.curlen
>= sizeof(afs_int32
)
362 && call
->app
.nLeft
>= sizeof(afs_int32
)) {
364 memcpy((char *)value
, call
->app
.curpos
, sizeof(afs_int32
));
366 call
->app
.curpos
+= sizeof(afs_int32
);
367 call
->app
.curlen
-= sizeof(afs_int32
);
368 call
->app
.nLeft
-= sizeof(afs_int32
);
370 if (!call
->app
.nLeft
&& call
->app
.currentPacket
!= NULL
) {
371 /* out of packet. Get another one. */
372 rxi_FreePacket(call
->app
.currentPacket
);
373 call
->app
.currentPacket
= NULL
;
375 return sizeof(afs_int32
);
379 bytes
= rxi_ReadProc(call
, (char *)value
, sizeof(afs_int32
));
/*
 * NOTE(review): corrupted extraction — lines split, many interior lines
 * missing (locals such as t/code/didConsume are declared off-view). Text
 * preserved byte-for-byte; restore from upstream before changes.
 *
 * Visible behavior: non-blocking fill of call->iov[] from queued receive
 * packets. Points successive call_iov entries at curpos spans, moves fully
 * consumed packets onto app.iovq (flag swap RX_PKTFLAG_CP -> RX_PKTFLAG_IOVQ
 * under RX_TRACK_PACKETS), and steps wirevec entries via curvec. If any
 * packet was consumed and the call is not RECEIVE_DONE, sends an immediate
 * hard ack past rxi_HardAckRate or schedules a delayed one. Per the
 * fragmentary header, returns whether an ack packet was sent.
 */
387 * Uses packets in the receive queue to fill in as much of the
388 * current iovec as possible. Does not block if it runs out
389 * of packets to complete the iovec. Return true if an ack packet
390 * was sent, otherwise return false */
392 rxi_FillReadVec(struct rx_call
*call
, afs_uint32 serial
)
398 struct iovec
*call_iov
;
399 struct iovec
*cur_iov
= NULL
;
401 if (call
->app
.currentPacket
) {
402 cur_iov
= &call
->app
.currentPacket
->wirevec
[call
->app
.curvec
];
404 call_iov
= &call
->iov
[call
->iovNext
];
406 while (!call
->error
&& call
->iovNBytes
&& call
->iovNext
< call
->iovMax
) {
407 if (call
->app
.nLeft
== 0) {
408 /* Get next packet */
409 code
= rxi_GetNextPacket(call
);
411 MUTEX_ENTER(&call
->lock
);
415 if (call
->app
.currentPacket
) {
416 cur_iov
= &call
->app
.currentPacket
->wirevec
[1];
424 /* It's possible for call->app.nLeft to be smaller than any particular
425 * iov_len. Usually, recvmsg doesn't change the iov_len, since it
426 * reflects the size of the buffer. We have to keep track of the
427 * number of bytes read in the length field of the packet struct. On
428 * the final portion of a received packet, it's almost certain that
429 * call->app.nLeft will be smaller than the final buffer. */
430 while (call
->iovNBytes
431 && call
->iovNext
< call
->iovMax
432 && call
->app
.currentPacket
) {
434 t
= MIN((int)call
->app
.curlen
, call
->iovNBytes
);
435 t
= MIN(t
, (int)call
->app
.nLeft
);
436 call_iov
->iov_base
= call
->app
.curpos
;
437 call_iov
->iov_len
= t
;
440 call
->iovNBytes
-= t
;
441 call
->app
.curpos
+= t
;
442 call
->app
.curlen
-= t
;
443 call
->app
.nLeft
-= t
;
445 if (!call
->app
.nLeft
) {
446 /* out of packet. Get another one. */
447 #ifdef RX_TRACK_PACKETS
448 call
->app
.currentPacket
->flags
&= ~RX_PKTFLAG_CP
;
449 call
->app
.currentPacket
->flags
|= RX_PKTFLAG_IOVQ
;
451 opr_queue_Append(&call
->app
.iovq
,
452 &call
->app
.currentPacket
->entry
);
453 #ifdef RXDEBUG_PACKET
455 #endif /* RXDEBUG_PACKET */
456 call
->app
.currentPacket
= NULL
;
457 } else if (!call
->app
.curlen
) {
458 /* need to get another struct iov */
459 if (++call
->app
.curvec
>= call
->app
.currentPacket
->niovecs
) {
460 /* current packet is exhausted, get ready for another */
461 /* don't worry about curvec and stuff, they get set somewhere else */
462 #ifdef RX_TRACK_PACKETS
463 call
->app
.currentPacket
->flags
&= ~RX_PKTFLAG_CP
;
464 call
->app
.currentPacket
->flags
|= RX_PKTFLAG_IOVQ
;
466 opr_queue_Append(&call
->app
.iovq
,
467 &call
->app
.currentPacket
->entry
);
468 #ifdef RXDEBUG_PACKET
470 #endif /* RXDEBUG_PACKET */
471 call
->app
.currentPacket
= NULL
;
475 call
->app
.curpos
= (char *)cur_iov
->iov_base
;
476 call
->app
.curlen
= cur_iov
->iov_len
;
482 /* If we consumed any packets then check whether we need to
483 * send a hard ack. */
484 if (didConsume
&& (!(call
->flags
& RX_CALL_RECEIVE_DONE
))) {
485 if (call
->nHardAcks
> (u_short
) rxi_HardAckRate
) {
486 rxi_CancelDelayedAckEvent(call
);
487 rxi_SendAck(call
, 0, serial
, RX_ACK_DELAY
, 0);
490 /* Delay to consolidate ack packets */
491 rxi_PostDelayedAckEvent(call
, &rx_hardAckDelay
);
/*
 * NOTE(review): corrupted extraction — lines split, interior lines missing.
 * Text preserved byte-for-byte; restore from upstream before changes.
 *
 * Visible behavior: internal readv. Frees leftover iovq packets, flushes a
 * pending write if still in SENDING mode, then under call->lock sets
 * RX_CALL_IOVEC_WAIT (to close the race with the receive thread while hard
 * acks drop the lock), seeds iovNBytes/iovMax, calls rxi_FillReadVec, and if
 * more data is needed and the call is not RECEIVE_DONE waits on cv_rq /
 * osi_rxSleep until RX_CALL_READER_WAIT clears. Returns via *nio = iovNext
 * and bytes = nbytes - iovNBytes; error path sets RX_MODE_ERROR.
 */
498 /* rxi_ReadvProc -- internal version.
500 * Fills in an iovec with pointers to the packet buffers. All packets
501 * except the last packet (new current packet) are moved to the iovq
502 * while the application is processing the data.
504 * LOCKS USED -- called at netpri.
507 rxi_ReadvProc(struct rx_call
*call
, struct iovec
*iov
, int *nio
, int maxio
,
512 /* Free any packets from the last call to ReadvProc/WritevProc */
513 if (!opr_queue_IsEmpty(&call
->app
.iovq
)) {
514 #ifdef RXDEBUG_PACKET
516 #endif /* RXDEBUG_PACKET */
517 rxi_FreePackets(0, &call
->app
.iovq
);
520 if (call
->app
.mode
== RX_MODE_SENDING
) {
521 rxi_FlushWrite(call
);
524 MUTEX_ENTER(&call
->lock
);
528 /* Get whatever data is currently available in the receive queue.
529 * If rxi_FillReadVec sends an ack packet then it is possible
530 * that we will receive more data while we drop the call lock
531 * to send the packet. Set the RX_CALL_IOVEC_WAIT flag
532 * here to avoid a race with the receive thread if we send
533 * hard acks in rxi_FillReadVec. */
534 call
->flags
|= RX_CALL_IOVEC_WAIT
;
535 call
->iovNBytes
= nbytes
;
536 call
->iovMax
= maxio
;
539 rxi_FillReadVec(call
, 0);
541 /* if we need more data then sleep until the receive thread has
542 * filled in the rest. */
543 if (!call
->error
&& call
->iovNBytes
&& call
->iovNext
< call
->iovMax
544 && !(call
->flags
& RX_CALL_RECEIVE_DONE
)) {
545 call
->flags
|= RX_CALL_READER_WAIT
;
547 call
->startWait
= clock_Sec();
548 while (call
->flags
& RX_CALL_READER_WAIT
) {
549 #ifdef RX_ENABLE_LOCKS
550 CV_WAIT(&call
->cv_rq
, &call
->lock
);
552 osi_rxSleep(&call
->rq
);
557 call
->flags
&= ~RX_CALL_IOVEC_WAIT
;
563 *nio
= call
->iovNext
;
564 bytes
= nbytes
- call
->iovNBytes
;
565 MUTEX_EXIT(&call
->lock
);
569 MUTEX_EXIT(&call
->lock
);
570 call
->app
.mode
= RX_MODE_ERROR
;
/*
 * NOTE(review): corrupted extraction — return type, braces and the NETPRI
 * wrappers are missing from this view. Text preserved byte-for-byte.
 * Public readv wrapper: delegates to rxi_ReadvProc.
 */
575 rx_ReadvProc(struct rx_call
*call
, struct iovec
*iov
, int *nio
, int maxio
,
582 bytes
= rxi_ReadvProc(call
, iov
, nio
, maxio
, nbytes
);
/*
 * NOTE(review): corrupted extraction — lines split and a large number of
 * interior lines missing (locals t/len/mud/want/code, loop heads, braces,
 * several early returns). Text preserved byte-for-byte; restore from
 * upstream OpenAFS src/rx/rx_rdwr.c before functional changes.
 *
 * Visible behavior: internal blocking write. Switches a server-side
 * RECEIVING call into SENDING mode, then loops: when nFree == 0 the full
 * currentPacket is prepared (rxi_PrepareSendPacket, not-last flag 0),
 * appended to call->tq after rxi_WaitforTQBusy, and transmission is kicked
 * unless the call is in fast recovery; the writer then blocks on cv_twind /
 * osi_rxSleep until the transmit window (tfirst + 2*twind) opens, allocates
 * a fresh packet via rxi_AllocSendPacket, optionally extends it with
 * rxi_AllocDataBuf up to rx_MaxUserDataSize, and memcpy()s from buf through
 * curpos/curlen/nFree across wirevec entries. Returns requestCount - nbytes.
 */
587 /* rxi_WriteProc -- internal version.
589 * LOCKS USED -- called at netpri
593 rxi_WriteProc(struct rx_call
*call
, char *buf
,
596 struct rx_connection
*conn
= call
->conn
;
598 int requestCount
= nbytes
;
600 /* Free any packets from the last call to ReadvProc/WritevProc */
601 if (!opr_queue_IsEmpty(&call
->app
.iovq
)) {
602 #ifdef RXDEBUG_PACKET
604 #endif /* RXDEBUG_PACKET */
605 rxi_FreePackets(0, &call
->app
.iovq
);
608 if (call
->app
.mode
!= RX_MODE_SENDING
) {
609 if ((conn
->type
== RX_SERVER_CONNECTION
)
610 && (call
->app
.mode
== RX_MODE_RECEIVING
)) {
611 call
->app
.mode
= RX_MODE_SENDING
;
612 if (call
->app
.currentPacket
) {
613 #ifdef RX_TRACK_PACKETS
614 call
->app
.currentPacket
->flags
&= ~RX_PKTFLAG_CP
;
616 rxi_FreePacket(call
->app
.currentPacket
);
617 call
->app
.currentPacket
= NULL
;
626 /* Loop condition is checked at end, so that a write of 0 bytes
627 * will force a packet to be created--specially for the case where
628 * there are 0 bytes on the stream, but we must send a packet
631 if (call
->app
.nFree
== 0) {
632 MUTEX_ENTER(&call
->lock
);
634 call
->app
.mode
= RX_MODE_ERROR
;
635 if (!call
->error
&& call
->app
.currentPacket
) {
636 clock_NewTime(); /* Bogus: need new time package */
637 /* The 0, below, specifies that it is not the last packet:
638 * there will be others. PrepareSendPacket may
639 * alter the packet length by up to
640 * conn->securityMaxTrailerSize */
641 call
->app
.bytesSent
+= call
->app
.currentPacket
->length
;
642 rxi_PrepareSendPacket(call
, call
->app
.currentPacket
, 0);
643 /* PrepareSendPacket drops the call lock */
644 rxi_WaitforTQBusy(call
);
645 #ifdef RX_TRACK_PACKETS
646 call
->app
.currentPacket
->flags
|= RX_PKTFLAG_TQ
;
648 opr_queue_Append(&call
->tq
,
649 &call
->app
.currentPacket
->entry
);
650 #ifdef RXDEBUG_PACKET
652 #endif /* RXDEBUG_PACKET */
653 #ifdef RX_TRACK_PACKETS
654 call
->app
.currentPacket
->flags
&= ~RX_PKTFLAG_CP
;
656 call
->app
.currentPacket
= NULL
;
658 /* If the call is in recovery, let it exhaust its current
659 * retransmit queue before forcing it to send new packets
661 if (!(call
->flags
& (RX_CALL_FAST_RECOVER
))) {
664 } else if (call
->app
.currentPacket
) {
665 #ifdef RX_TRACK_PACKETS
666 call
->app
.currentPacket
->flags
&= ~RX_PKTFLAG_CP
;
668 rxi_FreePacket(call
->app
.currentPacket
);
669 call
->app
.currentPacket
= NULL
;
671 /* Wait for transmit window to open up */
673 && call
->tnext
+ 1 > call
->tfirst
+ (2 * call
->twind
)) {
675 call
->startWait
= clock_Sec();
677 #ifdef RX_ENABLE_LOCKS
678 CV_WAIT(&call
->cv_twind
, &call
->lock
);
680 call
->flags
|= RX_CALL_WAIT_WINDOW_ALLOC
;
681 osi_rxSleep(&call
->twind
);
685 #ifdef RX_ENABLE_LOCKS
687 call
->app
.mode
= RX_MODE_ERROR
;
688 MUTEX_EXIT(&call
->lock
);
691 #endif /* RX_ENABLE_LOCKS */
693 if ((call
->app
.currentPacket
= rxi_AllocSendPacket(call
, nbytes
))) {
694 #ifdef RX_TRACK_PACKETS
695 call
->app
.currentPacket
->flags
|= RX_PKTFLAG_CP
;
697 call
->app
.nFree
= call
->app
.currentPacket
->length
;
698 call
->app
.curvec
= 1; /* 0th vec is always header */
699 /* begin at the beginning [ more or less ], continue
700 * on until the end, then stop. */
702 (char *) call
->app
.currentPacket
->wirevec
[1].iov_base
+
703 call
->conn
->securityHeaderSize
;
705 call
->app
.currentPacket
->wirevec
[1].iov_len
-
706 call
->conn
->securityHeaderSize
;
709 call
->app
.mode
= RX_MODE_ERROR
;
710 if (call
->app
.currentPacket
) {
711 #ifdef RX_TRACK_PACKETS
712 call
->app
.currentPacket
->flags
&= ~RX_PKTFLAG_CP
;
714 rxi_FreePacket(call
->app
.currentPacket
);
715 call
->app
.currentPacket
= NULL
;
717 MUTEX_EXIT(&call
->lock
);
720 MUTEX_EXIT(&call
->lock
);
723 if (call
->app
.currentPacket
&& (int)call
->app
.nFree
< nbytes
) {
724 /* Try to extend the current buffer */
726 len
= call
->app
.currentPacket
->length
;
727 mud
= rx_MaxUserDataSize(call
);
730 want
= MIN(nbytes
- (int)call
->app
.nFree
, mud
- len
);
731 rxi_AllocDataBuf(call
->app
.currentPacket
, want
,
732 RX_PACKET_CLASS_SEND_CBUF
);
733 if (call
->app
.currentPacket
->length
> (unsigned)mud
)
734 call
->app
.currentPacket
->length
= mud
;
735 call
->app
.nFree
+= (call
->app
.currentPacket
->length
- len
);
739 /* If the remaining bytes fit in the buffer, then store them
740 * and return. Don't ship a buffer that's full immediately to
741 * the peer--we don't know if it's the last buffer yet */
743 if (!call
->app
.currentPacket
) {
747 while (nbytes
&& call
->app
.nFree
) {
749 t
= MIN((int)call
->app
.curlen
, nbytes
);
750 t
= MIN((int)call
->app
.nFree
, t
);
751 memcpy(call
->app
.curpos
, buf
, t
);
754 call
->app
.curpos
+= t
;
755 call
->app
.curlen
-= (u_short
)t
;
756 call
->app
.nFree
-= (u_short
)t
;
758 if (!call
->app
.curlen
) {
759 /* need to get another struct iov */
760 if (++call
->app
.curvec
>= call
->app
.currentPacket
->niovecs
) {
761 /* current packet is full, extend or send it */
765 call
->app
.currentPacket
->wirevec
[call
->app
.curvec
].iov_base
;
767 call
->app
.currentPacket
->wirevec
[call
->app
.curvec
].iov_len
;
770 } /* while bytes to send and room to send them */
772 /* might be out of space now */
776 /* more data to send, so get another packet and keep going */
780 return requestCount
- nbytes
;
/*
 * NOTE(review): corrupted extraction — lines split, interior lines missing.
 * Text preserved byte-for-byte; restore from upstream before changes.
 *
 * Visible behavior: public write wrapper. Frees leftover iovq packets; fast
 * path when no error and both curlen and nFree cover nbytes copies buf to
 * curpos and decrements curlen/nFree (u_short casts); otherwise delegates
 * to rxi_WriteProc.
 */
784 rx_WriteProc(struct rx_call
*call
, char *buf
, int nbytes
)
792 /* Free any packets from the last call to ReadvProc/WritevProc */
793 if (!opr_queue_IsEmpty(&call
->app
.iovq
)) {
794 #ifdef RXDEBUG_PACKET
796 #endif /* RXDEBUG_PACKET */
797 rxi_FreePackets(0, &call
->app
.iovq
);
801 * Most common case: all of the data fits in the current iovec.
802 * We are relying on nFree being zero unless the call is in send mode.
804 tcurlen
= (int)call
->app
.curlen
;
805 tnFree
= (int)call
->app
.nFree
;
806 if (!call
->error
&& tcurlen
>= nbytes
&& tnFree
>= nbytes
) {
807 tcurpos
= call
->app
.curpos
;
809 memcpy(tcurpos
, buf
, nbytes
);
810 call
->app
.curpos
= tcurpos
+ nbytes
;
811 call
->app
.curlen
= (u_short
)(tcurlen
- nbytes
);
812 call
->app
.nFree
= (u_short
)(tnFree
- nbytes
);
817 bytes
= rxi_WriteProc(call
, buf
, nbytes
);
/*
 * NOTE(review): corrupted extraction — lines split, interior lines missing.
 * Text preserved byte-for-byte; restore from upstream before changes.
 *
 * Visible behavior: 32-bit marshalling fast path. When curpos is
 * afs_int32-aligned it stores *value directly, otherwise memcpy()s;
 * advances curpos/curlen/nFree and returns sizeof(afs_int32). Slow path
 * delegates to rxi_WriteProc.
 */
822 /* Optimization for marshalling 32 bit arguments */
824 rx_WriteProc32(struct rx_call
*call
, afs_int32
* value
)
832 if (!opr_queue_IsEmpty(&call
->app
.iovq
)) {
833 #ifdef RXDEBUG_PACKET
835 #endif /* RXDEBUG_PACKET */
836 rxi_FreePackets(0, &call
->app
.iovq
);
840 * Most common case: all of the data fits in the current iovec.
841 * We are relying on nFree being zero unless the call is in send mode.
843 tcurlen
= call
->app
.curlen
;
844 tnFree
= call
->app
.nFree
;
845 if (!call
->error
&& tcurlen
>= sizeof(afs_int32
)
846 && tnFree
>= sizeof(afs_int32
)) {
847 tcurpos
= call
->app
.curpos
;
849 if (!((size_t)tcurpos
& (sizeof(afs_int32
) - 1))) {
850 *((afs_int32
*) (tcurpos
)) = *value
;
852 memcpy(tcurpos
, (char *)value
, sizeof(afs_int32
));
854 call
->app
.curpos
= tcurpos
+ sizeof(afs_int32
);
855 call
->app
.curlen
= (u_short
)(tcurlen
- sizeof(afs_int32
));
856 call
->app
.nFree
= (u_short
)(tnFree
- sizeof(afs_int32
));
857 return sizeof(afs_int32
);
861 bytes
= rxi_WriteProc(call
, (char *)value
, sizeof(afs_int32
));
/*
 * NOTE(review): corrupted extraction — lines split, interior lines missing
 * (several locals, do/while head, brace structure). Text preserved
 * byte-for-byte; restore from upstream before functional changes.
 *
 * Visible behavior: internal writev-alloc. Switches a server RECEIVING call
 * to SENDING, snapshots nFree/curvec/curpos/curlen/currentPacket into
 * temporaries, then fills iov[] entries pointing into packet wirevecs:
 * allocates new send packets under call->lock when exhausted (appending them
 * to app.iovq with RX_PKTFLAG_IOVQ), extends packets with rxi_AllocDataBuf
 * up to rx_MaxUserDataSize, and loops while nbytes remain and nextio <
 * maxio. Returns requestCount - nbytes.
 */
866 /* rxi_WritevAlloc -- internal version.
868 * Fill in an iovec to point to data in packet buffers. The application
869 * calls rxi_WritevProc when the buffers are full.
871 * LOCKS USED -- called at netpri.
875 rxi_WritevAlloc(struct rx_call
*call
, struct iovec
*iov
, int *nio
, int maxio
,
878 struct rx_connection
*conn
= call
->conn
;
879 struct rx_packet
*cp
;
882 /* Temporary values, real work is done in rxi_WritevProc */
884 unsigned int tcurvec
;
888 requestCount
= nbytes
;
891 /* Free any packets from the last call to ReadvProc/WritevProc */
892 if (!opr_queue_IsEmpty(&call
->app
.iovq
)) {
893 #ifdef RXDEBUG_PACKET
895 #endif /* RXDEBUG_PACKET */
896 rxi_FreePackets(0, &call
->app
.iovq
);
899 if (call
->app
.mode
!= RX_MODE_SENDING
) {
900 if ((conn
->type
== RX_SERVER_CONNECTION
)
901 && (call
->app
.mode
== RX_MODE_RECEIVING
)) {
902 call
->app
.mode
= RX_MODE_SENDING
;
903 if (call
->app
.currentPacket
) {
904 #ifdef RX_TRACK_PACKETS
905 call
->app
.currentPacket
->flags
&= ~RX_PKTFLAG_CP
;
907 rxi_FreePacket(call
->app
.currentPacket
);
908 call
->app
.currentPacket
= NULL
;
917 /* Set up the iovec to point to data in packet buffers. */
918 tnFree
= call
->app
.nFree
;
919 tcurvec
= call
->app
.curvec
;
920 tcurpos
= call
->app
.curpos
;
921 tcurlen
= call
->app
.curlen
;
922 cp
= call
->app
.currentPacket
;
927 /* current packet is full, allocate a new one */
928 MUTEX_ENTER(&call
->lock
);
929 cp
= rxi_AllocSendPacket(call
, nbytes
);
930 MUTEX_EXIT(&call
->lock
);
932 /* out of space, return what we have */
934 return requestCount
- nbytes
;
936 #ifdef RX_TRACK_PACKETS
937 cp
->flags
|= RX_PKTFLAG_IOVQ
;
939 opr_queue_Append(&call
->app
.iovq
, &cp
->entry
);
940 #ifdef RXDEBUG_PACKET
942 #endif /* RXDEBUG_PACKET */
946 (char *)cp
->wirevec
[1].iov_base
+
947 call
->conn
->securityHeaderSize
;
948 tcurlen
= cp
->wirevec
[1].iov_len
- call
->conn
->securityHeaderSize
;
951 if (tnFree
< nbytes
) {
952 /* try to extend the current packet */
955 mud
= rx_MaxUserDataSize(call
);
958 want
= MIN(nbytes
- tnFree
, mud
- len
);
959 rxi_AllocDataBuf(cp
, want
, RX_PACKET_CLASS_SEND_CBUF
);
960 if (cp
->length
> (unsigned)mud
)
962 tnFree
+= (cp
->length
- len
);
963 if (cp
== call
->app
.currentPacket
) {
964 call
->app
.nFree
+= (cp
->length
- len
);
969 /* fill in the next entry in the iovec */
970 t
= MIN(tcurlen
, nbytes
);
972 iov
[nextio
].iov_base
= tcurpos
;
973 iov
[nextio
].iov_len
= t
;
981 /* need to get another struct iov */
982 if (++tcurvec
>= cp
->niovecs
) {
983 /* current packet is full, extend it or move on to next packet */
986 tcurpos
= (char *)cp
->wirevec
[tcurvec
].iov_base
;
987 tcurlen
= cp
->wirevec
[tcurvec
].iov_len
;
990 } while (nbytes
&& nextio
< maxio
);
992 return requestCount
- nbytes
;
/*
 * NOTE(review): corrupted extraction — return type, braces and the NETPRI
 * wrappers are missing from this view. Text preserved byte-for-byte.
 * Public writev-alloc wrapper: delegates to rxi_WritevAlloc.
 */
996 rx_WritevAlloc(struct rx_call
*call
, struct iovec
*iov
, int *nio
, int maxio
,
1003 bytes
= rxi_WritevAlloc(call
, iov
, nio
, maxio
, nbytes
);
/*
 * NOTE(review): corrupted extraction — lines split and many interior lines
 * missing (locals, loop heads, braces, returns). Text preserved
 * byte-for-byte; restore from upstream OpenAFS src/rx/rx_rdwr.c before
 * functional changes.
 *
 * Visible behavior: internal writev commit for buffers handed out by
 * rxi_WritevAlloc. Under call->lock it validates mode (RX_PROTOCOL_ERROR if
 * not SENDING), walks iov[] verifying each entry matches curpos/curlen
 * (protocol error otherwise), moves filled packets through a local tmpq
 * (prepared via rxi_PrepareSendPacket, flag 0 = not last) and splices tmpq
 * onto call->tq, kicks transmission unless in fast recovery, and blocks on
 * cv_twind / osi_rxSleep until the transmit window (tfirst + 2*twind)
 * admits more. Returns requestCount - nbytes; error paths free tmpq/iovq
 * and set RX_MODE_ERROR.
 */
1008 /* rxi_WritevProc -- internal version.
1010 * Send buffers allocated in rxi_WritevAlloc.
1012 * LOCKS USED -- called at netpri.
1015 rxi_WritevProc(struct rx_call
*call
, struct iovec
*iov
, int nio
, int nbytes
)
1017 #ifdef RX_TRACK_PACKETS
1018 struct opr_queue
*cursor
;
1022 struct opr_queue tmpq
;
1023 #ifdef RXDEBUG_PACKET
1027 requestCount
= nbytes
;
1029 MUTEX_ENTER(&call
->lock
);
1031 call
->app
.mode
= RX_MODE_ERROR
;
1032 } else if (call
->app
.mode
!= RX_MODE_SENDING
) {
1033 call
->error
= RX_PROTOCOL_ERROR
;
1035 rxi_WaitforTQBusy(call
);
1038 call
->app
.mode
= RX_MODE_ERROR
;
1039 MUTEX_EXIT(&call
->lock
);
1040 if (call
->app
.currentPacket
) {
1041 #ifdef RX_TRACK_PACKETS
1042 call
->app
.currentPacket
->flags
&= ~RX_PKTFLAG_CP
;
1043 call
->app
.currentPacket
->flags
|= RX_PKTFLAG_IOVQ
;
1045 opr_queue_Prepend(&call
->app
.iovq
,
1046 &call
->app
.currentPacket
->entry
);
1047 #ifdef RXDEBUG_PACKET
1049 #endif /* RXDEBUG_PACKET */
1050 call
->app
.currentPacket
= NULL
;
1052 #ifdef RXDEBUG_PACKET
1054 #endif /* RXDEBUG_PACKET */
1055 rxi_FreePackets(0, &call
->app
.iovq
);
1059 /* Loop through the I/O vector adjusting packet pointers.
1060 * Place full packets back onto the iovq once they are ready
1061 * to send. Set RX_PROTOCOL_ERROR if any problems are found in
1062 * the iovec. We put the loop condition at the end to ensure that
1063 * a zero length write will push a short packet. */
1064 opr_queue_Init(&tmpq
);
1065 #ifdef RXDEBUG_PACKET
1067 #endif /* RXDEBUG_PACKET */
1069 if (call
->app
.nFree
== 0 && call
->app
.currentPacket
) {
1070 clock_NewTime(); /* Bogus: need new time package */
1071 /* The 0, below, specifies that it is not the last packet:
1072 * there will be others. PrepareSendPacket may
1073 * alter the packet length by up to
1074 * conn->securityMaxTrailerSize */
1075 call
->app
.bytesSent
+= call
->app
.currentPacket
->length
;
1076 rxi_PrepareSendPacket(call
, call
->app
.currentPacket
, 0);
1077 /* PrepareSendPacket drops the call lock */
1078 rxi_WaitforTQBusy(call
);
1079 opr_queue_Append(&tmpq
, &call
->app
.currentPacket
->entry
);
1080 #ifdef RXDEBUG_PACKET
1082 #endif /* RXDEBUG_PACKET */
1083 call
->app
.currentPacket
= NULL
;
1085 /* The head of the iovq is now the current packet */
1087 if (opr_queue_IsEmpty(&call
->app
.iovq
)) {
1088 MUTEX_EXIT(&call
->lock
);
1089 call
->error
= RX_PROTOCOL_ERROR
;
1090 #ifdef RXDEBUG_PACKET
1092 #endif /* RXDEBUG_PACKET */
1093 rxi_FreePackets(0, &tmpq
);
1096 call
->app
.currentPacket
1097 = opr_queue_First(&call
->app
.iovq
, struct rx_packet
,
1099 opr_queue_Remove(&call
->app
.currentPacket
->entry
);
1100 #ifdef RX_TRACK_PACKETS
1101 call
->app
.currentPacket
->flags
&= ~RX_PKTFLAG_IOVQ
;
1102 call
->app
.currentPacket
->flags
|= RX_PKTFLAG_CP
;
1104 #ifdef RXDEBUG_PACKET
1106 #endif /* RXDEBUG_PACKET */
1107 call
->app
.nFree
= call
->app
.currentPacket
->length
;
1108 call
->app
.curvec
= 1;
1110 (char *) call
->app
.currentPacket
->wirevec
[1].iov_base
+
1111 call
->conn
->securityHeaderSize
;
1113 call
->app
.currentPacket
->wirevec
[1].iov_len
-
1114 call
->conn
->securityHeaderSize
;
1119 /* The next iovec should point to the current position */
1120 if (iov
[nextio
].iov_base
!= call
->app
.curpos
1121 || iov
[nextio
].iov_len
> (int)call
->app
.curlen
) {
1122 call
->error
= RX_PROTOCOL_ERROR
;
1123 MUTEX_EXIT(&call
->lock
);
1124 if (call
->app
.currentPacket
) {
1125 #ifdef RX_TRACK_PACKETS
1126 call
->app
.currentPacket
->flags
&= ~RX_PKTFLAG_CP
;
1128 opr_queue_Prepend(&tmpq
,
1129 &call
->app
.currentPacket
->entry
);
1130 #ifdef RXDEBUG_PACKET
1132 #endif /* RXDEBUG_PACKET */
1133 call
->app
.currentPacket
= NULL
;
1135 #ifdef RXDEBUG_PACKET
1137 #endif /* RXDEBUG_PACKET */
1138 rxi_FreePackets(0, &tmpq
);
1141 nbytes
-= iov
[nextio
].iov_len
;
1142 call
->app
.curpos
+= iov
[nextio
].iov_len
;
1143 call
->app
.curlen
-= iov
[nextio
].iov_len
;
1144 call
->app
.nFree
-= iov
[nextio
].iov_len
;
1146 if (call
->app
.curlen
== 0) {
1147 if (++call
->app
.curvec
> call
->app
.currentPacket
->niovecs
) {
1148 call
->app
.nFree
= 0;
1151 call
->app
.currentPacket
->wirevec
[call
->app
.curvec
].iov_base
;
1153 call
->app
.currentPacket
->wirevec
[call
->app
.curvec
].iov_len
;
1157 } while (nbytes
&& nextio
< nio
);
1159 /* Move the packets from the temporary queue onto the transmit queue.
1160 * We may end up with more than call->twind packets on the queue. */
1162 #ifdef RX_TRACK_PACKETS
1163 for (opr_queue_Scan(&tmpq
, cursor
))
1165 struct rx_packet
*p
= opr_queue_Entry(cursor
, struct rx_packet
, entry
);
1166 p
->flags
|= RX_PKTFLAG_TQ
;
1170 call
->app
.mode
= RX_MODE_ERROR
;
1172 opr_queue_SpliceAppend(&call
->tq
, &tmpq
);
1174 /* If the call is in recovery, let it exhaust its current retransmit
1175 * queue before forcing it to send new packets
1177 if (!(call
->flags
& RX_CALL_FAST_RECOVER
)) {
1181 /* Wait for the length of the transmit queue to fall below call->twind */
1182 while (!call
->error
&& call
->tnext
+ 1 > call
->tfirst
+ (2 * call
->twind
)) {
1184 call
->startWait
= clock_Sec();
1185 #ifdef RX_ENABLE_LOCKS
1186 CV_WAIT(&call
->cv_twind
, &call
->lock
);
1188 call
->flags
|= RX_CALL_WAIT_WINDOW_ALLOC
;
1189 osi_rxSleep(&call
->twind
);
1191 call
->startWait
= 0;
1195 call
->app
.mode
= RX_MODE_ERROR
;
1196 call
->app
.currentPacket
= NULL
;
1197 MUTEX_EXIT(&call
->lock
);
1198 if (call
->app
.currentPacket
) {
1199 #ifdef RX_TRACK_PACKETS
1200 call
->app
.currentPacket
->flags
&= ~RX_PKTFLAG_CP
;
1202 rxi_FreePacket(call
->app
.currentPacket
);
1206 MUTEX_EXIT(&call
->lock
);
1208 return requestCount
- nbytes
;
/*
 * NOTE(review): corrupted extraction — return type, braces and the NETPRI
 * wrappers are missing from this view. Text preserved byte-for-byte.
 * Public writev wrapper: delegates to rxi_WritevProc.
 */
1212 rx_WritevProc(struct rx_call
*call
, struct iovec
*iov
, int nio
, int nbytes
)
1218 bytes
= rxi_WritevProc(call
, iov
, nio
, nbytes
);
/*
 * NOTE(review): corrupted extraction — lines split, interior lines missing
 * (including the condition branches around MUTEX_ENTER and the final
 * transmission kick). Text preserved byte-for-byte; restore from upstream
 * before functional changes.
 *
 * Visible behavior: flushes buffered write data and leaves SENDING mode —
 * clients move to RECEIVING, servers to EOF. Per the fragmentary header,
 * 'locked' nonzero apparently means call->lock is already held — TODO
 * confirm against upstream. Trims currentPacket->length by the unused
 * nFree (or allocates an empty 2-iovec packet if none), marks the call
 * RX_CALL_FLUSH, prepares the packet as last (flag 1), appends it to
 * call->tq after rxi_WaitforTQBusy, and kicks transmission unless the call
 * is in fast recovery.
 */
1223 /* Flush any buffered data to the stream, switch to read mode
1224 * (clients) or to EOF mode (servers). If 'locked' is nonzero, call->lock must
1227 * LOCKS HELD: called at netpri.
1230 FlushWrite(struct rx_call
*call
, int locked
)
1232 struct rx_packet
*cp
= NULL
;
1234 /* Free any packets from the last call to ReadvProc/WritevProc */
1235 if (!opr_queue_IsEmpty(&call
->app
.iovq
)) {
1236 #ifdef RXDEBUG_PACKET
1238 #endif /* RXDEBUG_PACKET */
1239 rxi_FreePackets(0, &call
->app
.iovq
);
1242 if (call
->app
.mode
== RX_MODE_SENDING
) {
1245 (call
->conn
->type
==
1246 RX_CLIENT_CONNECTION
? RX_MODE_RECEIVING
: RX_MODE_EOF
);
1248 #ifdef RX_KERNEL_TRACE
1250 int glockOwner
= ISAFS_GLOCK();
1253 afs_Trace3(afs_iclSetp
, CM_TRACE_WASHERE
, ICL_TYPE_STRING
,
1254 __FILE__
, ICL_TYPE_INT32
, __LINE__
, ICL_TYPE_POINTER
,
1262 MUTEX_ENTER(&call
->lock
);
1266 call
->app
.mode
= RX_MODE_ERROR
;
1268 call
->flags
|= RX_CALL_FLUSH
;
1270 cp
= call
->app
.currentPacket
;
1273 /* cp->length is only supposed to be the user's data */
1274 /* cp->length was already set to (then-current)
1275 * MaxUserDataSize or less. */
1276 #ifdef RX_TRACK_PACKETS
1277 cp
->flags
&= ~RX_PKTFLAG_CP
;
1279 cp
->length
-= call
->app
.nFree
;
1280 call
->app
.currentPacket
= NULL
;
1281 call
->app
.nFree
= 0;
1283 cp
= rxi_AllocSendPacket(call
, 0);
1285 /* Mode can no longer be MODE_SENDING */
1289 cp
->niovecs
= 2; /* header + space for rxkad stuff */
1290 call
->app
.nFree
= 0;
1293 /* The 1 specifies that this is the last packet */
1294 call
->app
.bytesSent
+= cp
->length
;
1295 rxi_PrepareSendPacket(call
, cp
, 1);
1296 /* PrepareSendPacket drops the call lock */
1297 rxi_WaitforTQBusy(call
);
1298 #ifdef RX_TRACK_PACKETS
1299 cp
->flags
|= RX_PKTFLAG_TQ
;
1301 opr_queue_Append(&call
->tq
, &cp
->entry
);
1302 #ifdef RXDEBUG_PACKET
1304 #endif /* RXDEBUG_PACKET */
1306 /* If the call is in recovery, let it exhaust its current retransmit
1307 * queue before forcing it to send new packets
1309 if (!(call
->flags
& RX_CALL_FAST_RECOVER
)) {
1313 MUTEX_EXIT(&call
->lock
);
/*
 * NOTE(review): corrupted extraction — return type and braces are missing
 * from this view. Text preserved byte-for-byte.
 * Flush wrapper for callers NOT holding call->lock (locked = 0).
 */
1319 rxi_FlushWrite(struct rx_call
*call
)
1321 FlushWrite(call
, 0);
/*
 * NOTE(review): corrupted extraction — return type and braces are missing
 * from this view. Text preserved byte-for-byte.
 * Flush wrapper for callers already holding call->lock (locked = 1).
 */
1325 rxi_FlushWriteLocked(struct rx_call
*call
)
1327 FlushWrite(call
, 1);
/*
 * NOTE(review): corrupted extraction — return type, braces and the NETPRI
 * wrappers are missing from this view. Text preserved byte-for-byte.
 * Public flush entry point: delegates to FlushWrite with locked = 0.
 */
1330 /* Flush any buffered data to the stream, switch to read mode
1331 * (clients) or to EOF mode (servers) */
1333 rx_FlushWrite(struct rx_call
*call
)
1337 FlushWrite(call
, 0);