Tags: scala, intellij-idea, apache-spark, sdk, jdk1.7

How to split sentences into words inside map(case(key, value) => ...) in Scala Spark


val sparkConf = new SparkConf().setAppName("ShortTwitterAnalysis").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val text = sc.textFile("/home/tobbyj/HW1_INF553/shortTwitter.txt")
val twitter = text
  .map(_.toLowerCase)
  // strip tabs, quotes, newlines and periods
  .map(_.replace("\t", ""))
  .map(_.replace("\"", ""))
  .map(_.replace("\n", ""))
  .map(_.replace(".", ""))
  // drop invisible control characters
  .map(_.replaceAll("[\\p{C}]", ""))
  // keep only the tweet body between "text:" and ",source:"
  .map(_.split("text:")(1).split(",source:")(0))
  // pair each tweet with its index and swap to (index, tweet)
  .zipWithIndex.map(_.swap)

Using the above code, I get the results below.

(0,a rose by any other name would smell as sweet)
(1,a rose is a rose is a rose)
(4,rt @nba2k: the battle of two young teams tough season but one will emerge victorious who will it be? lakers or 76ers? https:\/\/tco\/nukkjq\u2026)
(2,love is like a rose the joy of all the earth)
(5,i was going to bake a cake and listen to the football flour refund?)
(3,at christmas i no more desire a rose than wish a snow in may’s new-fangled mirth)

However, the result I want has the 'key' starting from 1 and the 'value' split into words, something like the following (though I'm not sure it will look exactly like this):

(1,(a, rose, by, any, other, name, would, smell, as, sweet))
(2,(a, rose, is, a, rose, is, a, rose))
...

The code I tried is

.map{case(key, value)=>(key+1, value.split(" "))}

but it gives me the results below:

(1,[Ljava.lang.String;@1dff58b)
(2,[Ljava.lang.String;@167179a3)
(3,[Ljava.lang.String;@73e8c7d7)
(4,[Ljava.lang.String;@7bffa418)
(5,[Ljava.lang.String;@2d385beb)
(6,[Ljava.lang.String;@4f1ab87e)

Any suggestions? After this step, I am going to map them into (1, a), (1, rose), (1, by) ... (2, love), (2, rose), ....
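Note: the [Ljava.lang.String;@... lines mean the split itself already worked; that is just the default toString of a Java array. One way to make the contents visible when printing (a sketch, applied to the same pair RDD as above) is to convert each array to a List:

.map { case (key, value) => (key + 1, value.split(" ").toList) }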


Solution

  • Pair RDDs are implicitly enriched with the methods of org.apache.spark.rdd.PairRDDFunctions (documented here), which make it easier to work with key-value pairs; in recent Spark versions this implicit conversion is in scope automatically, without an extra import.

    At that point, you can use the flatMapValues method to obtain what you want; here is a minimal working example (just copy from the line containing val tweets if you are in the Spark console):

    import org.apache.spark._
    import org.apache.spark.rdd.PairRDDFunctions // not strictly required: recent Spark versions enrich pair RDDs automatically
    
    val conf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc   = new SparkContext(conf)
    
    // a tiny in-memory dataset standing in for the tweets
    val tweets = sc.parallelize(Seq(
      "this is my first tweet",
      "and this is my second",
      "ok this is getting boring"))
    
    val results =
      tweets.
        zipWithIndex.                // (tweet, index)
        map(_.swap).                 // (index, tweet)
        flatMapValues(_.split(" "))  // one (index, word) pair per word
    
    results.collect.foreach(println)
    

    This is the output of these few lines of code:

    (0,this)
    (0,is)
    (0,my)
    (0,first)
    (0,tweet)
    (1,and)
    (1,this)
    (1,is)
    (1,my)
    (1,second)
    (2,ok)
    (2,this)
    (2,is)
    (2,getting)
    (2,boring)
    
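    If you want the keys to start from 1, as in the question, you can shift the index before flattening. Here is a minimal sketch building on the same tweets RDD (oneBased is just an illustrative name):

    val oneBased =
      tweets.
        zipWithIndex.
        map { case (tweet, index) => (index + 1, tweet) }. // keys start from 1
        flatMapValues(_.split(" "))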

    If you are interested in a small example showing how to analyze a live Twitter feed with Spark Streaming, you can find one here.