core.async with partition-by stateful transducer not keeping state?


(I had a previous question here, and assumed I wouldn't have issues getting to core.async).

Given input data such as this:

(require '[clojure.core.async :as a])

(def input-data
  [{:itm_na 1 :seq_no 1  :doc_img "this is a very long "}
   {:itm_na 1 :seq_no 2  :doc_img "sentence from a mainframe "}
   {:itm_na 1 :seq_no 3  :doc_img "system that was built before i was "}
   {:itm_na 1 :seq_no 4  :doc_img "born."}
   {:itm_na 2 :seq_no 1  :doc_img "this is a another very long "}
   {:itm_na 2 :seq_no 2  :doc_img "sentence from the same mainframe "}
   {:itm_na 3 :seq_no 1  :doc_img "Ok here we are again. "}
   {:itm_na 3 :seq_no 2  :doc_img "The mainframe only had 40 char per field so"}
   {:itm_na 3 :seq_no 3  :doc_img "they broke it into multiple rows "}
   {:itm_na 3 :seq_no 4  :doc_img "which seems to be common"}
   {:itm_na 3 :seq_no 5  :doc_img " for the time. "}
   {:itm_na 3 :seq_no 6  :doc_img "thanks for your help."}])

As expected, partition-by groups my data into seqs (for later collapsing):

(count (partition-by :itm_na input-data))
;;=> 3

However, when I try to do this with a core.async pipeline, it doesn't do the same thing. How do I get the stateful transducer returned by partition-by to actually preserve its state inside an async pipeline?

(let [source-chan (a/to-chan input-data)
      target-chan (a/chan 100)
      xf          (comp (partition-by :itm_na))]
  (a/pipeline 1
              target-chan
              xf
              source-chan)
  ;; <!! is qualified with the `a` alias, since only :as a was required above
  (count (a/<!! (a/into [] target-chan))))

;;=>12

This should be 3?

Strangely, when I attach the xf to the channel itself, as below, I get the expected result. I'm not sure why a/pipeline behaves differently.

(let [xf (comp (partition-by :itm_na))
      ch (a/chan 1 xf)]
  (a/onto-chan ch input-data)
  ;; <!! is qualified with the `a` alias, since only :as a was required above
  (count (a/<!! (a/into [] ch))))
;;=> 3

The docstring mentions the stateful bit:

(clojure.repl/doc partition-by) 
-------------------------
clojure.core/partition-by
([f] [f coll])
  Applies f to each value in coll, splitting it each time f returns a
   new value.  Returns a lazy seq of partitions.  Returns a stateful
   transducer when no collection is provided.

Solution

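  • Rich Hickey briefly highlighted this particular case in his talk: you cannot use pipeline with stateful transducers, because of pipeline's parallel nature. As the pipeline docstring puts it, the transducer is applied independently to each element, not across elements. partition-by therefore never sees two consecutive items and emits each one as its own partition, which is why the count is 12 rather than 3.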
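
A minimal sketch of the difference, and of one possible workaround, under a few assumptions: sample-data, per-element-partitions and collapse-docs are hypothetical names introduced here, and per-element-partitions is only a rough model of how pipeline treats each element, not its actual implementation. The workaround follows the approach that already worked in the question: put the transducer on the channel and copy items sequentially with a/pipe, so a single transducer instance sees every item in order.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical sample rows, shaped like the question's input-data.
(def sample-data
  [{:itm_na 1 :seq_no 1 :doc_img "first "}
   {:itm_na 1 :seq_no 2 :doc_img "item"}
   {:itm_na 2 :seq_no 1 :doc_img "second item"}
   {:itm_na 3 :seq_no 1 :doc_img "third "}
   {:itm_na 3 :seq_no 2 :doc_img "item"}])

;; Rough model of pipeline's behaviour: the transducer runs on each element
;; alone, so partition-by starts fresh every time and never sees a neighbour.
(defn per-element-partitions [data]
  (mapcat #(into [] (partition-by :itm_na) [%]) data))

;; Workaround sketch: attach the stateful transducer to the target channel and
;; copy sequentially with a/pipe. Closing the source closes the target, which
;; flushes the final partition.
(defn collapse-docs [data]
  (let [xf          (comp (partition-by :itm_na)
                          (map #(apply str (map :doc_img %))))
        source-chan (a/to-chan data)
        target-chan (a/chan 100 xf)]
    (a/pipe source-chan target-chan)
    (a/<!! (a/into [] target-chan))))

(count (per-element-partitions sample-data))
;;=> 5

(collapse-docs sample-data)
;;=> ["first item" "second item" "third item"]
```

This gives up pipeline's parallelism for the partitioning step. If the per-group work is expensive, one option is to partition on a channel like this first and then feed the resulting groups into a/pipeline with a stateless transducer.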