scala rdd

Scala RDD matching with similar wording


So I have a list of verbs.

Assuming:

verbs.txt

have, have, having, had
give, give, gave, given
take, take, took, taken

I loaded them into an RDD, split each line and collected the result:

val verbs = sc.textFile("verbs.txt").map(_.split(", ")).collect()

which gives:

verbs: Array[Array[String]] = Array(Array(have, have, having, had), Array(give, give, gave, given), Array(take, take, took, taken))
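Because `sc.textFile` already yields one element per line, the per-line split has to be on the comma separator rather than on "\n". A minimal plain-Scala sketch (no Spark) of that per-line parsing, assuming each line of verbs.txt is comma-separated as shown; `lines` is a hypothetical in-memory stand-in for the file:

```scala
// Plain-Scala sketch (no Spark): parse verbs.txt-style lines into Array[Array[String]].
object ParseVerbs {
  // Hypothetical in-memory copy of verbs.txt
  val lines: Seq[String] = Seq(
    "have, have, having, had",
    "give, give, gave, given",
    "take, take, took, taken"
  )

  // Split each line on a comma followed by optional whitespace; trim each token
  def parse(lines: Seq[String]): Array[Array[String]] =
    lines.map(_.split(",\\s*").map(_.trim)).toArray

  def main(args: Array[String]): Unit =
    parse(lines).foreach(row => println(row.mkString("Array(", ", ", ")")))
}
```

In Spark, the same per-line function is what goes inside `.map(...)` before `.collect()`.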

Assuming:

val wordcount = sc.textFile("data.txt")

data.txt

have have have having having had had had had had give give give give give give give give give give gave gave given given given given take take took took took took took took taken taken

I've calculated the word count, so wordcount is:

(have, 3)
(having, 2)
(had, 5)
(give, 10)
(gave, 2)
(given, 4)
(take, 2)
(took, 6)
(taken, 2)
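For reference, the tokenize-and-count step behind these numbers can be sketched with plain Scala collections. This is an illustration, not necessarily how the counts above were produced; `text` is a hypothetical inline copy of data.txt, and `groupBy` stands in for Spark's `reduceByKey`:

```scala
// Plain-Scala sketch (no Spark) of a word count over the contents of data.txt.
object WordCount {
  // Hypothetical inline copy of the single line in data.txt
  val text: String =
    "have have have having having had had had had had " +
    "give give give give give give give give give give " +
    "gave gave given given given given " +
    "take take took took took took took took taken taken"

  // Split on runs of whitespace, then count the occurrences of each token
  def count(text: String): Map[String, Int] =
    text.trim.split("\\s+").groupBy(identity).map { case (w, ws) => (w, ws.length) }

  def main(args: Array[String]): Unit =
    count(text).toSeq.sortBy(_._1).foreach(println)
}
```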

I want to merge the counts of all forms of the same verb. For example: (have,3),(having,2),(had,5) => (have, 10)

The result should use the base form of the verb, i.e. the first element of each array. How can I do that?


Solution

  • Since you tagged your question as RDD, I am assuming your word count data is an RDD.

      import org.apache.spark.rdd.RDD

      // Read text file (`spark` is the SparkSession, e.g. in spark-shell)
      val sc = spark.sparkContext
      val textFile: RDD[String] = sc.textFile("data.txt")
    
      // So you have this as you said
      val verbs = Array(Array("have", "have", "having", "had"), Array("give", "give", "gave", "given"), Array("take", "take", "took", "taken"))
    
      val data = textFile
        .flatMap(_.split(" ")) // Split each line into words/tokens; this is called tokenization (I used a space as the separator; if your file uses tabs, split on that instead)
        .map(t => (t, 1)) // Emit a count of 1 per token (i.e. (have, 1))
        .reduceByKey(_ + _) // Count the occurrences of each token (i.e. (have, 3))
    
    
      val t = data.map(d => (verbs.find(v => v.contains(d._1)).map(_.head).getOrElse(d._1), d._2)) // Map each word to its base verb if it is in the list, e.g. (having, 2) => (have, 2); unknown words are left as they are
        .reduceByKey(_ + _) // Sum all counts with the same base verb, e.g. (have, 3), (have, 2), (have, 5) => (have, 10)
    
      t.take(10).foreach(println)
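The `verbs.find(...)` call scans the whole verbs array for every distinct word. A minimal plain-Scala sketch of the same normalize-then-sum logic, swapping that linear search for a form -> base-form `Map` built once (an alternative, not the answer's own code); the counts are the ones listed in the question. In Spark, such a map could be sent to the executors with `sc.broadcast(...)` and read through `.value`:

```scala
// Plain-Scala sketch (no Spark): merge word counts of verb forms into their base form.
object MergeVerbForms {
  val verbs: Array[Array[String]] = Array(
    Array("have", "have", "having", "had"),
    Array("give", "give", "gave", "given"),
    Array("take", "take", "took", "taken")
  )

  // Word counts as listed in the question
  val counts: Seq[(String, Int)] = Seq(
    "have" -> 3, "having" -> 2, "had" -> 5,
    "give" -> 10, "gave" -> 2, "given" -> 4,
    "take" -> 2, "took" -> 6, "taken" -> 2
  )

  // Map every form to the first element of its row (the base form); duplicates collapse
  val toBase: Map[String, String] =
    verbs.flatMap(row => row.map(form => form -> row.head)).toMap

  // Replace each word by its base form (unknown words stay as-is), then sum per key
  def merge(counts: Seq[(String, Int)]): Map[String, Int] =
    counts
      .map { case (w, n) => (toBase.getOrElse(w, w), n) }
      .groupBy(_._1)
      .map { case (base, pairs) => (base, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit =
    merge(counts).toSeq.sortBy(_._1).foreach(println)
}
```

Running `main` prints (give,16), (have,10) and (take,10); a word missing from the map keeps its own spelling, mirroring `getOrElse(d._1)`.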
    

    Other Option (Without collecting verbs)

      // You don't have to collect this if you don't want to
      val verbs2 = sc.parallelize(Array(Array("have", "have", "having", "had"), Array("give", "give", "gave", "given"), Array("take", "take", "took", "taken"))) // This is the state before collect
        .flatMap(v => v.map(v2 => (v2, v.head))) // Generate (verb form, base verb) pairs (e.g. had -> have)
        .reduceByKey((base, _) => base) // The verbs array yields have -> have twice; keep one base verb per form to eliminate duplicate records
    
      val data2 = textFile
        .flatMap(_.split(" ")) // Split each line into words/tokens (space-separated, as above)
        .map(t => (t, 1)) // Emit a count of 1 per token (i.e. (have, 1))
        .reduceByKey(_ + _) // Count the occurrences of each token (i.e. (have, 3))
    
      val t2 = verbs2.join(data2) // Inner join of the two RDDs by key: (verb form, (base verb, verb count)); words not in verbs2 are dropped
        .map(d => d._2) // Keep (base verb, count): the key is the base verb, the value is the count for that form
        .reduceByKey(_ + _) // Sum all counts with the same base verb, e.g. (have, 3), (have, 2), (have, 5) => (have, 10)
    
      t2.take(10).foreach(println)
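One behavioral difference between the two options: `join` on pair RDDs is an inner join, so words that do not appear in the verbs list are dropped, whereas `getOrElse(d._1)` in the first option keeps them unchanged. If you want the second option to keep unknown words too, `data2.leftOuterJoin(verbs2)` yields `(word, (count, Option[baseVerb]))` pairs that can be mapped with `getOrElse(word)`. A plain-Scala sketch contrasting the two behaviors on hypothetical counts that include a non-verb (`the`):

```scala
// Plain-Scala sketch (no Spark) contrasting inner-join and left-outer-join semantics.
object JoinVsLookup {
  // Form -> base form pairs (what verbs2 holds after deduplication)
  val formToBase: Map[String, String] = Map(
    "have" -> "have", "having" -> "have", "had" -> "have",
    "give" -> "give", "gave" -> "give", "given" -> "give",
    "take" -> "take", "took" -> "take", "taken" -> "take"
  )

  // Hypothetical counts including a word that is not in the verbs list
  val counts: Seq[(String, Int)] = Seq("had" -> 5, "gave" -> 2, "the" -> 7)

  private def sumByKey(pairs: Seq[(String, Int)]): Map[String, Int] =
    pairs.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }

  // Like verbs2.join(data2): only words present on both sides survive
  def innerJoin(counts: Seq[(String, Int)]): Map[String, Int] =
    sumByKey(counts.flatMap { case (w, n) => formToBase.get(w).map(base => (base, n)) })

  // Like data2.leftOuterJoin(verbs2): unknown words fall back to themselves
  def leftOuterJoin(counts: Seq[(String, Int)]): Map[String, Int] =
    sumByKey(counts.map { case (w, n) => (formToBase.getOrElse(w, w), n) })

  def main(args: Array[String]): Unit = {
    println(innerJoin(counts))
    println(leftOuterJoin(counts))
  }
}
```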
    

    Of course, this answer assumes you always have your verbs array and that its first element is the base form. If you want something that works without a verbs array and converts any verb to its base form, that is actually an NLP (Natural Language Processing) task, and you need some kind of word normalization technique (as EmiCareOfCell44 indicated), such as lemmatization. You can also find implementations of such procedures in the Spark ML library.