I have a .csv file which looks like this:
message_id, hashtag
id_1, hashtag_1
id_2, hashtag_1
...
id_k, hashtag_m
....
I'm trying to compute the cosine measure between each pair of hashtags in the CSV by doing the following:
def isHeader(s: String): Boolean = {
  s.take(1) == "m"
}

def cs(pair: ((String, String), (List[String], List[String]))) = {
  val msgs1 = pair._2._1.toSet
  val msgs2 = pair._2._2.toSet
  val numer = msgs1.intersect(msgs2).size
  val denom = Math.sqrt(msgs1.size * msgs2.size)
  (pair._1._1, pair._1._2, numer / denom)
}

def to_csv_cos(t: (String, String, Double)): String = {
  t._1 ++ "," ++ t._2 ++ "," ++ t._3.toString
}

val messages = sc.textFile("....csv")
val msgData1 = messages.filter(x => !isHeader(x))
val data = msgData1.map(x => x.split(','))
val pairs = data.map(x => (x(0), List(x(1))))
  .reduceByKey((a, b) => a ++ b)
  .flatMap(x => x._2.combinations(2).toList)
  .map(x => (x(0), x(1)))
val msgs = data.map(x => (x(1), List(x(0)))).reduceByKey((a, b) => a ++ b)
val pairs_mapped = pairs.join(msgs).map {
  case (x, (y, z)) => (y, (x, z))
}.join(msgs).map {
  case (x, ((y, z), t)) => ((x, y), (t, z))
}
val res = pairs_mapped.map {
  x => cs(x)
}.map(x => to_csv_cos(x)).saveAsTextFile("F:\\Scala\\result")
The idea is:
- Creating pairs of hashtags that occur in the same message (pairs)
- Finding all messages for each hashtag (msgs)
- Creating pairs: ( (hashtag_i, hashtag_j), (messages_with_hashtag_i, messages_with_hashtag_j) ) (pairs_mapped)
- Calculating the measure (res)
Well, I guess my code is rubbish, since I'm new to Scala, Spark and the concept of functional programming, but it works for a small CSV (I tried with 100 lines). However, I have to run it on a CSV with ~25 million lines, and that is where the problem starts. The process stalls at saveAsTextFile (SparkHadoopWriter.scala in the Spark UI), makes no progress even after 30 minutes, and then crashes with varying errors (memory errors, sometimes just 'connection abort'...).
I read on this site that the cosine measure can be calculated using DataFrames, but I don't quite understand how to create a proper DataFrame from my data.
So could you please give me some advice on how to modify my code to make it work, how to create a proper DataFrame from the CSV, or anything else?
I will be grateful for any help you can give!
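RDD operations are either lazy transformations or actions. Actions such as 'saveAsTextFile', 'collect', 'top', 'take', 'count' and 'foreach' trigger a computation. Transformations such as 'map', 'filter', 'groupByKey', 'join' and many others are lazy, and so are 'persist' and 'cache', which only mark an RDD to be kept once it has been computed. In your program almost all operations are lazy, so nothing runs until saveAsTextFile, which is why the job appears to stall there. A lazy RDD that is used more than once (msgs is joined twice in your code) is recomputed each time, so you should persist it so that it is evaluated only once.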
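The recomputation can be made visible with a small local experiment. This is only a sketch under assumptions: the object name CachingSketch, the helper mapCalls and the five-element input are made up for illustration, and it assumes Spark is on the classpath and runs in local mode. It counts how often the map function runs when two actions are executed on the same RDD, with and without persist.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Hypothetical helper object, not part of the original program.
object CachingSketch {
  // Runs two actions on a mapped RDD and returns how many times the map function ran.
  def mapCalls(sc: SparkContext, persistFirst: Boolean): Long = {
    val calls = sc.longAccumulator("map calls")
    val doubled = sc.parallelize(1 to 5).map { x => calls.add(1); x * 2 }
    // persist is lazy: it only marks the RDD to be kept after its first computation.
    if (persistFirst) doubled.persist(StorageLevel.MEMORY_AND_DISK)
    doubled.count() // first action: computes the map
    doubled.count() // second action: recomputes the map unless the RDD was persisted
    calls.sum
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("caching-sketch")
    val sc = new SparkContext(conf)
    try {
      println(s"without persist: ${mapCalls(sc, persistFirst = false)}")
      println(s"with persist:    ${mapCalls(sc, persistFirst = true)}")
    } finally sc.stop()
  }
}
```

In a local run without task retries, I would expect the first count to be 10 (the map ran for both actions) and the second to be 5 (the second action read the persisted data). Accumulator updates made inside transformations can be applied more than once if tasks are re-executed, so treat this as a diagnostic aid rather than an exact counter.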
When you work with big data, use memory sparingly. If your tags or identifiers are integers, convert them with _.toInt instead of keeping them as strings. Text files are fine for learning and testing, but avoid them otherwise: text is easy for a person to read but slow and bulky for a machine. For example, a Double takes 8 bytes in binary, but written to a text file as 0.4082482904638631 it takes 18 characters (18 bytes in UTF-8, or 36 bytes as a UTF-16 JVM string). In addition, if your tags or IDs have a fixed width, you can omit the comma separator.
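The size arithmetic for that Double can be checked in plain Scala, with no Spark involved. The object name SizeSketch is made up for this illustration.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical object comparing the binary and textual size of one measure value.
object SizeSketch {
  val text: String = "0.4082482904638631"

  val chars: Int = text.length                                          // characters in the text form
  val utf8Bytes: Int = text.getBytes(StandardCharsets.UTF_8).length     // bytes in a UTF-8 text file
  val utf16Bytes: Int = text.getBytes(StandardCharsets.UTF_16BE).length // bytes as UTF-16 code units
  val binaryBytes: Int = java.lang.Double.BYTES                         // bytes of a binary Double

  def main(args: Array[String]): Unit = {
    println(s"chars=$chars utf8=$utf8Bytes utf16=$utf16Bytes binary=$binaryBytes")
  }
}
```

The text form is 18 characters, which is 18 bytes in UTF-8 and 36 bytes in UTF-16, against 8 bytes for the binary value.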
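import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

type Id = String
type Tag = String
type Measure = Double

// Drop the header line, which starts with 'm'.
val messages: RDD[String] = sc.textFile("1.csv").filter(line => !line.startsWith("m"))

// (tag, message id) pairs; trim removes a space after the comma, if any.
val data: RDD[(Tag, Id)] = messages
  .map(line => line.split(",").map(_.trim))
  .map(pair => pair(1) -> pair(0))

// All message ids per tag, persisted because cartesian reads this RDD twice.
val tagIds: RDD[(Tag, Set[Id])] = data.groupByKey()
  .mapValues(_.toSet)
  .persist(StorageLevel.MEMORY_AND_DISK)

// Every pair of distinct tags; t1 < t2 keeps each unordered pair once.
val tagIds1TagIds2: RDD[((Tag, Set[Id]), (Tag, Set[Id]))] =
  tagIds.cartesian(tagIds).filter { case ((t1, _), (t2, _)) => t1 < t2 }

val tagPairsWithMeasure: RDD[(Tag, Tag, Measure)] = tagIds1TagIds2.map {
  case ((t1, l1), (t2, l2)) =>
    val numer = l1.intersect(l2).size
    // Multiply as Double so that large set sizes cannot overflow Int.
    val denom = Math.sqrt(l1.size.toDouble * l2.size)
    (t1, t2, numer / denom)
}

// CSV lines, ready to be written out.
val lines: RDD[String] = tagPairsWithMeasure.map {
  case (t1, t2, m) => s"$t1,$t2,$m"
}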
Test:
id1,tag1
id1,tag2
id3,tag3
id3,tag2
id5,tag3
id6,tag1
id7,tag1
id8,tag2
Answer:
tag2,tag3,0.4082482904638631 // 1/sqrt(3*2)
tag1,tag2,0.3333333333333333 // 1/sqrt(3*3)
tag1,tag3,0.0 // 0/sqrt(3*2)
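The expected values can be cross-checked without a cluster. The following is a plain-Scala sketch that applies the same steps (group ids by tag, pair up tags with t1 < t2, divide the intersection size by the square root of the product of the set sizes) to the test rows using local collections. The names CosineCheck, rows, cosine and measures are made up for this check and are not part of the Spark code.

```scala
// Hypothetical local re-implementation of the measure, for checking small inputs only.
object CosineCheck {
  // (message id, tag) rows from the test data.
  val rows: Seq[(String, String)] = Seq(
    "id1" -> "tag1", "id1" -> "tag2", "id3" -> "tag3", "id3" -> "tag2",
    "id5" -> "tag3", "id6" -> "tag1", "id7" -> "tag1", "id8" -> "tag2")

  // tag -> set of message ids carrying that tag
  val tagIds: Map[String, Set[String]] =
    rows.groupBy(_._2).map { case (tag, rs) => tag -> rs.map(_._1).toSet }

  // |a ∩ b| / sqrt(|a| * |b|)
  def cosine(a: Set[String], b: Set[String]): Double =
    a.intersect(b).size / math.sqrt(a.size.toDouble * b.size)

  // Measure for every unordered pair of distinct tags.
  val measures: Map[(String, String), Double] = (for {
    (t1, s1) <- tagIds.toSeq
    (t2, s2) <- tagIds.toSeq
    if t1 < t2
  } yield (t1, t2) -> cosine(s1, s2)).toMap

  def main(args: Array[String]): Unit =
    measures.toSeq.sortBy(_._1).foreach { case ((t1, t2), m) => println(s"$t1,$t2,$m") }
}
```

It should reproduce the three values listed above, although the lines are printed in sorted order here, whereas the order of the Spark output depends on partitioning.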