I'd like to run code like this:
(->> input
     (partition-all 5)
     (map a-side-effect)
     dorun)
asynchronously, separating the input from the output (a-side-effect).
So I wrote the code below as an experiment.
;; using boot-clj
(set-env! :dependencies '[[org.clojure/core.async "0.2.374"]])
(require '[clojure.core.async :as async :refer [<! <!! >! >!!]])
(let [input (range 18)
      c (async/chan 1 (comp (partition-all 5)
                            (map prn)))]
  (async/onto-chan c input false)
  (async/close! c))
Explanation of this code:
async/onto-chan is used to put a seq of elements (a fragment of input) into the channel c; it will be called many times, so the 3rd argument is false.
prn is a substitute for a-side-effect.
I expected the code above to print
[0 1 2 3 4]
[5 6 7 8 9]
[10 11 12 13 14]
[15 16 17]
in the REPL; however, it prints nothing.
Then I added a wait, like this:
(let [c (async/chan 1 (comp (partition-all 5)
                            (map prn)))]
  (async/onto-chan c (range 18) false)
  (Thread/sleep 1000) ;wait
  (async/close! c))
This code gave the output I expected above.
Then I inspected core.async/onto-chan.
Here is what I think happened: the channel c was closed by core.async/close! in my code, so the puts (core.async/>!) made by the go-loop inside onto-chan were in vain, because c was already closed.
Is there a sure way to put the items before close!ing?
Should I write a synchronous version of onto-chan that does not use go-loop?
Or is my idea wrong?
Your second example with Thread/sleep only ‘works’ by mistake.
The reason it works is that every transformed result value that comes out of c’s transducer is nil, and since nils are not allowed in channels, an exception is thrown, and no value is put into c: this is what allows the producer onto-chan to continue putting into the channel and not block waiting. If you paste your second example into the REPL you’ll see four stack traces – one for each partition.
The nils are of course due to mapping over prn, which is a side-effecting function that returns nil for all inputs.
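A minimal sketch of the two facts this explanation relies on, for trying at the REPL. The names prn-result and nil-put-rejected? are made up for illustration, and the catch is deliberately broad because the exact exception type may differ between core.async versions.

```clojure
(require '[clojure.core.async :as async])

;; prn prints its argument and returns nil.
(def prn-result (prn [0 1 2 3 4]))

;; Putting nil on a channel is rejected with an exception.
;; Throwable is caught here because the concrete exception class
;; is an implementation detail that may vary by version.
(def nil-put-rejected?
  (try
    (async/>!! (async/chan 1) nil)
    false
    (catch Throwable _ true)))
```

With (map prn) inside the channel's transducer, each partition is printed and then turned into nil, which is what triggers the exception for every partition.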
If I understand your design correctly, your goal is to do something like this:
(defn go-run! [ch proc]
  (async/go-loop []
    (when-let [value (<! ch)]
      (proc value)
      (recur))))
(let [input (range 18)
      c (async/chan 1 (partition-all 5))]
  (async/onto-chan c input)
  (<!! (go-run! c prn)))
go-run! is a go-loop consumer.
map and side-effects don’t go together well, so I’ve extracted the side-effecting prn into the consumer.
onto-chan cannot be called ‘many times’ (at least in the code shown), so it doesn’t need the false argument.
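To check the partitioning on its own, without any side effects, one option is to collect everything the channel delivers into a vector. This is only a sketch: the name partitions is made up for illustration, and it relies on onto-chan closing the channel by default once the input is exhausted, which also flushes the last, shorter partition.

```clojure
(require '[clojure.core.async :as async :refer [<!!]])

(def partitions
  (let [c (async/chan 1 (partition-all 5))]
    ;; No third argument: onto-chan closes c after the last item.
    (async/onto-chan c (range 18))
    ;; async/into returns a channel that yields one collection
    ;; once c has been closed and drained.
    (<!! (async/into [] c))))
```

Evaluating partitions afterwards should give the four groups the question expected to see, as plain data rather than printed output.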