I am using Spark to read a bunch of files, elaborating on them and then saving all of them as a Sequence file. What I wanted, was to have 1 sequence file per partition, so I did this:
SparkConf sparkConf = new SparkConf().setAppName("writingHDFS")
.set("spark.streaming.stopGracefullyOnShutdown", "true");
final JavaSparkContext jsc = new JavaSparkContext(sparkConf);
jsc.hadoopConfiguration().addResource(hdfsConfPath + "hdfs-site.xml");
jsc.hadoopConfiguration().addResource(hdfsConfPath + "core-site.xml");
//JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5*1000));
JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(sourcePath);
imageByteRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String,PortableDataStream>>>() {
public void call(Iterator<Tuple2<String, PortableDataStream>> arg0){
throws Exception {
[°°°SOME STUFF°°°]
SequenceFile.Writer writer = SequenceFile.createWriter(
//here lies the problem: how to pass the hadoopConfiguration I have put inside the Spark Context?
Previously, I created a Configuration for each partition, and it works, but I'm sure there is a much more "sparky way"
Does anybody know how to use the Hadoop Configuration Object inside the RDD closures?
Looks like it cannot be done, so here is the code I used:
final hdfsNameNodePath = "hdfs://quickstart.cloudera:8080";
JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(sourcePath);
imageByteRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String,PortableDataStream>>>() {
public void call(Iterator<Tuple2<String, PortableDataStream>> arg0)
throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNodePath);
//the string above should be passed as argument
SequenceFile.Writer writer = SequenceFile.createWriter(