
Clojure - Core.async Pipeline + take confusion


I'm having difficulty understanding what I thought was a pretty simple concept in Clojure's core.async library. I'm creating two channels connected by a pipeline, where the output channel is created by calling take on the input channel.

From my understanding, the purpose of take is to limit the number of items that a channel will receive before it closes itself (if the input channel has not already closed by this time). However, the code examples I've been playing with aren't producing the results I expected.

Take the following code for example:

(require '[clojure.core.async :as async :refer [chan go go-loop >! <! pipeline]])

(def in (chan 1))
(def out (async/take 5 in 1))

(doseq [i (range 10)]
  (go (>! in i)))

(pipeline 4 out (filter even?) in)

(go-loop []
  (when-some [val (<! out)]
    (println val)
    (recur)))

What I expected was that the pipeline would filter out the odd numbers and pass only even numbers to the out channel, and that out would close once it had received 5 even numbers. Instead, both odd and even numbers were printed to the REPL, something like the following:

2 7 4 0 8 6

At this point the out channel still hadn't closed and running the doseq a second time would print some other value before finally closing.

I'm perplexed as to what's going on here. It works like a charm when using take without the pipeline, and it also works when using the pipeline without take, but combining the two seems to be a whole different story. Am I missing something obvious? Apologies if this is a simple mistake; this is my first (albeit naive) attempt at using core.async.


Solution

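  • You have placed take and pipeline in competition. async/take does not limit an existing channel: it returns a new channel and starts its own go loop that reads from in and copies items, unfiltered, into that new channel. The pipeline also reads from in and writes its filtered items to out. So both of them are taking items from in and adding them to out, which is why odd numbers slip through. Replace the definition of out with a plain channel: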

    (def out (async/chan 3))
    

    With this change only the even numbers are printed (the order can vary, because each value is put by its own go block):

    0
    2
    4
    6
    8
    

    If you really want to use async/take, apply it to the pipeline's output channel instead of to in:

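    ;; Assumes (require '[clojure.core.async :as async :refer [go >! <! pipeline]])
    ;; Names chosen to avoid shadowing clojure.core/first and clojure.core/second.
    (def source (async/chan 1))
    (def evens (async/chan 3))
    ;; pipeline is the only consumer of source
    (pipeline 4 evens (filter even?) source)
    ;; take is the only consumer of evens; limited closes after 3 items
    (def limited (async/take 3 evens))

    (defn run []
      (go
        (doseq [i (range 10)]
          (>! source i)))
      (go (loop []
            (when-some [val (<! limited)]
              (println val)
              (recur)))))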
    

    with result:

    0
    2
    4
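
To check the same idea without relying on what gets printed at the REPL, here is a minimal sketch that collects results into a vector instead. The helper names take-alone and first-evens are hypothetical and not part of the question or of core.async. The first helper shows that async/take on its own simply copies the first n items from its source, with no filtering. The second chains the channels so that each one has exactly one consumer.

```clojure
(require '[clojure.core.async :as async
           :refer [chan go >! >!! <!! close! pipeline]])

(defn take-alone
  "Hypothetical helper: shows that async/take copies the first 5 items
  from its source channel as they are, odd numbers included."
  []
  (let [in  (chan 10)            ; buffer is large enough that >!! never blocks
        out (async/take 5 in)]
    (doseq [i (range 10)]
      (>!! in i))
    (close! in)
    (<!! (async/into [] out))))

(defn first-evens
  "Hypothetical helper: returns a vector of at most n even numbers from
  coll. Each channel has a single consumer: pipeline reads `in`, and
  take reads `filtered`."
  [n coll]
  (let [in       (chan 1)
        filtered (chan 1)
        limited  (async/take n filtered)]
    (pipeline 4 filtered (filter even?) in)
    ;; A single producer keeps the inputs in order; closing `in` lets the
    ;; pipeline close `filtered` when coll runs out before n items arrive.
    (go
      (doseq [i coll]
        (>! in i))
      (close! in))
    ;; Blocks until `limited` closes, then returns everything it carried.
    (<!! (async/into [] limited))))

(take-alone)                 ; => [0 1 2 3 4]
(first-evens 3 (range 10))   ; => [0 2 4]
```

Because a single go block does all the puts and pipeline preserves input order, the result here is deterministic, unlike the question's version where each value is put by a separate go block.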