Import Upstream version 1.8.5
[hcoop/debian/openafs.git] / src / rx / UKERNEL / rx_knet.c
1 /*
2 * Copyright 2000, International Business Machines Corporation and others.
3 * All Rights Reserved.
4 *
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
8 */
9
10 #include <afsconfig.h>
11 #include "afs/param.h"
12
13
14 #include "rx/rx_kcommon.h"
15 #include "rx_atomic.h"
16 #include "rx_internal.h"
17
/*
 * Pacing for afs_rxevent_daemon(): it sleeps SECONDS_TO_SLEEP seconds plus
 * NANO_SECONDS_TO_SLEEP nanoseconds per pass, and calls
 * afs_osi_CheckTimedWaits() once every LOOPS_PER_WAITCHECK passes.
 */
#define SECONDS_TO_SLEEP 0
#define NANO_SECONDS_TO_SLEEP 100000000 /* 100 milliseconds */
#define LOOPS_PER_WAITCHECK 10          /* 10 x 100ms: once per second */

/* Routines used in this file before (or without) a visible definition. */
extern int afs_osi_CheckTimedWaits(void);
void rxk_InitializeSocket(void);

/*
 * Port, in host byte order, that rxk_InitializeSocket() converts with
 * htons() and hands to bind().
 */
unsigned short usr_rx_port = 0;

/*
 * Start out NULL and are not referenced again in this file.
 * NOTE(review): presumably interface/address lists maintained elsewhere
 * -- confirm against the users of these symbols.
 */
struct usr_ifnet *usr_ifnet = NULL;
struct usr_in_ifaddr *usr_in_ifaddr = NULL;
29
30 void
31 afs_rxevent_daemon(void)
32 {
33 struct timespec tv;
34 struct clock temp;
35 int i = 0;
36
37 AFS_GUNLOCK();
38 while (1) {
39 tv.tv_sec = SECONDS_TO_SLEEP;
40 tv.tv_nsec = NANO_SECONDS_TO_SLEEP;
41 usr_thread_sleep(&tv);
42 /*
43 * Check for shutdown, don't try to stop the listener
44 */
45 if (afs_termState == AFSOP_STOP_RXEVENT
46 || afs_termState == AFSOP_STOP_RXK_LISTENER) {
47 AFS_GLOCK();
48 afs_termState = AFSOP_STOP_COMPLETE;
49 afs_osi_Wakeup(&afs_termState);
50 return;
51 }
52 rxevent_RaiseEvents(&temp);
53 if (++i >= LOOPS_PER_WAITCHECK) {
54 i = 0;
55 afs_osi_CheckTimedWaits();
56 }
57 }
58 }
59
60
61 /* Loop to listen on a socket. Return setting *newcallp if this
62 * thread should become a server thread. */
63 void
64 rxi_ListenerProc(osi_socket usockp, int *tnop, struct rx_call **newcallp)
65 {
66 struct rx_packet *tp;
67 afs_uint32 host;
68 u_short port;
69 int rc;
70
71 /*
72 * Use the rxk_GetPacketProc and rxk_PacketArrivalProc routines
73 * to allocate rx_packet buffers and pass them to the RX layer
74 * for processing.
75 */
76 while (1) {
77 /* See if a check for additional packets was issued */
78 rx_CheckPackets();
79
80 tp = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE);
81 usr_assert(tp != NULL);
82 rc = rxi_ReadPacket(usockp, tp, &host, &port);
83 if (rc != 0) {
84 tp = rxi_ReceivePacket(tp, usockp, host, port, tnop, newcallp);
85 if (newcallp && *newcallp) {
86 if (tp) {
87 rxi_FreePacket(tp);
88 }
89 return;
90 }
91 }
92 if (tp) {
93 rxi_FreePacket(tp);
94 }
95 if (afs_termState == AFSOP_STOP_RXEVENT) {
96 afs_termState = AFSOP_STOP_RXK_LISTENER;
97 afs_osi_Wakeup(&afs_termState);
98 }
99 }
100 }
101
102 /* This is the listener process request loop. The listener process loop
103 * becomes a server thread when rxi_ListenerProc returns, and stays
104 * server thread until rxi_ServerProc returns. */
105 void
106 rxk_Listener(void)
107 {
108 int threadID;
109 osi_socket sock = (osi_socket) rx_socket;
110 struct rx_call *newcall;
111 struct usr_socket *usockp;
112
113 /*
114 * Initialize the rx_socket and start the receiver threads
115 */
116 rxk_InitializeSocket();
117
118 usockp = (struct usr_socket *)rx_socket;
119 assert(usockp != NULL);
120
121 AFS_GUNLOCK();
122 while (1) {
123 newcall = NULL;
124 threadID = -1;
125 rxi_ListenerProc(sock, &threadID, &newcall);
126 /* assert(threadID != -1); */
127 /* assert(newcall != NULL); */
128 sock = OSI_NULLSOCKET;
129 rxi_ServerProc(threadID, newcall, &sock);
130 if (sock == OSI_NULLSOCKET) {
131 break;
132 }
133 }
134 AFS_GLOCK();
135 }
136
137 /* This is the server process request loop. The server process loop
138 * becomes a listener thread when rxi_ServerProc returns, and stays
139 * listener thread until rxi_ListenerProc returns. */
140 void *
141 rx_ServerProc(void *unused)
142 {
143 osi_socket sock;
144 int threadID;
145 struct rx_call *newcall = NULL;
146
147 rxi_MorePackets(rx_maxReceiveWindow + 2); /* alloc more packets */
148 rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
149 /* threadID is used for making decisions in GetCall. Get it by bumping
150 * number of threads handling incoming calls */
151 threadID = rxi_availProcs++;
152
153 AFS_GUNLOCK();
154 while (1) {
155 sock = OSI_NULLSOCKET;
156 rxi_ServerProc(threadID, newcall, &sock);
157 if (sock == OSI_NULLSOCKET) {
158 break;
159 }
160 newcall = NULL;
161 threadID = -1;
162 rxi_ListenerProc(sock, &threadID, &newcall);
163 /* assert(threadID != -1); */
164 /* assert(newcall != NULL); */
165 }
166 AFS_GLOCK();
167 return NULL;
168 }
169
170 /*
171 * At this point, RX wants a socket, but still has not initialized the
172 * rx_port variable or the pointers to the packet allocater and arrival
173 * routines. Allocate the socket buffer here, but don't open it until
174 * we start the receiver threads.
175 */
176 osi_socket *
177 rxk_NewSocketHost(afs_uint32 ahost, short aport)
178 {
179 struct usr_socket *usockp;
180
181 usockp = afs_osi_Alloc(sizeof(struct usr_socket));
182 usr_assert(usockp != NULL);
183
184 usockp->sock = -1;
185
186 return (osi_socket *)usockp;
187 }
188
189 osi_socket *
190 rxk_NewSocket(short aport)
191 {
192 return rxk_NewSocketHost(htonl(INADDR_ANY), aport);
193 }
194
195 /*
196 * This routine is called from rxk_Listener. By this time rx_port
197 * is set to 7001 and rx_socket points to the socket buffer
198 * we allocated in rxk_NewSocket. Now is the time to bind our
199 * socket and start the receiver threads.
200 */
201 void
202 rxk_InitializeSocket(void)
203 {
204 int rc, sock;
205 #ifdef AFS_USR_AIX_ENV
206 unsigned long len, optval, optval0, optlen;
207 #else /* AFS_USR_AIX_ENV */
208 socklen_t len, optlen;
209 int optval, optval0;
210 #endif /* AFS_USR_AIX_ENV */
211 struct usr_socket *usockp;
212 struct sockaddr_in lcladdr;
213
214 usr_assert(rx_socket != NULL);
215 usockp = (struct usr_socket *)rx_socket;
216
217 #undef socket
218 sock = socket(PF_INET, SOCK_DGRAM, 0);
219 usr_assert(sock >= 0);
220
221 memset((void *)&lcladdr, 0, sizeof(struct sockaddr_in));
222 lcladdr.sin_family = AF_INET;
223 lcladdr.sin_port = htons(usr_rx_port);
224 lcladdr.sin_addr.s_addr = INADDR_ANY;
225 rc = bind(sock, (struct sockaddr *)&lcladdr, sizeof(struct sockaddr_in));
226 usr_assert(rc >= 0);
227 len = sizeof(struct sockaddr_in);
228 rc = getsockname(sock, (struct sockaddr *)&lcladdr, &len);
229 usr_assert(rc >= 0);
230 #ifdef AFS_USR_LINUX22_ENV
231 optval0 = 131070;
232 #else
233 optval0 = 131072;
234 #endif
235 optval = optval0;
236 rc = setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (void *)&optval,
237 sizeof(optval));
238 usr_assert(rc == 0);
239 optlen = sizeof(optval);
240 rc = getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (void *)&optval, &optlen);
241 usr_assert(rc == 0);
242 /* usr_assert(optval == optval0); */
243 #ifdef AFS_USR_LINUX22_ENV
244 optval0 = 131070;
245 #else
246 optval0 = 131072;
247 #endif
248 optval = optval0;
249 rc = setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (void *)&optval,
250 sizeof(optval));
251 usr_assert(rc == 0);
252 optlen = sizeof(optval);
253 rc = getsockopt(sock, SOL_SOCKET, SO_RCVBUF, (void *)&optval, &optlen);
254 usr_assert(rc == 0);
255 /* usr_assert(optval == optval0); */
256
257 #ifdef AFS_USR_AIX_ENV
258 optval = 1;
259 rc = setsockopt(sock, SOL_SOCKET, SO_CKSUMRECV, (void *)&optval,
260 sizeof(optval));
261 usr_assert(rc == 0);
262 #endif /* AFS_USR_AIX_ENV */
263
264 #ifdef FD_CLOEXEC
265 fcntl(sock, F_SETFD, FD_CLOEXEC);
266 #endif
267
268 usockp->sock = sock;
269 usockp->port = lcladdr.sin_port;
270
271 /*
272 * Set the value of rx_port to reflect the address we actually
273 * are listening on, since the kernel is probably already using 7001.
274 */
275 rx_port = usockp->port;
276 }
277
/*
 * Release hook for an rx socket.  Currently a no-op: sockp is not touched
 * (the descriptor is not closed and the record is not freed) and 0 is
 * always returned.
 */
int
rxk_FreeSocket(struct usr_socket *sockp)
{
    (void)sockp;
    return 0;
}
283
284 void
285 osi_StopListener(void)
286 {
287 rxk_FreeSocket((struct usr_socket *)rx_socket);
288 }
289
290 int
291 osi_NetSend(osi_socket sockp, struct sockaddr_in *addr, struct iovec *iov,
292 int nio, afs_int32 size, int stack)
293 {
294 int rc;
295 int i;
296 struct usr_socket *usockp = (struct usr_socket *)sockp;
297 struct msghdr msg;
298 struct iovec tmpiov[64];
299
300 /*
301 * The header is in the first iovec
302 */
303 usr_assert(nio > 0 && nio <= 64);
304 for (i = 0; i < nio; i++) {
305 tmpiov[i].iov_base = iov[i].iov_base;
306 tmpiov[i].iov_len = iov[i].iov_len;
307 }
308
309 memset(&msg, 0, sizeof(msg));
310 msg.msg_name = (void *)addr;
311 msg.msg_namelen = sizeof(struct sockaddr_in);
312 msg.msg_iov = &tmpiov[0];
313 msg.msg_iovlen = nio;
314
315 rc = sendmsg(usockp->sock, &msg, 0);
316 if (rc < 0) {
317 return errno;
318 }
319 usr_assert(rc == size);
320
321 return 0;
322 }
323
324 void
325 shutdown_rxkernel(void)
326 {
327 rxk_initDone = 0;
328 rxk_shutdownPorts();
329 }
330
/*
 * Not supported here: the body is an unconditional usr_assert(0), so any
 * call trips the assertion.
 */
void
rx_Finalize(void)
{
    usr_assert(0);
}
336
337 /*
338 * Recvmsg.
339 *
340 */
341 int
342 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
343 {
344 int ret;
345 do {
346 ret = recvmsg(socket->sock, msg_p, flags);
347 } while (ret == -1 && errno == EAGAIN);
348 return ret;
349 }