Tags: clojure, core.async, clj-http

Proper way to ensure clj-http's connection manager is closed after all requests are done


I have some code that combines clj-http, core.async facilities, and an atom (n-cpus below is just the number of worker threads - the processor count in my case). It creates several threads to fetch and parse a bunch of pages:

(require '[clj-http.client :as http.client]
         '[clj-http.conn-mgr :as http.conn-mgr]
         '[clojure.core.async :as async]
         '[hickory.core :as hickory])

(defn fetch-page
  ([url] (fetch-page url nil))
  ([url conn-manager]
    (-> (http.client/get url {:connection-manager conn-manager})
        :body hickory/parse hickory/as-hickory)))

(defn- create-worker
  [url-chan result conn-manager]
  (async/thread
    ; keep pulling URLs until url-chan is closed and drained
    (loop [url (async/<!! url-chan)]
      (when url
        (swap! result assoc url (fetch-page url conn-manager))
        (recur (async/<!! url-chan))))))

(defn fetch-pages
  [urls]
  (let [url-chan (async/to-chan urls)
        pages (atom (reduce (fn [m u] (assoc m u nil)) {} urls))
        conn-manager (http.conn-mgr/make-reusable-conn-manager {})
        workers (mapv (fn [_] (create-worker url-chan pages conn-manager))
                      (range n-cpus))]
    ; wait for workers to finish and shut conn-manager down
    (dotimes [_ n-cpus] (async/alts!! workers))
    (http.conn-mgr/shutdown-manager conn-manager)

    (mapv #(get @pages %) urls)))

The idea is to use multiple threads to reduce the time to fetch and parse the pages, while not overloading the server by sending a lot of requests at once - that is why I used a connection manager. I don't know if my approach is correct, so suggestions are welcome. Currently the problem is that the last requests fail because the connection manager is shut down before they finish:

Exception in thread "async-thread-macro-15" java.lang.IllegalStateException: Connection pool shut down

The main question: how do I close the connection manager at the right moment (and why does my current code fail to do so)? The side quest: is my approach right? If not, what could I do to fetch and parse multiple pages at once without overloading the server?

Thanks!


Solution

  • The problem is that async/alts!! returns on the first result and keeps doing so, since workers never changes: a channel created by async/thread closes once its body finishes, and a take from a closed channel completes immediately, so after the first worker finishes every subsequent alts!! returns right away (see the short REPL sketch after the code below). I think using async/merge to build a single channel and then repeatedly reading off of it should work.

    (defn fetch-pages
      [urls]
      (let [url-chan (async/to-chan urls)
            pages (atom (reduce (fn [m u] (assoc m u nil)) {} urls))
            conn-manager (http.conn-mgr/make-reusable-conn-manager {})
            workers (mapv (fn [_] (create-worker url-chan pages conn-manager))
                          (range n-cpus))
            all-workers (async/merge workers)]
        ; wait for workers to finish and shut conn-manager down
        (dotimes [_ n-cpus] (async/<!! all-workers))
        (http.conn-mgr/shutdown-manager conn-manager)
    
        (mapv #(get @pages %) urls)))
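
    To see the original failure mode concretely, here is a small REPL sketch (the :finished value is made up) of how takes behave on a channel whose async/thread body has already run:

    (def done-worker (async/thread :finished))

    (async/<!! done-worker)      ; => :finished, delivered once
    (async/<!! done-worker)      ; => nil, the channel is now closed
    (async/alts!! [done-worker]) ; => [nil done-worker], returns immediately

    A closed channel is always ready, which is why alts!! over the unchanged workers vector stops blocking as soon as the first worker finishes.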
    

    Alternatively, you could recur and keep shrinking workers, so that each alts!! only waits on workers that haven't finished yet.

    (defn fetch-pages
      [urls]
      (let [url-chan (async/to-chan urls)
            pages (atom (reduce (fn [m u] (assoc m u nil)) {} urls))
            conn-manager (http.conn-mgr/make-reusable-conn-manager {})
            workers (mapv (fn [_] (create-worker url-chan pages conn-manager))
                          (range n-cpus))]
        ; wait for workers to finish and shut conn-manager down
        (loop [workers workers]
          (when (seq workers)
            (let [[_ finished-worker] (async/alts!! workers)]
              (recur (filterv #(not= finished-worker %) workers)))))
    
        (http.conn-mgr/shutdown-manager conn-manager)
        (mapv #(get @pages %) urls)))
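
    Either version is called the same way; a quick usage sketch (the URLs are placeholders):

    (fetch-pages ["https://example.com/a"
                  "https://example.com/b"
                  "https://example.com/c"])
    ; => a vector of parsed hickory trees, in the same order as the input URLs

    As a side note, because the worker bodies return nil, the channels created by async/thread close without ever delivering a value. In the merge-based version that means a single async/<!! on all-workers already blocks until every worker is done; the dotimes loop is just the more defensive form (it keeps working if the workers are later changed to return values).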