Tags: java, apache-spark, object-files, sequencefile

Save and Read Key-Value Pairs in Spark


I have a JavaPairRDD in the following format:

JavaPairRDD<String, Tuple2<String, List<String>>> myData;

I want to save it in a key-value format (String, Tuple2<String, List<String>>):

myData.saveAsXXXFile("output-path");

so that my next job can read the data directly back into a JavaPairRDD:

JavaPairRDD<String, Tuple2<String, List<String>>> newData = context.XXXFile("output-path");

I am using Java 7 and the Spark 1.2 Java API. I tried saveAsTextFile and saveAsObjectFile, but neither works, and I don't see a saveAsSequenceFile option in Eclipse.

Does anyone have a suggestion for this problem? Thank you very much!


Solution

  • You could use SequenceFileRDDFunctions, which is pulled in through implicits in Scala; from Java, however, that might be nastier than the usual suggestion of (a full save-and-read sketch follows at the end of this answer):

    myData.saveAsHadoopFile(fileName, Text.class, CustomWritable.class,
                            SequenceFileOutputFormat.class);
    

    implementing CustomWritable as a class that implements the interface

    org.apache.hadoop.io.Writable
    

    Something like this should work (I did not check that it compiles):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.ArrayWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableUtils;

    import scala.Tuple2;

    // Writable is an interface, so it is implemented, not extended.
    public class MyWritable implements Writable {
      private String _1;
      private String[] _2;

      // Hadoop instantiates Writables reflectively, so a no-arg constructor is required.
      public MyWritable() {}

      public MyWritable(Tuple2<String, String[]> data) {
        _1 = data._1;
        _2 = data._2;
      }

      public Tuple2<String, String[]> get() {
        return new Tuple2<>(_1, _2);
      }

      @Override
      public void readFields(DataInput in) throws IOException {
        _1 = WritableUtils.readString(in);
        // ArrayWritable needs to know the element type when deserializing.
        ArrayWritable _2Writable = new ArrayWritable(Text.class);
        _2Writable.readFields(in);
        _2 = _2Writable.toStrings();
      }

      @Override
      public void write(DataOutput out) throws IOException {
        // Use WritableUtils on both sides so the string encodings match.
        WritableUtils.writeString(out, _1);
        ArrayWritable _2Writable = new ArrayWritable(_2);
        _2Writable.write(out);
      }
    }
    

    Adjust it so that it fits your data model.
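
    To round out the answer, here is an untested sketch (Java 7 style) of the full save-and-read cycle using MyWritable. The "output-path" path comes from the question; the RoundTrip class name and the List<String>-to-String[] conversion are illustrative assumptions:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.SequenceFileInputFormat;
    import org.apache.hadoop.mapred.SequenceFileOutputFormat;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;

    import scala.Tuple2;

    public class RoundTrip {
      public static void save(JavaPairRDD<String, Tuple2<String, List<String>>> myData) {
        // Convert to Hadoop Writable types before writing the SequenceFile.
        JavaPairRDD<Text, MyWritable> writables = myData.mapToPair(
            new PairFunction<Tuple2<String, Tuple2<String, List<String>>>, Text, MyWritable>() {
              @Override
              public Tuple2<Text, MyWritable> call(
                  Tuple2<String, Tuple2<String, List<String>>> t) {
                String[] values = t._2._2.toArray(new String[0]);
                return new Tuple2<>(new Text(t._1),
                    new MyWritable(new Tuple2<>(t._2._1, values)));
              }
            });
        writables.saveAsHadoopFile("output-path", Text.class, MyWritable.class,
            SequenceFileOutputFormat.class);
      }

      public static JavaPairRDD<String, Tuple2<String, List<String>>> read(
          JavaSparkContext context) {
        // Read the SequenceFile back and convert the Writables to plain Java types.
        JavaPairRDD<Text, MyWritable> raw = context.hadoopFile(
            "output-path", SequenceFileInputFormat.class, Text.class, MyWritable.class);
        return raw.mapToPair(
            new PairFunction<Tuple2<Text, MyWritable>, String, Tuple2<String, List<String>>>() {
              @Override
              public Tuple2<String, Tuple2<String, List<String>>> call(
                  Tuple2<Text, MyWritable> t) {
                Tuple2<String, String[]> value = t._2.get();
                return new Tuple2<>(t._1.toString(),
                    new Tuple2<>(value._1, Arrays.asList(value._2)));
              }
            });
      }
    }

    One caveat: Hadoop input formats reuse Writable instances between records, which is why the sketch copies the contents out with toString() and get() right away rather than caching the raw Text/MyWritable pairs.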