
Easiest way to use an I/O callback with concurrent http-kit/get requests


I am launching a few hundred concurrent http-kit.client/get requests, each with a callback that writes its result to a single shared file.

What would be a good way to handle thread safety? Using chan and <!! from core.async?

Here's the code I'm considering:

(defn launch-async [channel url]
  (http/get url {:timeout 5000
                 :user-agent "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12; rv:10.0) Gecko/20100101 Firefox/10.0"}
          (fn [{:keys [status headers body error]}]
            (if error
              (put! channel (json/generate-string {:url url :headers headers :status status}))
              (put! channel (json/generate-string body))))))

(defn process-async [channel func]
  (when-let [response (<!! channel)]
    (func response)))

(defn http-gets-async [func urls]
  (let [channel (chan)]
    (doall (map #(launch-async channel %) urls))
    (process-async channel func)))

Thanks for your insights.


Solution

  • Since you're already using core.async in your example, I'll point out a few issues and how to address them. The other answer suggests a more basic approach, and I agree wholeheartedly that a simpler approach is just fine. With channels, however, you get a simple way to consume the data as it arrives, without collecting every response into a vector that keeps growing as responses come in. Consider the following issues and how to fix them:

    (1) Your current version will fail with an AssertionError if your URL list has more than 1024 elements. Asynchronous puts and takes (put! and take! never block; they always return immediately) that cannot complete right away are queued inside the channel, and that queue is capped at 1024 pending operations. The cap is there to prevent unbounded asynchronous use of a channel. To see for yourself, call (http-gets-async println (repeat 1025 "http://blah-blah-asdf-fakedomain.com")).
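
    A minimal sketch of that limit in isolation, using only core.async (no HTTP involved). The function name pending-put-limit-demo is made up for illustration; it queues puts on an unbuffered channel that nobody reads from:

    ```clojure
    (require '[clojure.core.async :refer [chan put!]])

    (defn pending-put-limit-demo
      "Tries to queue `n` asynchronous puts on an unbuffered channel with no taker.
      Returns :ok if all of them were queued, or the error message once the
      pending-put limit is exceeded."
      [n]
      (let [c (chan)]
        (try
          (dotimes [i n]
            (put! c i))
          :ok
          (catch AssertionError e
            (.getMessage e)))))

    (pending-put-limit-demo 1024) ; all puts fit in the pending queue
    (pending-put-limit-demo 1025) ; one too many: returns the error message
    ```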

    What you want is to put something on the channel only when there's room for it. This is called back-pressure. Taking a page from the excellent wiki on go block best practices, one clever way to do this from your http-kit callback is to use put!'s optional callback argument to launch your next http get. That callback runs only once the channel has accepted the value, so the next request is not issued until then and pending puts can never pile up past the channel's limit:

    (defn launch-async
      [channel [url & urls]]
      (when url
        (http/get url {:timeout 5000
                       :user-agent "Mozilla"}
                  (fn [{:keys [status headers body error]}]
                    (let [put-on-chan (if error
                                        (json/generate-string {:url url :headers headers :status status})
                                        (json/generate-string body))]
                      (put! channel put-on-chan (fn [_] (launch-async channel urls))))))))
    

    (2) Next, you seem to be processing only one response. Instead, use a go-loop:

    (defn process-async
      [channel func]
      (go-loop []
        (when-let [response (<! channel)]
          (func response)
          (recur))))
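
    Since the goal in the question is to write to a single file, one caution: blocking I/O inside a go block can tie up the shared thread pool that go blocks run on. A possible alternative, sketched here under the assumption that each response should become one line of the file, is to drain the channel on a dedicated thread with async/thread. The name write-lines-async and the temp file are made up for illustration. Only one thread ever touches the writer, which is what keeps the file access safe. Note that this loop ends only when the channel is closed:

    ```clojure
    (require '[clojure.core.async :as async :refer [chan >!! <!! close!]]
             '[clojure.java.io :as io])

    (defn write-lines-async
      "Drains `channel` on a dedicated thread, appending each value to the file
      at `path` as one line. Returns a channel that yields the number of lines
      written once `channel` has been closed and emptied."
      [channel path]
      (async/thread
        (with-open [^java.io.Writer w (io/writer path :append true)]
          (loop [n 0]
            (if-some [line (<!! channel)]
              (do (.write w (str line))
                  (.write w "\n")
                  (recur (inc n)))
              n)))))

    ;; Usage: write two lines to a temp file, then wait for the writer to finish.
    (let [file (java.io.File/createTempFile "responses" ".txt")
          c    (chan 10)
          done (write-lines-async c (.getPath file))]
      (>!! c "first")
      (>!! c "second")
      (close! c)
      (<!! done))
    ```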
    

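    (3) Here's your http-gets-async function. I see no harm in adding a buffer here: it lets responses queue up when your processing function briefly falls behind. Note that launch-async as written issues requests one at a time (each get is started from the previous put! callback), so the buffer does not by itself produce a burst of parallel requests: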

    (defn http-gets-async
      [func urls]
      (let [channel (chan 1000)]
        (launch-async channel urls)
        (process-async channel func)))
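
    If you want several requests in flight at once while keeping back-pressure, one option is core.async's pipeline-async. The following is only a sketch under stated assumptions: fake-get is a hypothetical stand-in for http/get so the example runs without a network, fetch-concurrently is a made-up name, and to-chan! assumes a reasonably recent core.async (older releases call it to-chan):

    ```clojure
    (require '[clojure.core.async :as async :refer [chan put! close! <!!]])

    (defn fake-get
      "Hypothetical stand-in for http/get: invokes `callback` with a response
      map on another thread."
      [url callback]
      (future (callback {:status 200 :body (str "body of " url)})))

    (defn fetch-concurrently
      "Returns a channel of responses for `urls`, keeping roughly `n` calls to
      `fetch` in flight at a time. The channel closes after the last response."
      [n fetch urls]
      (let [in  (async/to-chan! urls)
            out (chan n)]
        (async/pipeline-async
          n out
          (fn [url result]
            ;; pipeline-async expects one result channel per input, closed when done
            (fetch url (fn [response]
                         (put! result response (fn [_] (close! result))))))
          in)
        out))

    ;; Usage: collect every response (in input order) into a vector.
    (<!! (async/into [] (fetch-concurrently 4 fake-get ["a" "b" "c"])))
    ```

    When the output channel is full, the pipeline stops starting new fetches until the consumer catches up.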
    

    Now you can process an arbitrarily long list of URLs, with back-pressure. To test this, define a counter and have your processing function increment it so you can watch progress. Use a localhost URL that is easy to hammer (I wouldn't recommend firing off hundreds of thousands of requests at, say, Google):

    (def responses (atom 0))
    (http-gets-async (fn [_] (swap! responses inc))
                     (repeat 1000000 "http://localhost:8000"))
    

    As this is all asynchronous, your function will return immediately and you can watch @responses grow.

    Another interesting option: instead of running your processing function in process-async, you can apply it as a transducer on the channel itself. The function is then applied to each value as it is put, and its return value is what ends up on the channel.

    (defn process-async
      [channel]
      (go-loop []
        (when-let [_ (<! channel)]
          (recur))))
    
    (defn http-gets-async
      [func urls]
      (let [channel (chan 10000 (map func))] ;; <-- transducer on channel
        (launch-async channel urls)
        (process-async channel)))
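
    One thing to keep in mind with the transducer variant is what happens when func throws. chan accepts an optional third argument, an exception handler whose non-nil return value is placed on the channel in place of the failed value. A small sketch, where transforming-chan and the {:error ...} shape are assumptions made for illustration:

    ```clojure
    (require '[clojure.core.async :refer [chan >!! <!!]])

    (defn transforming-chan
      "Returns a channel with a buffer of `n` that applies `f` to every value
      put on it. If `f` throws, an {:error message} map is delivered instead."
      [n f]
      (chan n
            (map f)
            (fn [^Throwable ex] {:error (.getMessage ex)})))

    ;; Usage: the first value is parsed, the second one yields an error map.
    (let [c (transforming-chan 10 #(Long/parseLong %))]
      (>!! c "42")
      (>!! c "not a number")
      [(<!! c) (<!! c)])
    ```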
    

    There are many ways to do this, including arranging for the channel to close once all URLs have been processed (note that in the versions above it stays open). The java.util.concurrent primitives are also available if you prefer them, and they are quite easy to use.
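
    As a rough sketch of the closing variant: the chained launch-async can close the channel when it runs out of URLs, which lets a consumer know when everything has been delivered. The names fake-get (a hypothetical stand-in for http/get so the example runs offline), launch-async-closing, and collect-bodies are made up for illustration:

    ```clojure
    (require '[clojure.core.async :as async :refer [chan put! close! <!!]])

    (defn fake-get
      "Hypothetical stand-in for http/get: invokes `callback` with a response
      map on another thread."
      [url callback]
      (future (callback {:status 200 :body (str "body of " url)})))

    (defn launch-async-closing
      "Fetches `urls` one after another, putting each body on `channel`, and
      closes `channel` after the last response has been accepted."
      [fetch channel [url & urls]]
      (if url
        (fetch url
               (fn [{:keys [body]}]
                 (put! channel body
                       (fn [_] (launch-async-closing fetch channel urls)))))
        (close! channel)))

    (defn collect-bodies
      "Blocks until every URL has been fetched and returns the bodies in order."
      [urls]
      (let [channel (chan 10)]
        (launch-async-closing fake-get channel urls)
        (<!! (async/into [] channel))))

    (collect-bodies ["a" "b" "c"])
    ;; => ["body of a" "body of b" "body of c"]
    ```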