Tags: scala, apache-spark, tweets

Apache Spark - Tweets Processing


Given a huge dataset of tweets, I need to:

  • extract and count the hashtags.
  • extract and count the emoticons/emojis.
  • extract and count the words (lemmas).

So, the first thing that came to my mind was to do something like this:

val tweets = sparkContext.textFile(DATASET).cache()

// Each extractor returns all matches found in one tweet (see the helper
// sketches below), so flatMap is needed to count individual tokens
// rather than whole lists.
val hashtags = tweets
                .flatMap(extractHashTags)
                .map((_, 1))
                .reduceByKey(_ + _)

val emoticonsEmojis = tweets
                .flatMap(extractEmoticonsEmojis)
                .map((_, 1))
                .reduceByKey(_ + _)

val lemmas = tweets
                .flatMap(extractLemmas)
                .map((_, 1))
                .reduceByKey(_ + _)
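
For reference, the snippet above assumes helpers with roughly the following shape. These are hypothetical sketches (the regexes and the bare-bones "lemmatizer" are placeholders; a real pipeline would use proper emoji handling and an NLP lemmatizer):

// Hypothetical helpers assumed by the snippet above: each one returns
// every match found in a single tweet, which is why flatMap is used.
def extractHashTags(tweet: String): List[String] =
  "#\\w+".r.findAllIn(tweet).toList

def extractEmoticonsEmojis(tweet: String): List[String] =
  "[:;][-]?[)(DPp]".r.findAllIn(tweet).toList

def extractLemmas(tweet: String): List[String] =
  tweet.split("\\s+").filter(_.nonEmpty).map(_.toLowerCase).toList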

But this way each tweet is processed three times, is that right? If so, is there an efficient way to count all these elements separately while processing each tweet only once?

I was thinking of something like this:

sparkContext.textFile(DATASET)
    .map(extractor) // RDD[(List[String], List[String], List[String])]

But this way it becomes a nightmare, mainly because once I have counted the words (the third requirement above), I need to join the result with another RDD. That join is very simple in the first version but much less so in the second; a sketch of it follows.
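
For context, this is roughly what that simple join looks like in the first version (a minimal sketch; `dictionary` is a hypothetical companion RDD keyed by lemma):

import org.apache.spark.rdd.RDD

// Hypothetical companion data keyed by lemma, e.g. lemma -> part-of-speech tag.
val dictionary: RDD[(String, String)] =
  sparkContext.parallelize(Seq("run" -> "VERB", "cat" -> "NOUN"))

// In the first version, 'lemmas' is already a pair RDD keyed by lemma,
// so a plain pair-RDD join works directly.
val lemmasWithTags = lemmas.join(dictionary)

With the tuple-of-lists version, the three categories first have to be pulled apart from the triple and re-keyed before any of them can be joined.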


Solution

  • Using the Dataset API:

    // Needed for toDF, the $ column syntax and .as (spark is the SparkSession):
    import spark.implicits._
    import org.apache.spark.sql.functions.lit

    val tweets = sparkContext.textFile(DATASET)

    val tokens = tweets.flatMap(extractor) // returns RDD[(String, String)]
      .toDF("type", "token")  // type is one of "hashtag", "emoticon", "lemma"
      .groupBy("type", "token")
      .count()                // DataFrame with columns ("type", "token", "count")

    val lemmas = tokens
      .where($"type" === lit("lemma"))
      .select("token", "count")
      .as[(String, Long)]
      .rdd // same shape as your original 'lemmas', ready for the later join
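
The answer leaves `extractor` unspecified; one possible shape for it, reusing the hypothetical helpers sketched in the question, is to tag every token with its kind so that a single pass over the tweets classifies them all:

    // One possible extractor (hypothetical): tag each token with its kind so
    // that one pass over the tweets produces all three categories at once.
    def extractor(tweet: String): Seq[(String, String)] =
      extractHashTags(tweet).map(("hashtag", _)) ++
      extractEmoticonsEmojis(tweet).map(("emoticon", _)) ++
      extractLemmas(tweet).map(("lemma", _))

The hashtag and emoticon counts then come from the same `tokens` Dataset with different filters; caching `tokens` keeps the input from being re-read when all three results are materialized:

    val cached = tokens.cache()

    val hashtagCounts = cached.where($"type" === lit("hashtag")).select("token", "count")
    val emojiCounts   = cached.where($"type" === lit("emoticon")).select("token", "count")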