
Core.async: Take all values from collection of promise-chans


Consider a dataset like this:

(def data [{:url "http://www.url1.com" :type :a}
           {:url "http://www.url2.com" :type :a}
           {:url "http://www.url3.com" :type :a}
           {:url "http://www.url4.com" :type :b}])

The contents of those URLs should be requested in parallel. Depending on each item's :type value, the contents should be parsed by the corresponding function. The parsing functions return collections, which should be concatenated once all the responses have arrived.

So let's assume that there are functions parse-a and parse-b, which both return a collection of strings when they are passed a string containing HTML content.

It looks like core.async could be a good tool for this. One could either have a separate channel for each item or one single channel. I'm not sure which would be preferable here. With several channels, one could use transducers for the postprocessing/parsing. There is also a special promise-chan, which might be appropriate here.

Here is a code sketch; I'm using a callback-based HTTP Kit function. Unfortunately, I could not find a generic solution inside the go block (one that does not hard-code the number of channels).

(defn f [data] 
  (let [chans (map (fn [{:keys [url type]}] 
                     (let [c (promise-chan (map ({:a parse-a :b parse-b} type)))] 
                       (http/get url {} #(put! c %))
                       c))
                   data)
        result-c (promise-chan)] 
    (go (put! result-c (concat (<! (nth chans 0))
                               (<! (nth chans 1))
                               (<! (nth chans 2))
                               (<! (nth chans 3)))))
    result-c))

The result can be read like so:

(go (prn (<! (f data))))

Solution

  • I'd say that promise-chan does more harm than good here. The problem is that most of the core.async API (a/merge, a/reduce, etc.) relies on the fact that channels close at some point, whereas a promise-chan that has received a value keeps handing that value to every take, so its consumers never see it close.
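
    To make the difference concrete, here is a small REPL sketch comparing a promise-chan with an ordinary buffered channel. It assumes core.async is on the classpath and aliased as a; the var names are only for illustration.

    ```clojure
    (require '[clojure.core.async :as a])

    ;; A promise-chan keeps its first value: every take returns it again.
    (def pc (a/promise-chan))
    (a/>!! pc :done)
    (a/close! pc)

    ;; Each take yields the stored value, even after close!, so a consumer
    ;; that loops until it reads nil (as a/reduce does) would never stop.
    (def takes [(a/<!! pc) (a/<!! pc) (a/<!! pc)])

    ;; An ordinary buffered channel drains and then signals "closed" with nil.
    (def c (a/chan 1))
    (a/>!! c :done)
    (a/close! c)

    (def first-take  (a/<!! c))   ; the buffered value
    (def second-take (a/<!! c))   ; nil, because the channel is closed and empty
    ```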

    So, if sticking with core.async is crucial for you, a better solution is to use an ordinary channel instead of promise-chan and close it after the first put!:

    ...
    (let [c (chan 1 (map ({:a parse-a :b parse-b} type)))]
      (http/get url {} #(do (put! c %) (close! c)))
      c)
    ...
    

    At this point, you're working with channels that close, and things become a bit simpler. To collect all values, you could do something like this:

    ;; (go (put! result-c (concat (<! (nth chans 0))
    ;;                            (<! (nth chans 1))
    ;;                            (<! (nth chans 2))
    ;;                            (<! (nth chans 3)))))
    ;; instead of above, now you can do this:
    (->> chans
         async/merge
         (async/reduce into []))
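
    Put together, the whole thing might look like the sketch below. It is not the original code: parse-a, parse-b and fetch! are hypothetical stand-ins (fetch! mimics a callback-based client such as http/get), and the transducer reads :body on the assumption that the callback receives a response map rather than the HTML string itself. Note that a/merge yields values in arrival order, so the second function shows one possible way to keep the order of data with a generic go-loop.

    ```clojure
    (require '[clojure.core.async :as a])

    ;; Hypothetical stand-ins, not part of the original post.
    (defn parse-a [html] [(str "a:" html)])
    (defn parse-b [html] [(str "b:" html)])
    (defn fetch!
      "Calls callback with a fake response map on another thread."
      [url callback]
      (a/thread (callback {:body (str "<html>" url "</html>")})))

    (def data [{:url "http://www.url1.com" :type :a}
               {:url "http://www.url2.com" :type :b}])

    (defn request-chan
      "Returns a channel that yields the parsed body once and then closes."
      [{:keys [url type]}]
      (let [parse ({:a parse-a :b parse-b} type)
            c     (a/chan 1 (map (comp parse :body)))]
        (fetch! url (fn [resp] (a/put! c resp) (a/close! c)))
        c))

    (defn fetch-all
      "Collects all parsed results into one vector, in arrival order."
      [data]
      (->> data
           (mapv request-chan)   ; mapv is eager, so every request starts now
           a/merge
           (a/reduce into [])))

    (defn fetch-all-ordered
      "Same as fetch-all, but concatenates in the order of data."
      [data]
      (let [chans (mapv request-chan data)]
        (a/go-loop [cs chans, acc []]
          (if-let [c (first cs)]
            (recur (rest cs) (into acc (a/<! c)))
            acc))))

    (def unordered (a/<!! (fetch-all data)))
    (def ordered   (a/<!! (fetch-all-ordered data)))
    ```

    Both functions return a channel that delivers a single vector, so the result can still be read with (go (prn (<! (fetch-all data)))).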
    

    UPD (below are my personal opinions):

    It seems that using core.async channels as promises (either in the form of promise-chan or a channel that closes after a single put!) is not the best approach. As things grow, it turns out that the core.async API overall is (as you may have noticed) not as pleasant as it could be. There are also several unsupported constructs, which may force you to write less idiomatic code. In addition, there is no built-in error handling (if an error occurs within a go block, the go block silently returns nil), so you'll need to come up with something of your own (reinvent the wheel). Therefore, if you need promises, I'd recommend using a dedicated library for that, for example manifold or promesa.
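
    As a rough illustration of the error-handling point, the sketch below shows a go block that throws and one hand-rolled workaround: catching inside the block and passing the exception along as an ordinary value. The helper throw-if-error is hypothetical, and this is just one of several conventions people use, not a core.async feature.

    ```clojure
    (require '[clojure.core.async :as a])

    ;; An exception inside a go block is not delivered to the reader:
    ;; the block's channel just closes, so the take returns nil
    ;; (the stack trace typically only shows up on stderr).
    (def lost (a/<!! (a/go (throw (ex-info "boom" {})))))

    (defn throw-if-error
      "Hypothetical helper: rethrows v if it is a Throwable, else returns v."
      [v]
      (if (instance? Throwable v) (throw v) v))

    ;; Workaround: catch inside the block and return the exception as a value.
    (def caught
      (a/<!! (a/go (try
                     (throw (ex-info "boom" {}))
                     (catch Exception e e)))))

    ;; The consumer checks the value and rethrows on its own side.
    (def message
      (try
        (throw-if-error caught)
        (catch Exception e (.getMessage e))))
    ```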