Tags: hadoop, apache-spark, sequencefile

Spark RDD take() with Sequence File


It looks like RDD.take() just repeats the last element read when backed by a SequenceFile.
For instance:

val rdd = sc.sequenceFile("records.seq", classOf[LongWritable], classOf[RecordWritable])
val records: Array[(LongWritable, RecordWritable)] = rdd.take(5)
System.out.println(records.map(_._2.toString).mkString("\n"))

Outputs:

Record(3.1, 2.5)
Record(3.1, 2.5)
Record(3.1, 2.5)
Record(3.1, 2.5)
Record(3.1, 2.5)

Even though I know the rows are unique.

This problem is also present for sc.binaryRecords().

I realize this is probably related to Hadoop's Writable object-reuse (caching) behavior, but are there any plans to fix this? Are there any workarounds?


Solution

  • I tried to replicate your issue and, yes, I saw the same behavior when calling take() directly on the result of sc.sequenceFile(). However, I was able to find a workaround:

    Note: I am demonstrating with LongWritable and Text instead of RecordWritable, since I am not sure which import RecordWritable needs.
    My sequence file contains: (0,0) (1,1) (2,2) ...

    import org.apache.hadoop.io.{LongWritable, Text}

    val rdd = sc.sequenceFile("sequencefile.seq", classOf[LongWritable], classOf[Text])
    // Copy each Writable's contents into plain Scala values before collecting
    val map = rdd.map { case (k, v) => (k.get(), v.toString) }
    map.take(1)
    res5: Array[(Long, String)] = Array((0,0))
    map.take(5)
    res4: Array[(Long, String)] = Array((0,0), (1,1), (2,2), (3,3), (4,4))
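
    The workaround works because Hadoop's RecordReader reuses a single Writable instance for every record; take() ends up collecting repeated references to that one mutable object, whereas mapping to plain Scala values (Long, String) copies the data out as each record streams by. Below is a hedged sketch of how the same idea could be applied to your RecordWritable without knowing its fields, by cloning each Writable instead (WritableUtils.clone performs a serialize/deserialize copy); mapPartitions is used so the non-serializable Configuration is created on the executors:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, WritableUtils}

    val rdd = sc.sequenceFile("records.seq", classOf[LongWritable], classOf[RecordWritable])
    val copies = rdd.mapPartitions { iter =>
      // Create the Configuration per partition: it is not serializable,
      // so it cannot be captured in a closure on the driver.
      val conf = new Configuration()
      // Clone both Writables so each element owns its own copy rather than
      // a reference to the single instance the reader keeps overwriting.
      iter.map { case (k, v) => (WritableUtils.clone(k, conf), WritableUtils.clone(v, conf)) }
    }
    copies.take(5)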
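
    You mention the same symptom with sc.binaryRecords(). I have not verified this, but if it is likewise caused by a reused read buffer, copying each byte array as it passes through map (before take) should give every element its own data. A minimal sketch, assuming a hypothetical fixed-length file records.bin with 16-byte records:

    // clone() on Array[Byte] makes an independent copy of the current
    // buffer contents before the next record can overwrite it
    val recordLength = 16
    val copied = sc.binaryRecords("records.bin", recordLength).map(_.clone())
    copied.take(5)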