Tags: algorithms, scala, apache-spark, key-value-coding

Keep specific characters from a column of strings (Array[(String, Int)]) and apply an algorithm per group


I have a CSV file containing values like Z1:A, Z2:B etc. separated by commas. I want to:

1. Create key-value pairs with the frequency of each key (already done).
2. Rewrite my array (or maybe tuple?) keeping only the Zx part (where x can be any integer) and throwing away the colon and everything following it (:A etc.).

This is my dummy file (for simplicity there are only Z1 and Z2).

So I load my CSV into a val:

val example1 = sc.textFile("/Users/....../Spark_stack/example_1.csv")

then I execute a map-reduce in order to get my desired outcome:

val counts = example1.flatMap(line => line.split(",")).map(word => (word, 1)).reduceByKey(_+_).collect

I am not sure whether .collect is necessary, but it is the only way I know to access entire rows or specific cells of my "table".

If I print that

counts.foreach(println)

I get:

scala> counts.foreach(println)
(Z1:C,5)
(Z1:E,3)
(Z1:A,10)
(Z2:B,2)
(Z2:A,2)
(Z1:D,4)
(Z2:C,1)
(Z1:B,24)

I want to rewrite that as:

(Z1,5)
(Z1,3)
(Z1,10)
(Z2,2)
(Z2,2)
(Z1,4)
(Z2,1)
(Z1,24)

One possible way to do this is by using map and substring(0,2):

scala> counts.map(x => (x._1.substring(0,2),x._2))
res25: Array[(String, Int)] = Array((Z1,5), (Z1,3), (Z1,10), (Z2,2), (Z2,2), (Z1,4), (Z2,1), (Z1,24))

The problem here is that at some point I might be given so many Z's that their total number is greater than 9, for example Z15:A or even Z123:D.

Hence I need something more dynamic, something that can tell where the : is and take the substring up to that point. My problem is that I don't know how to write it.

If I call:

scala> counts(1)._1.indexOfSlice(":")
res28: Int = 2

I get the position of the :, so I can apply it like this:

scala> counts(1)._1.substring(0,counts(1)._1.indexOfSlice(":"))
res30: String = Z1

but I don't know how to apply it to the entire counts array, not just a single element. I even tried foreach, but it didn't work.

Once I do this, I need to apply the following algorithm for each individual Z1, Z2, etc.

Somehow sort it in reverse order like this (this works for a single Zx, though; I really need to sort by the 2nd column descending within each value of the first column):

val sorted = counts.sortBy(_._2).reverse

and for each unique Zx apply this:

var h = 0
for (i <- 0 to (sorted.length - 1)) {
  if (sorted(i)._2 >= i + 1) { h = i + 1 }
}

in order to finally get one integer per Zx (the var h from the for-loop above).
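For what it's worth, the per-Zx loop can be sketched in plain Scala once the keys are stripped. The names `counts` and `hPerKey` below are illustrative, and `counts` is hard-coded with the stripped keys from the question's sample output:

```scala
// Sketch, assuming `counts` is the collected Array[(String, Int)] with the
// ":suffix" already stripped from each key (sample data from the question).
val counts = Array(
  ("Z1", 5), ("Z1", 3), ("Z1", 10), ("Z2", 2),
  ("Z2", 2), ("Z1", 4), ("Z2", 1), ("Z1", 24)
)

// Group by key, sort each group's frequencies in descending order, and
// compute the largest h such that at least h of the values are >= h --
// the same quantity the var-h loop computes for a single Zx.
val hPerKey: Map[String, Int] =
  counts
    .groupBy(_._1)
    .map { case (key, pairs) =>
      val sortedDesc = pairs.map(_._2).sorted(Ordering[Int].reverse)
      val h = sortedDesc.zipWithIndex.count { case (v, i) => v >= i + 1 }
      key -> h
    }
```

The count works here because `sortedDesc` is non-increasing while `i + 1` is strictly increasing, so once the condition fails it stays false; counting the successes therefore gives the last index that satisfied it, exactly as the imperative loop does.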

Sorry if it is too complicated; I am totally new to Scala and Spark.


Solution

  • counts.map(x => (x._1.substring(0, x._1.indexOf(":")), x._2))
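This works for keys of any length because indexOf(":") finds the colon wherever it sits (Z15:A, Z123:D, ...). Since counts is already a collected Array, this is plain Scala; a takeWhile-based spelling (shown as an alternative below, with the question's sample data hard-coded) avoids the index arithmetic entirely:

```scala
// The collected result from the question (a plain Scala Array, so ordinary
// collection methods apply; the same map could also be run on the RDD
// before collecting).
val counts = Array(
  ("Z1:C", 5), ("Z1:E", 3), ("Z1:A", 10), ("Z2:B", 2),
  ("Z2:A", 2), ("Z1:D", 4), ("Z2:C", 1), ("Z1:B", 24)
)

// Cut each key at the colon, wherever it is.
val stripped = counts.map(x => (x._1.substring(0, x._1.indexOf(":")), x._2))

// Equivalent, arguably clearer: keep characters until the first ':'.
val strippedAlt = counts.map { case (k, v) => (k.takeWhile(_ != ':'), v) }
```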