Import Upstream version 1.8.5
[hcoop/debian/openafs.git] / src / vol / daemon_com.c
1 /*
2 * Copyright 2006-2008, Sine Nomine Associates and others.
3 * All Rights Reserved.
4 *
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
8 */
9
10 /*
11 * localhost interprocess communication for servers
12 *
13 * currently handled by a localhost socket
14 * (yes, this needs to be replaced someday)
15 */
16
17 #ifndef _WIN32
18 #define FD_SETSIZE 65536
19 #endif
20
21 #include <afsconfig.h>
22 #include <afs/param.h>
23
24 #include <roken.h>
25 #include <afs/opr.h>
26
27 #include <rx/xdr.h>
28 #include <afs/afsint.h>
29 #include <afs/errors.h>
30 #include <rx/rx_queue.h>
31
32 #include "nfs.h"
33 #include "daemon_com.h"
34 #include "lwp.h"
35 #include "lock.h"
36 #include <afs/afssyscalls.h>
37 #include "ihandle.h"
38 #include "vnode.h"
39 #include "volume.h"
40 #include "partition.h"
41 #include "common.h"
42 #include <rx/rx_queue.h>
43
44 #ifdef USE_UNIX_SOCKETS
45 #include <afs/afsutil.h>
46 #include <sys/un.h>
47 #endif
48
49 int (*V_BreakVolumeCallbacks) (VolumeId);
50
51 #define MAXHANDLERS 4 /* Up to 4 clients; must be at least 2, so that
52 * move = dump+restore can run on single server */
53
54 #define MAX_BIND_TRIES 5 /* Number of times to retry socket bind */
55
56 static int SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res);
57
58
59 /*
60 * On AIX, connect() and bind() require use of SUN_LEN() macro;
61 * sizeof(struct sockaddr_un) will not suffice.
62 */
63 #if defined(AFS_AIX_ENV) && defined(USE_UNIX_SOCKETS)
64 #define AFS_SOCKADDR_LEN(sa) SUN_LEN(sa)
65 #else
66 #define AFS_SOCKADDR_LEN(sa) sizeof(*sa)
67 #endif
68
69
70 /* daemon com SYNC general interfaces */
71
72 /**
73 * fill in sockaddr structure.
74 *
75 * @param[in] endpoint pointer to sync endpoint object
76 * @param[out] addr pointer to sockaddr structure
77 *
78 * @post sockaddr structure populated using information from
79 * endpoint structure.
80 */
81 void
82 SYNC_getAddr(SYNC_endpoint_t * endpoint, SYNC_sockaddr_t * addr)
83 {
84 memset(addr, 0, sizeof(*addr));
85
86 #ifdef USE_UNIX_SOCKETS
87 addr->sun_family = AF_UNIX;
88 snprintf(addr->sun_path, sizeof(addr->sun_path), "%s/%s",
89 AFSDIR_SERVER_LOCAL_DIRPATH, endpoint->un);
90 addr->sun_path[sizeof(addr->sun_path) - 1] = '\0';
91 #else /* !USE_UNIX_SOCKETS */
92 #ifdef STRUCT_SOCKADDR_HAS_SA_LEN
93 addr->sin_len = sizeof(struct sockaddr_in);
94 #endif
95 addr->sin_addr.s_addr = htonl(0x7f000001);
96 addr->sin_family = AF_INET; /* was localhost->h_addrtype */
97 addr->sin_port = htons(endpoint->in); /* XXXX htons not _really_ neccessary */
98 #endif /* !USE_UNIX_SOCKETS */
99 }
100
101 /**
102 * get a socket descriptor of the appropriate domain.
103 *
104 * @param[in] endpoint pointer to sync endpoint object
105 *
106 * @return socket descriptor
107 *
108 * @post socket of domain specified in endpoint structure is created and
109 * returned to caller.
110 */
111 osi_socket
112 SYNC_getSock(SYNC_endpoint_t * endpoint)
113 {
114 osi_socket sd;
115 opr_Verify((sd = socket(endpoint->domain, SOCK_STREAM, 0)) >= 0);
116 return sd;
117 }
118
119 /* daemon com SYNC client interface */
120
121 /**
122 * open a client connection to a sync server
123 *
124 * @param[in] state pointer to sync client handle
125 *
126 * @return operation status
127 * @retval 1 success
128 *
129 * @note at present, this routine aborts rather than returning an error code
130 */
131 int
132 SYNC_connect(SYNC_client_state * state)
133 {
134 SYNC_sockaddr_t addr;
135 /* I can't believe the following is needed for localhost connections!! */
136 static time_t backoff[] =
137 { 3, 3, 3, 5, 5, 5, 7, 15, 16, 24, 32, 40, 48, 0 };
138 time_t *timeout = &backoff[0];
139
140 if (state->fd != OSI_NULLSOCKET) {
141 return 1;
142 }
143
144 SYNC_getAddr(&state->endpoint, &addr);
145
146 for (;;) {
147 state->fd = SYNC_getSock(&state->endpoint);
148 if (connect(state->fd, (struct sockaddr *)&addr, AFS_SOCKADDR_LEN(&addr)) >= 0)
149 return 1;
150 if (!*timeout)
151 break;
152 if (!(*timeout & 1))
153 Log("SYNC_connect: temporary failure on circuit '%s' (will retry)\n",
154 state->proto_name);
155 SYNC_disconnect(state);
156 sleep(*timeout++);
157 }
158 perror("SYNC_connect failed (giving up!)");
159 return 0;
160 }
161
162 /**
163 * forcibly disconnect a sync client handle.
164 *
165 * @param[in] state pointer to sync client handle
166 *
167 * @retval operation status
168 * @retval 0 success
169 */
170 int
171 SYNC_disconnect(SYNC_client_state * state)
172 {
173 rk_closesocket(state->fd);
174 state->fd = OSI_NULLSOCKET;
175 return 0;
176 }
177
178 /**
179 * gracefully disconnect a sync client handle.
180 *
181 * @param[in] state pointer to sync client handle
182 *
183 * @return operation status
184 * @retval SYNC_OK success
185 */
186 afs_int32
187 SYNC_closeChannel(SYNC_client_state * state)
188 {
189 SYNC_command com;
190 SYNC_response res;
191 SYNC_PROTO_BUF_DECL(ores);
192
193 if (state->fd == OSI_NULLSOCKET)
194 return SYNC_OK;
195
196 memset(&com, 0, sizeof(com));
197 memset(&res, 0, sizeof(res));
198
199 res.payload.len = SYNC_PROTO_MAX_LEN;
200 res.payload.buf = ores;
201
202 com.hdr.command = SYNC_COM_CHANNEL_CLOSE;
203 com.hdr.command_len = sizeof(SYNC_command_hdr);
204 com.hdr.flags |= SYNC_FLAG_CHANNEL_SHUTDOWN;
205
206 /* in case the other end dropped, don't do any retries */
207 state->retry_limit = 0;
208 state->hard_timeout = 0;
209
210 SYNC_ask(state, &com, &res);
211 SYNC_disconnect(state);
212
213 return SYNC_OK;
214 }
215
216 /**
217 * forcibly break a client connection, and then create a new connection.
218 *
219 * @param[in] state pointer to sync client handle
220 *
221 * @post old connection dropped; new connection established
222 *
223 * @return @see SYNC_connect()
224 */
225 int
226 SYNC_reconnect(SYNC_client_state * state)
227 {
228 SYNC_disconnect(state);
229 return SYNC_connect(state);
230 }
231
232 /**
233 * send a command to a sync server and wait for a response.
234 *
235 * @param[in] state pointer to sync client handle
236 * @param[in] com command object
237 * @param[out] res response object
238 *
239 * @return operation status
240 * @retval SYNC_OK success
241 * @retval SYNC_COM_ERROR communications error
242 * @retval SYNC_BAD_COMMAND server did not recognize command code
243 *
244 * @note this routine merely handles error processing; SYNC_ask_internal()
245 * handles the low-level details of communicating with the SYNC server.
246 *
247 * @see SYNC_ask_internal
248 */
249 afs_int32
250 SYNC_ask(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
251 {
252 int tries;
253 afs_uint32 now, timeout, code=SYNC_OK;
254
255 if (state->fd == OSI_NULLSOCKET) {
256 SYNC_connect(state);
257 }
258
259 if (state->fd == OSI_NULLSOCKET) {
260 return SYNC_COM_ERROR;
261 }
262
263 #ifdef AFS_DEMAND_ATTACH_FS
264 com->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
265 #endif
266
267 now = FT_ApproxTime();
268 timeout = now + state->hard_timeout;
269 for (tries = 0;
270 (tries <= state->retry_limit) && (now <= timeout);
271 tries++, now = FT_ApproxTime()) {
272 code = SYNC_ask_internal(state, com, res);
273 if (code == SYNC_OK) {
274 break;
275 } else if (code == SYNC_BAD_COMMAND) {
276 Log("SYNC_ask: protocol mismatch on circuit '%s'; make sure "
277 "fileserver, volserver, salvageserver and salvager are same "
278 "version\n", state->proto_name);
279 break;
280 } else if ((code == SYNC_COM_ERROR) && (tries < state->retry_limit)) {
281 Log("SYNC_ask: protocol communications failure on circuit '%s'; "
282 "attempting reconnect to server\n", state->proto_name);
283 SYNC_reconnect(state);
284 /* try again */
285 } else {
286 /*
287 * unknown (probably protocol-specific) response code, pass it up to
288 * the caller, and let them deal with it
289 */
290 break;
291 }
292 }
293
294 if (code == SYNC_COM_ERROR) {
295 Log("SYNC_ask: too many / too latent fatal protocol errors on circuit "
296 "'%s'; giving up (tries %d timeout %d)\n",
297 state->proto_name, tries, timeout);
298 }
299
300 return code;
301 }
302
303 /**
304 * send a command to a sync server and wait for a response.
305 *
306 * @param[in] state pointer to sync client handle
307 * @param[in] com command object
308 * @param[out] res response object
309 *
310 * @return operation status
311 * @retval SYNC_OK success
312 * @retval SYNC_COM_ERROR communications error
313 *
314 * @internal
315 */
316 static afs_int32
317 SYNC_ask_internal(SYNC_client_state * state, SYNC_command * com, SYNC_response * res)
318 {
319 int n;
320 SYNC_PROTO_BUF_DECL(buf);
321 #ifndef AFS_NT40_ENV
322 int iovcnt;
323 struct iovec iov[2];
324 #endif
325
326 if (state->fd == OSI_NULLSOCKET) {
327 Log("SYNC_ask: invalid sync file descriptor on circuit '%s'\n",
328 state->proto_name);
329 res->hdr.response = SYNC_COM_ERROR;
330 goto done;
331 }
332
333 if (com->hdr.command_len > SYNC_PROTO_MAX_LEN) {
334 Log("SYNC_ask: internal SYNC buffer too small on circuit '%s'; "
335 "please file a bug\n", state->proto_name);
336 res->hdr.response = SYNC_COM_ERROR;
337 goto done;
338 }
339
340 /*
341 * fill in some common header fields
342 */
343 com->hdr.proto_version = state->proto_version;
344 com->hdr.pkt_seq = ++state->pkt_seq;
345 com->hdr.com_seq = ++state->com_seq;
346 #ifdef AFS_NT40_ENV
347 com->hdr.pid = 0;
348 com->hdr.tid = 0;
349 #else
350 com->hdr.pid = getpid();
351 #ifdef AFS_PTHREAD_ENV
352 com->hdr.tid = afs_pointer_to_int(pthread_self());
353 #else
354 {
355 PROCESS handle = LWP_ThreadId();
356 com->hdr.tid = (handle) ? handle->index : 0;
357 }
358 #endif /* !AFS_PTHREAD_ENV */
359 #endif /* !AFS_NT40_ENV */
360
361 memcpy(buf, &com->hdr, sizeof(com->hdr));
362 if (com->payload.len) {
363 memcpy(buf + sizeof(com->hdr), com->payload.buf,
364 com->hdr.command_len - sizeof(com->hdr));
365 }
366
367 #ifdef AFS_NT40_ENV
368 n = send(state->fd, buf, com->hdr.command_len, 0);
369 if (n != com->hdr.command_len) {
370 Log("SYNC_ask: write failed on circuit '%s'\n", state->proto_name);
371 res->hdr.response = SYNC_COM_ERROR;
372 goto done;
373 }
374
375 if (com->hdr.command == SYNC_COM_CHANNEL_CLOSE) {
376 /* short circuit close channel requests */
377 res->hdr.response = SYNC_OK;
378 goto done;
379 }
380
381 n = recv(state->fd, buf, SYNC_PROTO_MAX_LEN, 0);
382 if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
383 Log("SYNC_ask: No response on circuit '%s'\n", state->proto_name);
384 res->hdr.response = SYNC_COM_ERROR;
385 goto done;
386 }
387 #else /* !AFS_NT40_ENV */
388 n = write(state->fd, buf, com->hdr.command_len);
389 if (com->hdr.command_len != n) {
390 Log("SYNC_ask: write failed on circuit '%s'\n", state->proto_name);
391 res->hdr.response = SYNC_COM_ERROR;
392 goto done;
393 }
394
395 if (com->hdr.command == SYNC_COM_CHANNEL_CLOSE) {
396 /* short circuit close channel requests */
397 res->hdr.response = SYNC_OK;
398 goto done;
399 }
400
401 /* receive the response */
402 iov[0].iov_base = (char *)&res->hdr;
403 iov[0].iov_len = sizeof(res->hdr);
404 if (res->payload.len) {
405 iov[1].iov_base = (char *)res->payload.buf;
406 iov[1].iov_len = res->payload.len;
407 iovcnt = 2;
408 } else {
409 iovcnt = 1;
410 }
411 n = readv(state->fd, iov, iovcnt);
412 if (n == 0 || (n < 0 && errno != EINTR)) {
413 Log("SYNC_ask: No response on circuit '%s'\n", state->proto_name);
414 res->hdr.response = SYNC_COM_ERROR;
415 goto done;
416 }
417 #endif /* !AFS_NT40_ENV */
418
419 res->recv_len = n;
420
421 if (n < sizeof(res->hdr)) {
422 Log("SYNC_ask: response too short on circuit '%s'\n",
423 state->proto_name);
424 res->hdr.response = SYNC_COM_ERROR;
425 goto done;
426 }
427 #ifdef AFS_NT40_ENV
428 memcpy(&res->hdr, buf, sizeof(res->hdr));
429 #endif
430
431 if ((n - sizeof(res->hdr)) > res->payload.len) {
432 Log("SYNC_ask: response too long on circuit '%s'\n",
433 state->proto_name);
434 res->hdr.response = SYNC_COM_ERROR;
435 goto done;
436 }
437 #ifdef AFS_NT40_ENV
438 memcpy(res->payload.buf, buf + sizeof(res->hdr), n - sizeof(res->hdr));
439 #endif
440
441 if (res->hdr.response_len != n) {
442 Log("SYNC_ask: length field in response inconsistent "
443 "on circuit '%s' command %ld, %d != %lu\n", state->proto_name,
444 afs_printable_int32_ld(com->hdr.command),
445 n,
446 afs_printable_uint32_lu(res->hdr.response_len));
447 res->hdr.response = SYNC_COM_ERROR;
448 goto done;
449 }
450 if (res->hdr.response == SYNC_DENIED) {
451 Log("SYNC_ask: negative response on circuit '%s'\n", state->proto_name);
452 }
453
454 done:
455 return res->hdr.response;
456 }
457
458
459 /*
460 * daemon com SYNC server-side interfaces
461 */
462
463 /**
464 * receive a command structure off a sync socket.
465 *
466 * @param[in] state pointer to server-side state object
467 * @param[in] fd file descriptor on which to perform i/o
468 * @param[out] com sync command object to be populated
469 *
470 * @return operation status
471 * @retval SYNC_OK command received
472 * @retval SYNC_COM_ERROR there was a socket communications error
473 */
474 afs_int32
475 SYNC_getCom(SYNC_server_state_t * state,
476 osi_socket fd,
477 SYNC_command * com)
478 {
479 int n;
480 afs_int32 code = SYNC_OK;
481 #ifdef AFS_NT40_ENV
482 SYNC_PROTO_BUF_DECL(buf);
483 #else
484 struct iovec iov[2];
485 int iovcnt;
486 #endif
487
488 #ifdef AFS_NT40_ENV
489 n = recv(fd, buf, SYNC_PROTO_MAX_LEN, 0);
490
491 if (n == 0 || (n < 0 && WSAEINTR != WSAGetLastError())) {
492 Log("SYNC_getCom: error receiving command\n");
493 code = SYNC_COM_ERROR;
494 goto done;
495 }
496 #else /* !AFS_NT40_ENV */
497 iov[0].iov_base = (char *)&com->hdr;
498 iov[0].iov_len = sizeof(com->hdr);
499 if (com->payload.len) {
500 iov[1].iov_base = (char *)com->payload.buf;
501 iov[1].iov_len = com->payload.len;
502 iovcnt = 2;
503 } else {
504 iovcnt = 1;
505 }
506
507 n = readv(fd, iov, iovcnt);
508 if (n == 0 || (n < 0 && errno != EINTR)) {
509 Log("SYNC_getCom: error receiving command\n");
510 code = SYNC_COM_ERROR;
511 goto done;
512 }
513 #endif /* !AFS_NT40_ENV */
514
515 com->recv_len = n;
516
517 if (n < sizeof(com->hdr)) {
518 Log("SYNC_getCom: command too short\n");
519 code = SYNC_COM_ERROR;
520 goto done;
521 }
522 #ifdef AFS_NT40_ENV
523 memcpy(&com->hdr, buf, sizeof(com->hdr));
524 #endif
525
526 if ((n - sizeof(com->hdr)) > com->payload.len) {
527 Log("SYNC_getCom: command too long\n");
528 code = SYNC_COM_ERROR;
529 goto done;
530 }
531 #ifdef AFS_NT40_ENV
532 memcpy(com->payload.buf, buf + sizeof(com->hdr), n - sizeof(com->hdr));
533 #endif
534
535 done:
536 return code;
537 }
538
539 /**
540 * write a response structure to a sync socket.
541 *
542 * @param[in] state handle to server-side state object
543 * @param[in] fd file descriptor on which to perform i/o
544 * @param[in] res handle to response packet
545 *
546 * @return operation status
547 * @retval SYNC_OK
548 * @retval SYNC_COM_ERROR
549 */
550 afs_int32
551 SYNC_putRes(SYNC_server_state_t * state,
552 osi_socket fd,
553 SYNC_response * res)
554 {
555 int n;
556 afs_int32 code = SYNC_OK;
557 SYNC_PROTO_BUF_DECL(buf);
558
559 if (res->hdr.response_len > (sizeof(res->hdr) + res->payload.len)) {
560 Log("SYNC_putRes: response_len field in response header inconsistent\n");
561 code = SYNC_COM_ERROR;
562 goto done;
563 }
564
565 if (res->hdr.response_len > SYNC_PROTO_MAX_LEN) {
566 Log("SYNC_putRes: internal SYNC buffer too small; please file a bug\n");
567 code = SYNC_COM_ERROR;
568 goto done;
569 }
570
571 #ifdef AFS_DEMAND_ATTACH_FS
572 res->hdr.flags |= SYNC_FLAG_DAFS_EXTENSIONS;
573 #endif
574 res->hdr.proto_version = state->proto_version;
575 res->hdr.pkt_seq = ++state->pkt_seq;
576 res->hdr.res_seq = ++state->res_seq;
577
578 memcpy(buf, &res->hdr, sizeof(res->hdr));
579 if (res->payload.len) {
580 memcpy(buf + sizeof(res->hdr), res->payload.buf,
581 res->hdr.response_len - sizeof(res->hdr));
582 }
583
584 #ifdef AFS_NT40_ENV
585 n = send(fd, buf, res->hdr.response_len, 0);
586 #else /* !AFS_NT40_ENV */
587 n = write(fd, buf, res->hdr.response_len);
588 #endif /* !AFS_NT40_ENV */
589
590 if (res->hdr.response_len != n) {
591 Log("SYNC_putRes: write failed\n");
592 res->hdr.response = SYNC_COM_ERROR;
593 goto done;
594 }
595
596 done:
597 return code;
598 }
599
/**
 * check that a protocol string is terminated within its buffer.
 *
 * @param[in] buf  start of the buffer holding the string
 * @param[in] len  number of bytes of buf to examine
 *
 * @return whether the string is legal
 *    @retval 0 legal: a NUL byte occurs within the first len bytes
 *    @retval 1 illegal: no NUL byte within the first len bytes
 *              (this includes len == 0)
 */
int
SYNC_verifyProtocolString(char * buf, size_t len)
{
    /* the string is terminated iff a NUL byte can be found in range */
    if (memchr(buf, '\0', len) != NULL) {
        return 0;
    }
    return 1;
}
611
612 /**
613 * clean up old sockets.
614 *
615 * @param[in] state server state object
616 *
617 * @post unix domain sockets are cleaned up
618 */
619 void
620 SYNC_cleanupSock(SYNC_server_state_t * state)
621 {
622 #ifdef USE_UNIX_SOCKETS
623 remove(state->addr.sun_path);
624 #endif
625 }
626
627 /**
628 * bind socket and set it to listen state.
629 *
630 * @param[in] state server state object
631 *
632 * @return operation status
633 * @retval 0 success
634 * @retval nonzero failure
635 *
636 * @post socket bound and set to listen state
637 */
638 int
639 SYNC_bindSock(SYNC_server_state_t * state)
640 {
641 int code;
642 int on = 1;
643 int numTries;
644
645 /* Reuseaddr needed because system inexplicably leaves crud lying around */
646 code =
647 setsockopt(state->fd, SOL_SOCKET, SO_REUSEADDR, (char *)&on,
648 sizeof(on));
649 if (code)
650 Log("SYNC_bindSock: setsockopt failed with (%d)\n", errno);
651
652 for (numTries = 0; numTries < state->bind_retry_limit; numTries++) {
653 code = bind(state->fd,
654 (struct sockaddr *)&state->addr,
655 AFS_SOCKADDR_LEN(&state->addr));
656 if (code == 0)
657 break;
658 Log("SYNC_bindSock: bind failed with (%d), will sleep and retry\n",
659 errno);
660 sleep(5);
661 }
662 listen(state->fd, state->listen_depth);
663
664 return code;
665 }