It looks like RDD.take() just repeats the last element read when the RDD is backed by a SequenceFile. For instance:
import org.apache.hadoop.io.LongWritable
// RecordWritable is our custom Writable value class (import omitted)

val rdd = sc.sequenceFile("records.seq", classOf[LongWritable], classOf[RecordWritable])
val records: Array[(LongWritable, RecordWritable)] = rdd.take(5)
println(records.map(_._2.toString).mkString("\n"))
Outputs:
Record(3.1, 2.5)
Record(3.1, 2.5)
Record(3.1, 2.5)
Record(3.1, 2.5)
Record(3.1, 2.5)
Every row comes back the same, even though I know the rows in the file are unique.
This problem also occurs with sc.binaryRecords().
I realize this is probably related to the Hadoop Writable object-reuse (caching) issue, but are there any plans to fix this? Are there any workarounds?
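As I understand it, the reader allocates one key object and one value object and mutates them in place for every record, roughly like this (a sketch of the raw SequenceFile read loop under that assumption, not Spark's actual code):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, SequenceFile}
import scala.collection.mutable.ArrayBuffer

val conf = new Configuration()
val reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path("records.seq")))

// The same two objects are mutated in place on every next() call, so
// anything that stores references (as take() does) ends up with N
// pointers to whatever record was read last.
val key = new LongWritable()
val value = new RecordWritable()
val buffer = ArrayBuffer[(LongWritable, RecordWritable)]()
while (reader.next(key, value)) {
  buffer += ((key, value)) // every entry references the same two objects
}
reader.close()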
I tried to replicate your issue, and yes, I saw the same behavior when calling take directly on the result of sc.sequenceFile(). But I was able to find a workaround:
Note: I am demonstrating with LongWritable and Text instead of RecordWritable, since I am not sure which import RecordWritable needs.
My sequence file contains: (0,0) (1,1) (2,2) ...
import org.apache.hadoop.io.{LongWritable, Text}

val rdd = sc.sequenceFile("sequencefile.seq", classOf[LongWritable], classOf[Text])
// Copy each record out of the reused Writable instances into immutable
// Scala values before calling take():
val map = rdd.map { case (k, v) => (k.get(), v.toString) }
map.take(1)
res5: Array[(Long, String)] = Array((0,0))
map.take(5)
res4: Array[(Long, String)] = Array((0,0), (1,1), (2,2), (3,3), (4,4))
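If you need to keep the actual Writable objects rather than converting them, deep-copying each pair should work too. Here is a minimal sketch using WritableUtils.clone, assuming your RecordWritable implements Writable; a fresh Configuration is created inside mapPartitions because Configuration itself is not serializable:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.WritableUtils

// Clone every key/value pair so take() returns distinct objects
// instead of repeated references to the single reused instance.
val cloned = rdd.mapPartitions { iter =>
  val conf = new Configuration() // per-partition; Configuration is not serializable
  iter.map { case (k, v) =>
    (WritableUtils.clone(k, conf), WritableUtils.clone(v, conf))
  }
}
cloned.take(5)

For sc.binaryRecords() the same idea should apply: if the returned byte arrays share a reused buffer, copying each one before take(), e.g. sc.binaryRecords("records.bin", recordLength).map(_.clone()), ought to give independent records. I have not tested that case myself, and recordLength here is just a placeholder for your fixed record size.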