Why does xml/emit stream promptly, while writing to a stream directly does not?


I'm trying to write text to a stream in small chunks. This works properly when I point an XML stream writer at the output stream (it begins sending data immediately), but when I write some text myself and then flush, nothing is sent until the stream is closed.

(defn data
  "Download a 5MB file and parse it"
  []
  (-> "http://www.cs.washington.edu/research/xmldatasets/data/tpc-h/orders.xml"
      URL.
      .openStream
      xml/parse))

(defn send-stuff [request]
  (condp = (:uri request)
    "/text" (response/response
             (ring-io/piped-input-stream
              #(let [w (io/make-writer % {:encoding "UTF-8"})]
                 (.write w "start\n")
                 (.flush w)
                 (Thread/sleep 1000)
                 (.write w "done\n")
                 (.flush w))))
    "/xml"  (response/response
             (ring-io/piped-input-stream
              #(->> (io/make-writer % {:encoding "UTF-8"})
                    (xml/emit (data))
                    .flush)))))

(comment
  (def server (jetty/run-jetty #'send-stuff {:port 8888 :join? false}))
  (.stop server))

Testing this with curl like so:

curl localhost:8888/text

sits there silently for exactly one second, then returns

start
done

I expected to see "start" and then, one second later, "done", rather than a one-second delay followed by both of them.

and using

curl localhost:8888/xml

starts streaming eye-gouging XML immediately (sorry, a personal bias snuck in there ;-)

EDIT: I have confirmed that the problem is the Jetty output buffer, because it goes away if I make that buffer really small:

(def server (jetty/run-jetty #'send-stuff {:output-buffer-size 1 :port 8888 :join? false}))

Of course, setting the output buffer size to 1 is a bad idea in many cases.


Solution

  • The .flush you are calling is not on the stream used for the HTTP response, but on the output stream of the piped stream pair.

    If you look at the source code of PipedOutputStream.flush(), you will notice that it only notifies the threads waiting to read from the connected PipedInputStream; it does not flush the underlying HTTP response stream.
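    The effect can be reproduced without Ring or Jetty. The following is only a minimal sketch using plain java.io classes: the BufferedOutputStream is an assumed stand-in for the server's output buffer, the ByteArrayOutputStream is an assumed stand-in for the socket, and the name flush-demo is hypothetical.

    ```clojure
    (import '(java.io PipedInputStream PipedOutputStream
                      BufferedOutputStream ByteArrayOutputStream))

    (defn flush-demo
      "Returns how many bytes reached the sink before and after the
      downstream buffer (not the pipe) was flushed."
      []
      (let [sink     (ByteArrayOutputStream.)          ; stand-in for the socket
            buffered (BufferedOutputStream. sink 8192) ; stand-in for the server's output buffer
            pipe-in  (PipedInputStream.)
            pipe-out (PipedOutputStream. pipe-in)
            buf      (byte-array 64)]
        ;; Producer side: write to the pipe and flush it.
        ;; Using one thread for both ends is safe here only because the
        ;; payload is far smaller than the pipe's internal buffer.
        (.write pipe-out (.getBytes "start\n" "UTF-8"))
        (.flush pipe-out)
        ;; Consumer side: copy what is available into the buffered stream.
        (let [n      (.read pipe-in buf)
              _      (.write buffered buf 0 n)
              before (.size sink)   ; the pipe was flushed, yet nothing has arrived
              _      (.flush buffered)
              after  (.size sink)]  ; flushing the downstream buffer delivers the bytes
          {:before before :after after})))
    ```

    In this sketch the flush on the pipe makes the bytes readable on the other end, but they only reach the sink once the downstream buffer itself is flushed.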

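    The difference in behaviour is caused by the size of the response data: the 5MB XML document fills the output buffer almost at once, so data is sent as the buffer overflows, while the short text never fills it. If you change your example to use a small XML document, the behaviour will be the same: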

    (defn data
      []
      (-> "<?xml version=\"1.0\" encoding=\"UTF-8\"?><a>1</a>"
          (.getBytes)
          (ByteArrayInputStream.)
          (xml/parse)))
    
    
    (defn send-stuff [request]
      (condp = (:uri request)
        "/text" (response/response
                  (ring-io/piped-input-stream
                    #(let [w (io/make-writer % {:encoding "UTF-8"})]
                       (.write w "start\n")
                       (.flush w)
                       (Thread/sleep 1000)
                       (.write w "done\n")
                       (.flush w))))
        "/xml"  (response/response
                  (ring-io/piped-input-stream
                    #(let [w (io/make-writer % {:encoding "UTF-8"})]
                       (xml/emit (data) w)
                       (.flush w)
                       (Thread/sleep 1000)
                       (xml/emit (data) w)
                       (.flush w))))))
    

    Calling curl localhost:8888/xml will display the whole response only after one second:

    <?xml version="1.0" encoding="UTF-8"?><a>1</a><?xml version="1.0" encoding="UTF-8"?><a>1</a>

    You can use a different streaming mechanism that gives you control over flushing the HTTP response stream, for example a blocking queue:

    (ns so43769408
      (:require [ring.adapter.jetty :as jetty]
                [clojure.java.io :as io]
                [ring.util.response :as response]
                [ring.core.protocols :as protocols])
      (:import (java.io OutputStream)
               (java.util.concurrent LinkedBlockingQueue)))
    
    (extend-protocol protocols/StreamableResponseBody
      LinkedBlockingQueue
      (write-body-to-stream [output-queue _ ^OutputStream output-stream]
        (with-open [out (io/writer output-stream)]
          (loop [chunk (.take output-queue)]
            (when-not (= chunk ::EOF)
              (.write out (str chunk))
              (.flush out)
              (recur (.take output-queue)))))))
    
    
    (defn send-stuff [request]
      (response/response
        (let [output-queue (LinkedBlockingQueue.)]
          (future
            (.put output-queue "start\n")
            (Thread/sleep 1000)
            (.put output-queue "end\n")
            (.put output-queue ::EOF))
          output-queue)))
    
    (comment
      (def server (jetty/run-jetty #'send-stuff {:port 8888 :join? false}))
      (.stop server))
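
    The take/write/flush loop above can also be checked without starting a server. This is a rough sketch, not part of the original answer: drain-queue! and flush-snapshots are hypothetical names, and the proxied StringWriter exists only to record what has been written at the moment of each flush.

    ```clojure
    (import '(java.io StringWriter Writer)
            '(java.util.concurrent LinkedBlockingQueue))

    (defn drain-queue!
      "Writes each chunk taken from queue to out, flushing after every chunk,
      until the ::EOF sentinel is taken."
      [^LinkedBlockingQueue queue ^Writer out]
      (loop [chunk (.take queue)]
        (when-not (= chunk ::EOF)
          (.write out (str chunk))
          (.flush out)
          (recur (.take queue)))))

    (defn flush-snapshots
      "Returns the writer's contents as seen at each flush."
      []
      (let [snapshots (atom [])
            ;; StringWriter whose flush records the text written so far
            out       (proxy [StringWriter] []
                        (flush [] (swap! snapshots conj (str this))))
            queue     (LinkedBlockingQueue.)]
        (.put queue "start\n")
        (.put queue "end\n")
        (.put queue ::EOF)
        (drain-queue! queue out)
        @snapshots))
    ```

    Each chunk shows up in its own snapshot, which mirrors the intent of the handler above: one flush of the response stream per chunk placed on the queue.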