Search code examples
asynchronousclojurecore.asynctransducer

How to prevent close!-ing before put-ing in onto-chan


I'd like to run a code like

(->> input
     (partition-all 5)
     (map a-side-effect)
     dorun)

asynchronously dividing input and output(a-side-effect).

Then I've written the code to experiment below.

;; using boot-clj
(set-env! :dependencies '[[org.clojure/core.async "0.2.374"]])
(require '[clojure.core.async :as async :refer [<! <!! >! >!!]])

(let [input (range 18)
      c (async/chan 1 (comp (partition-all 5)
                            (map prn)))]
  (async/onto-chan c input false)
  (async/close! c))

explanation for this code:

  • Actually elements in input and its quantity is not defined before running and elements in input is able to be taken by some numbers from 0 to 10.
  • async/onto-chan is used to put a Seq of elements (a fragment of input) into the channel c and will be called many times thus the 3rd argument is false.
  • prn is a substitute for a-side-effect.

I expected the code above prints

[0 1 2 3 4]
[5 6 7 8 9]
[10 11 12 13 14]
[15 16 17]

in REPL however it prints no characters.

And then I add a time to wait, like this

(let [c (async/chan 1 (comp (partition-all 5)
                            (map prn)))]
  (async/onto-chan c (range 18) false)
  (Thread/sleep 1000) ;wait
  (async/close! c))

This code gave my expected output above.

And then I inspect core.async/onto-chan.

And I think what happend:

  1. the channel c was core.async/close!ed in my code.
  2. each item of the argument of core.async/onto-chan was put(core.async/>!) in vain in the go-loop in onto-chan because the channel c was closed.

Are there sure ways to put items before close!ing? write a synchronous version of onto-chan not using go-loop?

Or is my idea wrong?


Solution

  • Your second example with Thread.sleep only ‘works’ by mistake.

    The reason it works is that every transformed result value that comes out of c’s transducer is nil, and since nils are not allowed in channels, an exception is thrown, and no value is put into c: this is what allows the producer onto-chan to continue putting into the channel and not block waiting. If you paste your second example into the REPL you’ll see four stack traces – one for each partition.

    The nils are of course due to mapping over prn, which is a side-effecting function that returns nil for all inputs.

    If I understand your design correctly, your goal is to do something like this:

    (defn go-run! [ch proc]
      (async/go-loop []
        (when-let [value (<! ch)]
          (proc value)
          (recur))))
    
    (let [input (range 18)
          c (async/chan 1 (partition-all 5))]
      (async/onto-chan c input)
      (<!! (go-run! c prn)))
    
    • You really do need a producer and a consumer, else your program will block. I’ve introduced a go-loop consumer.
    • Very generally speaking, map and side-effects don’t go together well, so I’ve extracted the side-effecting prn into the consumer.
    • onto-chan cannot be called ‘many times’ (at least in the code shown) so it doesn’t need the false argument.