Search code examples
clojurecore.asynctransducer

How to create a channel from another with transducers?


I want to create a channel of clojure.core.async from another one that just filters specific messages. Therefore I found a function called filter<.

=> (def c1 (chan))
=> (def c2 (filter< even? c1))
=> (put! c1 1)
=> (put! c1 2)
=> (<!! c2)
2

But the function and its friends are marked as deprecated:

Deprecated - this function will be removed. Use transducer instead

There are some ways to use channels with transducer like chan with the xform parameter. How can I build a new channel from an existing one using transducers?


Solution

  • I did some research on this, found a couple of interesting articles (first and second), and then got something working using pipeline

    (require '[clojure.core.async :as async :refer [chan <!! pipeline put!]])
    (def c1 (chan))
    (def c2 (chan))
    
    (pipeline 4 c2 (filter even?) c1)
    
    (put! c1 1)
    (put! c1 2)
    (<!! c2)
    ;;=> 2
    

    The second article I linked makes this a bit cleaner with some helper functions around the pipeline function:

    (defn ncpus []
      (.availableProcessors (Runtime/getRuntime)))
    
    (defn parallelism []
      (+ (ncpus) 1))
    
    (defn add-transducer
      [in xf]
      (let [out (chan (buffer 16))]
        (pipeline (parallelism) out xf in)
        out))
    

    Then you can simply tie channels together with

    (def c1 (chan))
    (def c2 (add-transducer c1 (filter even?))
    

    To complete the answer, as you found yourself you can use pipe in a similar fashion:

    (defn pipe-trans
      [ci xf]
      (let [co (chan 1 xf)]
        (pipe ci co)
        co))
    (def c1 (chan))
    (def c2 (pipe-trans c1 (filter even?)))