Tags: hadoop, apache-spark, apache-pig

How to remove the parentheses around records when calling saveAsTextFile on RDD[(String, Int)]?


How do I remove the parentheses "(" and ")" from the output of the Spark job below?

When I try to read the Spark output with a Pig script, it causes a problem.

My code:

scala> val words = Array("HI","HOW","ARE")
words: Array[String] = Array(HI, HOW, ARE)

scala> val wordsRDD = sc.parallelize(words)
wordsRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at    parallelize at <console>:23

scala> val keyvalueRDD = wordsRDD.map(elem => (elem,1))
keyvalueRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[1] at map at <console>:25

scala> val wordcountRDD = keyvalueRDD.reduceByKey((x,y) => x+y)
wordcountRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at reduceByKey at <console>:27

scala> wordcountRDD.saveAsTextFile("/user/cloudera/outputfiles")

Output of the above code:

 hadoop dfs -cat /user/cloudera/outputfiles/part*

(HOW,1)
(ARE,1)
(HI,1)

But I want the Spark output to be stored as below, without parentheses:

HOW,1
ARE,1
HI,1

Now I want to read the above output using a Pig script.

The LOAD statement in the Pig script treats "(HOW" as the first atom and "1)" as the second.

Is there any way to get rid of the parentheses in the Spark code itself? I don't want to apply a fix for this in the Pig script.

Pig script:

records = LOAD '/user/cloudera/outputfiles' USING PigStorage(',') AS (word:chararray);
dump records;

Pig output:

 ((HOW)
 ((ARE)
 ((HI)

Solution
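
The parentheses appear because saveAsTextFile writes each record using its toString, and a Scala Tuple2 renders as "(key,value)". A quick illustration in plain Scala:

    scala> ("HOW", 1).toString
    res0: String = (HOW,1)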

  • Use a map transformation to format the records before you save them to the output directory, e.g. (note there is no space after the comma, so the fields split cleanly with PigStorage(',')):

    wordcountRDD.map { case (k, v) => s"$k,$v" }.saveAsTextFile("/user/cloudera/outputfiles")
    

    See Spark's documentation about map.
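
    As a quick sanity check before writing the files, you can collect and print the mapped records in the same spark-shell session (a minimal sketch; collect brings everything back to the driver, so use it only on small data, and the ordering may vary):

    scala> wordcountRDD.map { case (k, v) => s"$k,$v" }.collect.foreach(println)
    HOW,1
    ARE,1
    HI,1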


    I strongly recommend using Datasets instead.

    scala> words.toSeq.toDS.groupBy("value").count().show
    +-----+-----+
    |value|count|
    +-----+-----+
    |  HOW|    1|
    |  ARE|    1|
    |   HI|    1|
    +-----+-----+
    
    scala> words.toSeq.toDS.groupBy("value").count.write.csv("outputfiles")
    
    $ cat outputfiles/part-00199-aa752576-2f65-481b-b4dd-813262abb6c2-c000.csv
    HI,1
    

    See Spark SQL, DataFrames and Datasets Guide.
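
    Note that write.csv produces one part file per partition; after a groupBy that is 200 by default (spark.sql.shuffle.partitions), which is why the file above is named part-00199. If a single output file is more convenient for the downstream Pig script, you can coalesce first (a sketch; coalesce(1) funnels all data through one task, and the output path here is a fresh hypothetical directory, since Spark will not overwrite an existing one by default):

    scala> words.toSeq.toDS.groupBy("value").count.coalesce(1).write.csv("outputfiles_single")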