I have some code that combines clj-http, core.async facilities and an atom. It creates some threads to fetch and parse a bunch of pages:

    ;; Aliases assumed from usage; n-cpus is defined elsewhere.
    (require '[clj-http.client :as http.client]
             '[clj-http.conn-mgr :as http.conn-mgr]
             '[clojure.core.async :as async]
             '[hickory.core :as hickory])

    (defn fetch-page
      ([url] (fetch-page url nil))
      ([url conn-manager]
       (-> (http.client/get url {:connection-manager conn-manager})
           :body hickory/parse hickory/as-hickory)))

    (defn- create-worker
      [url-chan result conn-manager]
      (async/thread
        (loop [url (async/<!! url-chan)]
          (when url
            (swap! result assoc url (fetch-page url conn-manager))
            (recur (async/<!! url-chan))))))

    (defn fetch-pages
      [urls]
      (let [url-chan (async/to-chan urls)
            pages (atom (reduce (fn [m u] (assoc m u nil)) {} urls))
            conn-manager (http.conn-mgr/make-reusable-conn-manager {})
            workers (mapv (fn [_] (create-worker url-chan pages conn-manager))
                          (range n-cpus))]
        ; wait for workers to finish and shut conn-manager down
        (dotimes [_ n-cpus] (async/alts!! workers))
        (http.conn-mgr/shutdown-manager conn-manager)
        (mapv #(get @pages %) urls)))

The idea is to use multiple threads to reduce the time needed to fetch and parse the pages, but I'd like to avoid overloading the server by sending a lot of requests at once, which is why a connection manager is used. I don't know if my approach is correct; suggestions are welcome. Currently the problem is that the last requests fail because the connection manager is shut down before they finish:

    Exception in thread "async-thread-macro-15" java.lang.IllegalStateException: Connection pool shut down

The main question: how do I close the connection manager at the right moment (and why does my current code fail to do so)? The side quest: is my approach right? If not, what could I do to fetch and parse multiple pages at once without overloading the server?
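The problem is that async/alts!! returns on the first result, and will keep doing so since the workers vector never changes. Each worker's channel closes as soon as that worker finishes, and a closed channel is always ready to be read, so every later async/alts!! call returns immediately from the same finished worker while the others are still fetching. I think using async/merge to build a single channel and then repeatedly reading off of it should work.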

    (defn fetch-pages
      [urls]
      (let [url-chan (async/to-chan urls)
            pages (atom (reduce (fn [m u] (assoc m u nil)) {} urls))
            conn-manager (http.conn-mgr/make-reusable-conn-manager {})
            workers (mapv (fn [_] (create-worker url-chan pages conn-manager))
                          (range n-cpus))
            all-workers (async/merge workers)]
        ; wait for workers to finish and shut conn-manager down
        (dotimes [_ n-cpus] (async/<!! all-workers))
        (http.conn-mgr/shutdown-manager conn-manager)
        (mapv #(get @pages %) urls)))

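
To see the difference in isolation, here is a minimal sketch that swaps the HTTP workers for threads that only sleep. The stub-worker function, the atoms and the sleep durations are made up for illustration; the only assumption is that core.async is on the classpath.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for create-worker: sleeps instead of fetching,
;; records that it finished, and returns nil so its channel simply closes.
(defn stub-worker [done ms]
  (async/thread
    (Thread/sleep ms)
    (swap! done conj ms)
    nil))

;; alts!! inside dotimes: the first call waits for the fast worker, and the
;; second call returns at once because that worker's channel is already closed.
(def done-a (atom #{}))
(def workers-a [(stub-worker done-a 50) (stub-worker done-a 500)])
(dotimes [_ 2] (async/alts!! workers-a))
(def seen-by-alts @done-a)   ; only the 50 ms worker has finished at this point

;; merge: the merged channel closes only after every source channel has
;; closed, so reading from it blocks until the slowest worker is done.
(def done-b (atom #{}))
(def workers-b [(stub-worker done-b 50) (stub-worker done-b 500)])
(async/<!! (async/merge workers-b))
(def seen-by-merge @done-b)  ; both workers have finished
```

Since these workers never put a value on their channels, the merged channel yields nothing until it closes, so a single read is already enough here; the extra reads in the dotimes version just return nil immediately.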
Alternatively, you could recur and keep shrinking workers, so that you're only waiting on the workers that haven't finished yet.

    (defn fetch-pages
      [urls]
      (let [url-chan (async/to-chan urls)
            pages (atom (reduce (fn [m u] (assoc m u nil)) {} urls))
            conn-manager (http.conn-mgr/make-reusable-conn-manager {})
            workers (mapv (fn [_] (create-worker url-chan pages conn-manager))
                          (range n-cpus))]
        ; wait for workers to finish and shut conn-manager down
        (loop [workers workers]
          (when (seq workers)
            (let [[_ finished-worker] (async/alts!! workers)]
              (recur (filterv #(not= finished-worker %) workers)))))
        (http.conn-mgr/shutdown-manager conn-manager)
        (mapv #(get @pages %) urls)))
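
The shrinking loop can also be pulled out and tried on its own. The following is only a sketch: await-all and stub-worker are hypothetical names, and the workers sleep instead of fetching pages.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical helper: block until every channel in chans has delivered a
;; value or closed, dropping each channel from the wait set once it fires.
(defn await-all [chans]
  (loop [pending (vec chans)]
    (when (seq pending)
      (let [[_ finished] (async/alts!! pending)]
        (recur (filterv #(not= finished %) pending))))))

;; Stub workers that sleep and then record their duration.
(def finished-log (atom []))

(defn stub-worker [ms]
  (async/thread
    (Thread/sleep ms)
    (swap! finished-log conj ms)
    nil))

(await-all (mapv stub-worker [300 50 150]))

;; By the time await-all returns, all three workers have recorded themselves.
(def finished-set (set @finished-log))
```

Because a channel is dropped after its first event, this approach suits channels that deliver at most one result, such as the ones returned by async/thread.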