
Clojure: group-by too slow (13-million-line file)


Situation

I have a 13-million-line CSV file on which I want to run a logistic regression (with Incanter) for each group. My file looks like this (the values are just samples):

ID Max Probability
1  1   0.5 
1  5   0.6
1  10  0.99
2  1   0.1
2  7   0.95

I first read it with a CSV reader, and everything is fine.

I then have something like this:

({"Id" "1", "Max" 1, "Probability" 0.5} {"Id" "1", "Max" 5, "Probability" 0.6} ...)

I want to group these values by Id. If I remember correctly, there are around 1.2 million Ids. (I did this in Python with pandas and it was very fast.)

This is my function to read and format the file (it works fine on smaller datasets):

  (defn read-file []
    (let [path (:path-file @config)
          content-csv (take-csv path \,)]
      (->> (group-by :Id content-csv)
           (map (fn [[k v]]
                  [k {:x (mapv :Max v) :y (mapv :Probability v)}]))
           (into {}))))

In the end, I want something like this to run the logistic regression on (I'm flexible here: :x and :y do not need to be vectors, seqs are fine):

{"1" {:x [1 5 10] :y [0.5 0.6 0.99]} "2" {:x [1 7] :y [0.1 0.95]} ...}

Problem

The group-by operation is the problem. When I run it on its own on the CSV output, it takes forever, when it does not die with a "Java heap space" out-of-memory error. I first suspected my mapv calls, but it is the group-by itself.

I thought about using reduce or reduce-kv, but I do not know how to use these functions for this kind of purpose.
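For reference, here is a minimal sketch of the reduce approach: a single pass that appends each row's values to its Id's :x and :y vectors. It assumes rows with string keys like the ones printed above (if take-csv actually yields keyword keys, as the :Id in the code suggests, use (:Id row) and so on instead); group-rows is a hypothetical name. Unlike group-by, it keeps only the two numbers per row rather than whole row maps, which may reduce memory pressure, though it has not been measured on the full file.

```clojure
;; Hypothetical helper: one pass over the rows, accumulating :x and :y per Id.
;; Assumes string keys "Id", "Max", "Probability", as in the rows shown above.
(defn group-rows [rows]
  (reduce (fn [acc row]
            (let [id (get row "Id")]
              (-> acc
                  ;; fnil supplies [] the first time an Id is seen
                  (update-in [id :x] (fnil conj []) (get row "Max"))
                  (update-in [id :y] (fnil conj []) (get row "Probability")))))
          {}
          rows))

(group-rows '({"Id" "1" "Max" 1 "Probability" 0.5}
              {"Id" "1" "Max" 5 "Probability" 0.6}
              {"Id" "2" "Max" 1 "Probability" 0.1}))
;; => {"1" {:x [1 5], :y [0.5 0.6]}, "2" {:x [1], :y [0.1]}}
```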

I do not care about the order of the values in :x and :y, as long as the two stay aligned (x and y at the same index come from the same line, which is automatic since they are read from the same row), nor about the order of the Ids in the final result. I read that group-by preserves order; could that be what makes the operation so costly?

Here is some sample data, in case anyone has encountered this:

(def sample '({"Id" "1" "Max" 1 "Probability" 0.5} {"Id" "1" "Max" 5 "Probability" 0.6} {"Id" "1" "Max" 10 "Probability" 0.99} {"Id" "2" "Max" 1 "Probability" 0.1} {"Id" "2" "Max" 7 "Probability" 0.95}))

Other alternatives

I have other ideas, but I'm not sure they are "Clojure-friendly".

  • In Python, because of the nature of the function and because the file is already sorted, I did not use group-by; instead I stored the start and end index of each group in a dataframe, so that I could select each sub-table directly.

  • I could also load a list of the Ids instead of computing it in Clojure, like:

    (def ids '("1" "2" ...))

So maybe it is possible to start from:

{"1" {:x [] :y []} "2" {:x [] :y []} ...}

built from that list, and then match the rows of the big file to each Id.

I don't know whether this would actually be more efficient.
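The pre-seeded-map idea could look roughly like this. sample-ids is a hypothetical stand-in for the real list of Ids, fill-groups is a hypothetical helper, and rows are assumed to have string keys as in the sample data.

```clojure
;; Hypothetical stand-in for the real list of Ids.
(def sample-ids '("1" "2"))

;; Every Id starts with empty :x and :y vectors.
(def empty-groups (zipmap sample-ids (repeat {:x [] :y []})))

(defn fill-groups
  "Hypothetical helper: append each row's Max and Probability to its Id."
  [groups rows]
  (reduce (fn [acc row]
            (let [id (get row "Id")]
              ;; An Id missing from the seed list would get lists, not vectors.
              (-> acc
                  (update-in [id :x] conj (get row "Max"))
                  (update-in [id :y] conj (get row "Probability")))))
          groups
          rows))
```

Each row still costs one lookup and two conj calls, just as when the map grows as rows arrive, so seeding the keys alone is unlikely to be much faster; it mainly fixes the set of Ids up front.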

I have all the other functions for the logistic regression; I am only missing this part. Thanks!

EDIT

Thanks for the answers; this is the solution I ended up with.

In my project.clj file:

 :jvm-opts ["-Xmx13g"])
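For context, :jvm-opts is a key inside the Leiningen defproject form. A minimal sketch of where it sits; the project name and Clojure version are placeholders, not the author's actual values:

```clojure
;; Hypothetical project.clj: name and versions are placeholders.
(defproject my-project "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.10.1"]]
  ;; -Xmx13g lets the JVM heap grow up to 13 GB
  :jvm-opts ["-Xmx13g"])
```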

Code:

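;; Assumes cag is an alias for clojure.algo.generic.functor
;; (needs the org.clojure/algo.generic dependency).
(require '[clojure.algo.generic.functor :as cag])

(defn data-group->map [group]
  ;; One run of rows sharing an :Id -> {id {:x ... :y ...}}
  {(:Id (first group))
   {:x (map :Max group)
    :y (map :Probability group)}})


(defn prob-cumsum [data]
  ;; Replace each group's :y with its running (cumulative) sum.
  (cag/fmap
    (fn [x]
      (assoc x :y (reductions + (x :y))))
    data))


(defn process-data-splitter [data]
  ;; Relies on the rows being sorted (contiguous) by :Id.
  (->> (partition-by :Id data)
       (map data-group->map)
       (into {})   ; conj-ing one-entry maps merges them into one map
       (prob-cumsum)))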
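If you would rather not pull in algo.generic, the same pipeline can be sketched with clojure.core alone, using reduce-kv (one of the functions mentioned in the question) in place of cag/fmap. This is a swapped-in variant, not the code above; group->entry, cumsum-y and split-and-cumsum are hypothetical names, and rows are assumed to have keyword keys and to be sorted by :Id.

```clojure
;; Hypothetical core-only variant: reduce-kv instead of cag/fmap.
;; Assumes keyword keys :Id, :Max, :Probability and rows sorted by :Id.
(defn group->entry [group]
  ;; Turn one run of rows into an [id {:x ... :y ...}] pair.
  [(:Id (first group))
   {:x (mapv :Max group)
    :y (mapv :Probability group)}])

(defn cumsum-y [data]
  ;; Rebuild the map, replacing each :y with its running (cumulative) sum.
  (reduce-kv (fn [acc id v]
               (assoc acc id (update v :y #(vec (reductions + %)))))
             {}
             data))

(defn split-and-cumsum [rows]
  (->> rows
       (partition-by :Id)
       (map group->entry)
       (into {})
       (cumsum-y)))

(split-and-cumsum [{:Id "1" :Max 1 :Probability 0.5}
                   {:Id "1" :Max 5 :Probability 0.25}
                   {:Id "2" :Max 1 :Probability 0.1}])
;; => {"1" {:x [1 5], :y [0.5 0.75]}, "2" {:x [1], :y [0.1]}}
```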

I plugged this into the rest of my code and it works. The split takes about 5 minutes, but I do not need maximum speed. Memory usage can climb to all available memory while the file is being read, then drops for the sigmoid step.


Solution

  • If your file is sorted by Id, you could use partition-by instead of group-by.

    Then your code would look like this:

    (defn data-group->map [group]
      [(:Id (first group))
       {:x (mapv :Max group)
        :y (mapv :Probability group)}])
    
    (defn read-file []
      (let [path (:path-file @config)
            content-csv (take-csv path \,)]
        (->> content-csv
             (partition-by :Id)
             (map data-group->map)
             (into {}))))
    

    That should speed it up. You can probably make it faster still by using transducers:

    (defn read-file []
      (let [path (:path-file @config)
            content-csv (take-csv path \,)]
        (into {} (comp (partition-by :Id)
                       (map data-group->map))
              content-csv)))
    

    Let's run some tests.

    First, generate a huge dataset similar to yours:

    (def huge-data
      (doall (mapcat #(repeat 
                         1000000
                         {:Id % :Max 1 :Probability 10})
               (range 10))))
    

    This gives a ten-million-item dataset: a million copies of {:Id 0 :Max 1 :Probability 10}, a million copies of {:Id 1 :Max 1 :Probability 10}, and so on.

    Now the functions to be tested:

    (defn process-data-group-by [data]
      (->> (group-by :Id data)
           (map (fn [[k v]]
                  [k {:x (mapv :Max v) :y (mapv :Probability v)}]))
           (into {})))
    
    (defn process-data-partition-by [data]
      (->> data
           (partition-by :Id)
           (map data-group->map)
           (into {})))
    
    (defn process-data-transducer [data]
      (into {} (comp (partition-by :Id) (map data-group->map)) data))
    

    And now the timing tests:

    (do (time (dorun (process-data-group-by huge-data)))
        (time (dorun (process-data-partition-by huge-data)))
        (time (dorun (process-data-transducer huge-data))))
    
    "Elapsed time: 3377.167645 msecs"
    "Elapsed time: 3707.03448 msecs"
    "Elapsed time: 1462.955152 msecs"
    

    Notice that partition-by produces a lazy sequence, while group-by has to realize the whole collection. So if you need your data group by group rather than as one map, you can remove (into {}) and each group is then computed only when you access it:

    (defn process-data-partition-by [data]
      (->> data
           (partition-by :Id)
           (map data-group->map)))
    

    Checking in the REPL:

    user> (time (def processed-data (process-data-partition-by huge-data)))
    "Elapsed time: 0.06079 msecs"
    #'user/processed-data
    user> (time (let [f (first processed-data)]))
    "Elapsed time: 302.200571 msecs"
    nil
    user> (time (let [f (second processed-data)]))
    "Elapsed time: 500.597153 msecs"
    nil
    user> (time (let [f (last processed-data)]))
    "Elapsed time: 2924.588625 msecs"
    nil
    user.core> (time (let [f (last processed-data)]))
    "Elapsed time: 0.037646 msecs"
    nil
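The partition-by approach depends on the precondition stated at the start of the answer: a new group begins whenever :Id changes between consecutive rows, so all rows for an Id must be contiguous. A small sketch of what goes wrong otherwise; runs->map and unsorted-rows are hypothetical, with keyword keys as in the answer's code:

```clojure
;; Rows for Id "1" are NOT contiguous here.
(def unsorted-rows
  [{:Id "1" :Max 1 :Probability 0.5}
   {:Id "2" :Max 1 :Probability 0.1}
   {:Id "1" :Max 5 :Probability 0.25}])

(defn runs->map
  "Hypothetical helper: partition-by :Id, then pour the runs into a map."
  [rows]
  (into {}
        (map (fn [group]
               [(:Id (first group))
                {:x (mapv :Max group) :y (mapv :Probability group)}]))
        (partition-by :Id rows)))

;; partition-by yields three runs (Id "1", Id "2", Id "1" again), and into {}
;; keeps only the LAST run per Id, silently dropping the :Max 1 row of Id "1".
(runs->map unsorted-rows)
;; => {"1" {:x [5], :y [0.25]}, "2" {:x [1], :y [0.1]}}

;; sort-by is stable, so sorting first keeps the within-Id row order.
(runs->map (sort-by :Id unsorted-rows))
;; => {"1" {:x [1 5], :y [0.5 0.25]}, "2" {:x [1], :y [0.1]}}
```

Sorting in Clojure fixes this, but sort-by realizes the whole collection in memory, so for a 13-million-line file it may be cheaper to sort the CSV before reading it.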