(web server) punts keep-alive to impls; http server uses (ice-9 poll)
authorAndy Wingo <wingo@pobox.com>
Fri, 3 Dec 2010 14:31:57 +0000 (15:31 +0100)
committerAndy Wingo <wingo@pobox.com>
Fri, 3 Dec 2010 14:31:57 +0000 (15:31 +0100)
* module/web/server.scm: Rewrite to remove the extra "keep-alive"
  parameter. Instead, since the server is an essentially stateful
  object, have clients that want to do keep-alive manage that set as
  part of the server state. Also avoids imposing a particular data
  structure on the server implementation.

* module/web/server/http.scm: Adapt to the new server interface. Also,
  use a poll set instead of select and lists. Makes handling 1000
  clients at a time much more possible.

module/web/server.scm
module/web/server/http.scm

index 8fd63c8..3d7c411 100644 (file)
 ;;;     server socket object, or signals an error.
 ;;;
 ;;;   * The `read' hook is called, to read a request from a new client.
-;;;     The `read' hook takes two arguments: the server socket, and a
-;;;     list of keep-alive clients.  It should return four values:  the
-;;;     new list of keep-alive clients, an opaque client socket, the
+;;;     The `read' hook takes one arguments, the server socket.  It
+;;;     should return three values: an opaque client socket, the
 ;;;     request, and the request body. The request should be a
 ;;;     `<request>' object, from `(web request)'.  The body should be a
 ;;;     string or a bytevector, or `#f' if there is no body.
 ;;;
-;;;     The keep-alive list is used when selecting a new request.  You
-;;;     can either serve an old client or serve a new client; and some
-;;;     old clients might close their connections while you are waiting.
-;;;     The `read' hook returns a new keep-alive set to account for old
-;;;     clients going away, and for read errors on old clients.
-;;;
 ;;;     If the read failed, the `read' hook may return #f for the client
 ;;;     socket, request, and body.
 ;;;
 ;;;     constructed with those headers.
 ;;;
 ;;;   * The `write' hook is called with three arguments: the client
-;;;     socket, the response, and the body.  The `write' hook may return
-;;;     #f to indicate that the connection was closed.  If `write'
-;;;     returns a true value, it will be consed onto the keep-alive
-;;;     list.
+;;;     socket, the response, and the body.  The `write' hook returns no
+;;;     values.
 ;;;
 ;;;   * At this point the request handling is complete. For a loop, we
-;;;     loop back with the new keep-alive list, and try to read a new
-;;;     request.
+;;;     loop back and try to read a new request.
 ;;;
 ;;;   * If the user interrupts the loop, the `close' hook is called on
 ;;;     the server socket.
 (define (open-server impl open-params)
   (apply (server-impl-open impl) open-params))
 
-;; -> (keep-alive client request body | keep-alive #f #f #f)
-(define (read-client impl server keep-alive)
+;; -> (client request body | #f #f #f)
+(define (read-client impl server)
   (call-with-error-handling
    (lambda ()
-     ((server-impl-read impl) server keep-alive))
+     ((server-impl-read impl) server))
    #:pass-keys '(quit interrupt)
    #:on-error (if (batch-mode?) 'pass 'debug)
    #:post-error
    (lambda (k . args)
      (warn "Error while accepting client" k args)
-     (values keep-alive #f #f #f))))
+     (values #f #f #f))))
 
 (define (call-with-encoded-output-string charset proc)
   (if (and (string-ci=? charset "utf-8") #f)
      (warn "Error handling request" k args)
      (values (build-response #:code 500) #f state))))
 
-;; -> (#f | client)
+;; -> unspecified values
 (define (write-client impl server client response body)
   (call-with-error-handling
    (lambda ()
    #:post-error
    (lambda (k . args)
      (warn "Error while writing response" k args)
-     #f)))
+     (values))))
 
 ;; -> unspecified values
 (define (close-server impl server)
                     (lambda (k proc)
                       (with-stack-and-prompt (lambda () (proc k))))))
   
-(define (and-cons x xs)
-  (if x (cons x xs) xs))
-
-;; -> new keep-alive new-state
-(define (serve-one-client handler impl server keep-alive state)
+;; -> new-state
+(define (serve-one-client handler impl server state)
   (debug-elapsed 'serve-again)
   (call-with-values
       (lambda ()
-        (read-client impl server keep-alive))
-    (lambda (keep-alive client request body)
+        (read-client impl server))
+    (lambda (client request body)
       (debug-elapsed 'read-client)
       (if client
           (call-with-values
                 (handle-request handler request body state))
             (lambda (response body state)
               (debug-elapsed 'handle-request)
-              (values
-               (and-cons (let ((x (write-client impl server client response body)))
-                           (debug-elapsed 'write-client)
-                           x)
-                         keep-alive)
-               state)))
-          (values keep-alive state)))))
+              (write-client impl server client response body)
+              (debug-elapsed 'write-client)
+              state))
+          state))))
 
 (define* (run-server handler #:optional (impl 'http) (open-params '())
                      . state)
          (server (open-server impl open-params)))
     (call-with-sigint
      (lambda ()
-       (let lp ((keep-alive '()) (state state))
-         (call-with-values
-             (lambda ()
-               (serve-one-client handler impl server keep-alive state))
-           (lambda (new-keep-alive new-state)
-             (lp new-keep-alive new-state)))))
+       (let lp ((state state))
+         (lp (serve-one-client handler impl server state))))
      (lambda ()
        (close-server impl server)
        (values)))))
index 6ec414b..1628e1d 100644 (file)
 
 (define-module (web server http)
   #:use-module ((srfi srfi-1) #:select (fold))
+  #:use-module (srfi srfi-9)
   #:use-module (rnrs bytevectors)
   #:use-module (web request)
   #:use-module (web response)
   #:use-module (web server)
+  #:use-module (ice-9 poll)
   #:use-module (system repl error-handling))
 
 
     (bind sock family addr port)
     sock))
 
+(define-record-type <http-server>
+  (make-http-server socket poll-idx poll-set)
+  http-server?
+  (socket http-socket)
+  (poll-idx http-poll-idx set-http-poll-idx!)
+  (poll-set http-poll-set))
+
+(define *error-events* (logior POLLHUP POLLERR))
+(define *read-events* POLLIN)
+(define *events* (logior *error-events* *read-events*))
+
 ;; -> server
 (define* (http-open #:key
-                      (host #f)
-                      (family AF_INET)
-                      (addr (if host
-                                (inet-pton family host)
-                                INADDR_LOOPBACK))
-                      (port 8080)
-                      (socket (make-default-socket family addr port)))
+                    (host #f)
+                    (family AF_INET)
+                    (addr (if host
+                              (inet-pton family host)
+                              INADDR_LOOPBACK))
+                    (port 8080)
+                    (socket (make-default-socket family addr port)))
   (listen socket 5)
   (sigaction SIGPIPE SIG_IGN)
-  socket)
+  (let ((poll-set (make-empty-poll-set)))
+    (poll-set-add! poll-set socket *events*)
+    (make-http-server socket 1 poll-set)))
 
-;; -> (keep-alive client request body | keep-alive #f #f #f)
-(define (http-read server keep-alive)
-  (call-with-values (lambda ()
-                      (let ((ports (cons server keep-alive)))
-                        (apply values (select ports '() ports))))
-    (lambda (readable writable except)
+;; -> (client request body | #f #f #f)
+(define (http-read server)
+  (let* ((poll-set (http-poll-set server)))
+    (let lp ((idx (http-poll-idx server)))
       (cond
-       ((pair? except)
-        (values (fold (lambda (p keep-alive)
-                        (close-port p)
-                        (if (eq? p server)
-                            (throw 'interrupt)
-                            (delq p keep-alive)))
-                      keep-alive
-                      except)
-                #f #f #f))
-       ((memq server readable)
-        ;; FIXME: meta to read-request
-        (let* ((client (let ((pair (accept server)))
-                         ;; line buffered for request
-                         (setvbuf (car pair) _IOLBF)
-                         pair))
-               (req (read-request (car client)))
-               (body-str (begin
-                           ;; block buffered for body and response
-                           (setvbuf (car client) _IOFBF)
-                           (read-request-body/latin-1 req))))
-          (values keep-alive (car client) req body-str)))
-       ((pair? readable)
-        ;; FIXME: preserve meta for keep-alive
-        (let* ((p (car readable))
-               (keep-alive (delq p keep-alive)))
-          (if (eof-object? (peek-char p))
-              (begin
-                (close-port p)
-                (values keep-alive #f #f #f))
-              (call-with-error-handling
-               (lambda ()
-                 ;; http-write already left p in line-buffered state
-                 (let* ((req (read-request p))
-                        (body-str (begin
-                                    ;; block buffered for body and response
-                                    (setvbuf p _IOFBF)
-                                    (read-request-body/latin-1 req))))
-                   (values keep-alive p req body-str)))
-               #:pass-keys '(quit interrupt)
-               #:on-error (if (batch-mode?) 'pass 'debug)
-               #:post-error
-               (lambda (k . args)
-                 (warn "Error while reading request" k args)
-                 (values keep-alive #f #f #f #f))))))
+       ((not (< idx (poll-set-nfds poll-set)))
+        (poll poll-set)
+        (lp 0))
        (else
-        (values keep-alive #f #f #f))))))
+        (let ((revents (poll-set-revents poll-set idx)))
+          (cond
+           ((zero? revents)
+            ;; Nothing on this port.
+            (lp (1+ idx)))
+           ((zero? idx)
+            ;; The server socket.
+            (if (not (zero? (logand revents *error-events*)))
+                ;; An error.
+                (throw 'interrupt)
+                ;; Otherwise, we have a new client. Add to set, then
+                ;; find another client that is ready to read.
+                ;;
+                ;; FIXME: preserve meta-info.
+                (let ((client (accept (poll-set-port poll-set idx))))
+                  ;; Set line buffering while reading the request.
+                  (setvbuf (car client) _IOLBF)
+                  (poll-set-add! poll-set (car client) *events*)
+                  (lp (1+ idx)))))
+           ;; Otherwise, a client socket with some activity on
+           ;; it. Remove it from the poll set.
+           (else
+            (let ((port (poll-set-remove! poll-set idx)))
+              (cond
+               ((or (not (zero? (logand revents *error-events*)))
+                    (eof-object? (peek-char port)))
+                ;; The socket was shut down or had an error. See
+                ;; http://www.greenend.org.uk/rjk/2001/06/poll.html
+                ;; for an interesting discussion.
+                (close-port port)
+                (lp idx))
+               (else
+                ;; Otherwise, try to read a request from this port.
+                ;; Next time we start with this index.
+                (set-http-poll-idx! server idx)
+                (call-with-error-handling
+                 (lambda ()
+                   (let ((req (read-request port)))
+                     ;; Block buffering for reading body and writing response.
+                     (setvbuf port _IOFBF)
+                     (values port
+                             req
+                             (read-request-body/latin-1 req))))
+                 #:pass-keys '(quit interrupt)
+                 #:on-error (if (batch-mode?) 'pass 'debug)
+                 #:post-error
+                 (lambda (k . args)
+                   (warn "Error while reading request" k args)
+                   (values #f #f #f))))))))))))))
 
 (define (keep-alive? response)
   (let ((v (response-version response)))
          ((0) (memq 'keep-alive (response-connection response)))))
       (else #f))))
 
-;; -> (#f | client)
+;; -> 0 values
 (define (http-write server client response body)
-  (let ((response (write-response response client)))
+  (let* ((response (write-response response client))
+         (port (response-port response)))
     (cond
      ((not body))                       ; pass
      ((string? body)
       (write-response-body/bytevector response body))
      (else
       (error "Expected a string or bytevector for body" body)))
-    (force-output (response-port response))
-    (if (keep-alive? response)
-        (let ((p (response-port response)))
-          ;; back to line buffered
-          (setvbuf p _IOLBF)
-          p)
-        (begin
-          (close-port (response-port response))
-          #f))))
+    (cond
+     ((keep-alive? response)
+      (force-output port)
+      ;; back to line buffered
+      (setvbuf port _IOLBF)
+      (poll-set-add! (http-poll-set server) port *events*))
+     (else
+      (close-port port)))
+    (values)))
 
 ;; -> unspecified values
 (define (http-close server)
-  (shutdown server 2)
-  (close-port server))
+  (let ((poll-set (http-poll-set server)))
+    (let lp ((n (poll-set-nfds poll-set)))
+      (if (positive? n)
+          (begin
+            (close-port (poll-set-remove! poll-set (1- n)))
+            (lp (1- n)))))))
 
 (define-server-impl http
   http-open