
Implementing a flambo mapValues function in clojure


I have a Clojure function that uses the flambo v0.60 functions API to do some analysis on a sample data set. I noticed that when I use (get rdd 2), instead of getting the second element in the RDD collection, it returns the second character of the first element of the RDD collection. My assumption is that Clojure is treating each row of the RDD collection as a whole string and not as a vector, so I can't get the second element in the collection. I'm thinking of using the map-values function to convert the mapped values into vectors from which I can get the second element. I tried this:

(defn split-on-tab-transformation [xctx input]
 (assoc xctx :rdd (-> (:rdd xctx)
                   (spark/map (spark/fn [row] (s/split row #"\t")))
                   (spark/map-values vec)))) 

Unfortunately I got an error: java.lang.IllegalArgumentException: No matching method found: mapValues for class org.apache.spark.api.java.JavaRDD...

This code returns the first row in the RDD (assuming I removed the (spark/map-values vec) call from the function above):

(defn get-distinct-column-val
  "input = {:col val}"
  [xctx input]
  (let [rdds (-> (:rdd xctx)
                 (f/map (f/fn [row] row))
                 f/first)]
    (clojure.pprint/pprint rdds)))

Output:

[2.00000 770127      200939.000000   \t6094\tBENTONVILLE, AR DPS\t22.500000\t5.000000\t2.500000\t5.000000\t0.000000\t0.000000\t0.000000\t0.000000\t0.000000\t1\tStore Tab\t0.000000\t4.50\t3.83\t5.00\t0.000000\t0.000000\t0.000000\t0.000000\t19.150000]

If I try to get the second element, 770127:

(defn get-distinct-column-val
  "input = {:col val}"
  [xctx input]
  (let [rdds (-> (:rdd xctx)
                 (f/map (f/fn [row] row))
                 f/first)]
    (clojure.pprint/pprint (get rdds 1))))

I get:

[\.]

Flambo documentation for map-values

I'm new to clojure and I'd appreciate any help. Thanks


Solution

  • First of all, map-values (or mapValues in the Spark API) is a valid transformation only on a PairRDD (for example, one whose elements look like [:foo [1 2 3]]). An RDD with values like this can be interpreted as a sort of map where the first element is the key and the second is the value.

    If you have an RDD like this, mapValues transforms the values without changing the keys. Your RDD is not a pair RDD, so in this case you should use a second map instead, although that seems redundant since clojure.string/split already returns a vector.
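
    To see why the extra conversion would be redundant, here is a minimal REPL sketch in plain Clojure with no Spark involved. The sample line and the string alias are made up for illustration:

    ```clojure
    (require '[clojure.string :as string])

    ;; Hypothetical sample line, not taken from your data set
    (def line "a\tb\tc")

    (def parts (string/split line #"\t"))

    (vector? parts)        ;; => true, split already returns a vector
    (= parts (vec parts))  ;; => true, so calling vec on it changes nothing
    (get parts 1)          ;; => "b", index lookup works on the vector
    ```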

    A simple example of using map-values:

    (let [pairs [(ft/tuple :foo 1) (ft/tuple :bar 2)]
          rdd (f/parallelize-pairs sc pairs) ;; Note parallelize-pairs -> PairRDD
          result (-> rdd       
              (f/map-values inc) ;; Map values
              (f/collect))]
      (assert (= result [(ft/tuple :foo 2) (ft/tuple :bar 3)])))
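
    If it helps to picture what map-values does, the following is a rough plain-Clojure analogy that runs without Spark. The helper map-values-local is hypothetical and not part of flambo; it only mimics the "transform the value, keep the key" behaviour on an ordinary vector of pairs:

    ```clojure
    ;; Plain Clojure stand-in for a pair RDD: a vector of [key value] pairs
    (def pairs [[:foo 1] [:bar 2]])

    ;; Hypothetical helper (not a flambo function):
    ;; apply f to each value and leave the key untouched
    (defn map-values-local [f kvs]
      (mapv (fn [[k v]] [k (f v)]) kvs))

    (map-values-local inc pairs) ;; => [[:foo 2] [:bar 3]]
    ```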
    

    From your description, it looks like you're still using the original xctx rather than the one returned from split-on-tab-transformation. Since Clojure maps are immutable, assoc doesn't change the argument you pass in; it returns a new map. If you ignore that return value, get-distinct-column-val receives an RDD[String], not an RDD[Array[String]].
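
    This behaviour can be reproduced at the REPL without Spark. In this sketch a plain vector of lines stands in for the RDD, and the sample line is a simplified, made-up version of your row:

    ```clojure
    (require '[clojure.string :as string])

    ;; Stand-in context: a plain vector of lines plays the role of the RDD
    (def xctx {:rdd ["2.00000\t770127"]})

    ;; assoc returns a new map; xctx itself is left untouched
    (def split-xctx
      (assoc xctx :rdd (mapv #(string/split % #"\t") (:rdd xctx))))

    (get (first (:rdd xctx)) 1)       ;; => \.       second character of the string
    (get (first (:rdd split-xctx)) 1) ;; => "770127" second field of the vector
    ```

    The first lookup matches the output you are seeing, which suggests the unsplit context is the one reaching get-distinct-column-val.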

    Based on the naming convention, I assume you want to get the distinct values for a single position in an array. I removed the unused parts of your code for clarity. First, let's create some dummy data:

    (spit "data.txt"
          (str "Mazda RX4\t21\t6\t160\n"
               "Mazda RX4 Wag\t21\t6\t160\n"
               "Datsun 710\t22.8\t4\t108\n"))
    

    Then add rewritten versions of your functions:

    (defn split-on-tab-transformation [xctx]
      (assoc xctx :rdd (-> (:rdd xctx)
                           (f/map #(clojure.string/split % #"\t")))))
    
    (defn get-distinct-column-val
      [xctx col]
      (-> (:rdd xctx)
          (f/map #(get % col))
          (f/distinct)))
    

    And check the result:

    (assert
     (= #{"Mazda RX4 Wag" "Datsun 710" "Mazda RX4"}
        (-> {:sc sc :rdd (f/text-file sc "data.txt")}
          (split-on-tab-transformation)
          (get-distinct-column-val 0)
          (f/collect)
          (set))))