
Implementing Clojure conditional/branching transducer


I'm trying to make a conditional transducer in Clojure as follows:

(defn if-xf
  "Takes a predicate and two transducers.
   Returns a new transducer that routes the input to one of the transducers
   depending on the result of the predicate."
  [pred a b]
  (fn [rf]
    (let [arf (a rf)
          brf (b rf)]
      (fn
        ([] (rf))
        ([result]
           (rf result))
        ([result input]
           (if (pred input)
             (arf result input)
             (brf result input)))))))

It is pretty useful in that it lets you do stuff like this:

;; multiply odd numbers by 100, square the evens.  
(= [0 100 4 300 16 500 36 700 64 900]
    (sequence
          (if-xf odd? (map #(* % 100)) (map (fn [x] (* x x))))
          (range 10)))

However, this conditional transducer silently drops data when one of the branches is a transducer that flushes buffered state in its 1-arity (completion) branch:

;; negs are multiplied by 100, non-negs are partitioned by 2
;; BUT! where did 6 go?
;; expected: [-600 -500 -400 -300 -200 -100 [0 1] [2 3] [4 5] [6]]
;;
(= [-600 -500 -400 -300 -200 -100 [0 1] [2 3] [4 5]]
 (sequence
  (if-xf neg? (map #(* % 100)) (partition-all 2))
  (range -6 7)))
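To see why the trailing [6] is lost, it helps to drive partition-all by hand. The following is only a minimal sketch: `run-partition` is a hypothetical helper (not part of the code above) and `conj` stands in for the downstream reducing function. The last, incomplete partition only shows up when the completion (1-arity) call is made, and the first if-xf never makes that call on `brf`:

```clojure
(defn run-partition
  "Hypothetical helper: feeds xs through (partition-all 2) over conj and
   calls the completion arity only when complete? is true."
  [xs complete?]
  (let [rf  ((partition-all 2) conj)   ; stateful step function
        acc (reduce rf [] xs)]         ; 2-arity calls only
    (if complete?
      (rf acc)                         ; 1-arity call flushes the buffer
      acc)))

(run-partition [4 5 6] false) ; => [[4 5]]
(run-partition [4 5 6] true)  ; => [[4 5] [6]]
```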

Is it possible to tweak the definition of if-xf to handle the case of transducers with cleanup?

I tried the following, but it behaves oddly:

(defn if-xf
  "Takes a predicate and two transducers.
   Returns a new transducer that routes the input to one of the transducers
   depending on the result of the predicate."
  [pred a b]
  (fn [rf]
    (let [arf (a rf)
          brf (b rf)]
      (fn
        ([] (rf))
        ([result]
           (arf result) ;; new!
           (brf result) ;; new!
           (rf result))
        ([result input]
           (if (pred input)
             (arf result input)
             (brf result input)))))))

Specifically, the flush only happens at the very end of the input:

;; the [0] at the end should appear just before the 100.
(= [[-6 -5] [-4 -3] [-2 -1] 100 200 300 400 500 600 [0]]
      (sequence
       (if-xf pos? (map #(* % 100)) (partition-all 2))
       (range -6 7)))

Is there a way to make this branching/conditional transducer without storing the entire input sequence in local state within this transducer (i.e. doing all the processing in the 1-arity branch upon cleanup)?


Solution

  • The idea is to complete (flush) the previously active branch every time the predicate flips from one branch to the other. IMO this is the only way to do it without buffering:

    (defn if-xf
      "Takes a predicate and two transducers.
       Returns a new transducer that routes the input to one of the transducers
       depending on the result of the predicate."
      [pred a b]
      (fn [rf]
        (let [arf (volatile! (a rf))
              brf (volatile! (b rf))
              a? (volatile! nil)]
          (fn
            ([] (rf))
            ([result]
             (let [crf (if @a? @arf @brf)]
               (-> result crf rf)))
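            ([result input]
             (let [p? (pred input)
                   [xrf crf] (if p? [@arf @brf] [@brf @arf])
                   switched? (some-> @a? (not= p?))]
               ;; Record the active branch first, so that the step returns
               ;; the accumulated result rather than the value of vreset!.
               (vreset! a? p?)
               (if switched?
                 (-> result crf (xrf input))
                 (xrf result input))))))))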
    (sequence (if-xf pos? (map #(* % 100)) (partition-all 2)) [0 1 0 1 0 0 0 1])
    ; => ([0] 100 [0] 100 [0 0] [0] 100)
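One caveat with completing a branch mid-stream: the branch's 1-arity also calls the completion arity of the downstream `rf`. That is harmless with `sequence`, but `into` completes its reducing function by calling `persistent!` on a transient, so an early completion can break later steps. Below is a hedged sketch of a variant that hands each branch a wrapper whose completion is a no-op, so `rf` itself is completed exactly once, at the end. The names `if-xf-flushing` and `inner` are made up for this illustration, and the sketch assumes both branches can keep accepting input after being completed (true for `map` and `partition-all`, not necessarily for every stateful transducer).

```clojure
(defn if-xf-flushing
  "Hypothetical variant of if-xf: flushes the previously active branch
   whenever pred flips, without completing the downstream rf early."
  [pred a b]
  (fn [rf]
    (let [;; Forwards steps to rf, but completion is a no-op, so flushing
          ;; a branch mid-stream never completes rf itself.
          inner (fn
                  ([result] result)
                  ([result input] (rf result input)))
          arf   (a inner)
          brf   (b inner)
          last? (volatile! nil)]        ; nil until the first input arrives
      (fn
        ([] (rf))
        ([result]
         ;; Flush whichever branch ran last, then complete rf once.
         (let [prev    @last?
               flushed (cond
                         (nil? prev) result
                         prev        (arf result)
                         :else       (brf result))]
           (rf flushed)))
        ([result input]
         (let [p?        (boolean (pred input))
               prev      @last?
               [xrf crf] (if p? [arf brf] [brf arf])]
           (vreset! last? p?)
           (if (and (some? prev) (not= prev p?))
             (xrf (crf result) input)   ; flush the old branch, then step
             (xrf result input))))))))

(into [] (if-xf-flushing neg? (map #(* % 100)) (partition-all 2)) (range -6 7))
;; => [-600 -500 -400 -300 -200 -100 [0 1] [2 3] [4 5] [6]]
```

The trade-off is the same as above: partitions never span a switch between branches, so `partition-all` emits short partitions whenever the predicate flips.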