This is my code snippet:

@Override
protected RecordWriter<String, String> getBaseRecordWriter(
        FileSystem fs, JobConf job, String name, Progressable arg3)
        throws IOException {
    Path file2 = FileOutputFormat.getOutputPath(job);
    String path = file2.toUri().getPath() + File.separator + name;
    // Append to the file through a 100 MB BufferedOutputStream.
    FSDataOutputStream fileOut = new FSDataOutputStream(
            new BufferedOutputStream(new FileOutputStream(path, true), 104857600), null);
    return new LineRecordWriter<String, String>(fileOut, "\t");
}
I am using Spark 1.6.1, and in my code I call saveAsHadoopFile(), for which I wrote an OutputFormat class derived from org.apache.hadoop.mapred.lib.MultipleTextOutputFormat and overrode the method above.
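For context, this is roughly how such a format gets plugged in; a minimal sketch, assuming the subclass described above is named MyMultipleTextOutputFormat (a placeholder) and the output directory is /tmp/output (also a placeholder):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class SaveWithCustomFormat {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SaveWithCustomFormat");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // A tiny key/value RDD, just to demonstrate the call.
        JavaPairRDD<String, String> pairs = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String, String>("key1", "value1"),
                new Tuple2<String, String>("key2", "value2")));

        // Hand the custom OutputFormat class to saveAsHadoopFile().
        pairs.saveAsHadoopFile(
                "/tmp/output",                      // output directory (placeholder)
                String.class,                       // key class
                String.class,                       // value class
                MyMultipleTextOutputFormat.class);  // the MultipleTextOutputFormat subclass

        sc.stop();
    }
}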
On the cluster it writes corrupt records to the output files. I think it is because of the BufferedOutputStream in

FSDataOutputStream fileOut = new FSDataOutputStream(
        new BufferedOutputStream(new FileOutputStream(path, true), 104857600), null);

Is there any alternative to BufferedOutputStream, since it writes as soon as the buffer gets full?
Note: I have updated the code. Sorry for the inconvenience.
I found the issue: on the cluster, every worker tries to write to the same (shared) file. Since the workers run on different machines, and therefore in different JVMs, synchronized file writes will not work here, and that is why the records are corrupt. I also used NFS, which is an important factor.
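Not from the original post, but for completeness, a minimal sketch of one way to avoid the collision: let Hadoop's TextOutputFormat create each task's file under that task attempt's own output path through the FileSystem handle, instead of appending to a single NFS file with a local FileOutputStream. The class name PerTaskTextOutputFormat is a placeholder.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.util.Progressable;

public class PerTaskTextOutputFormat extends MultipleTextOutputFormat<String, String> {

    private TextOutputFormat<String, String> textOutput;

    @Override
    protected RecordWriter<String, String> getBaseRecordWriter(
            FileSystem fs, JobConf job, String name, Progressable progress)
            throws IOException {
        if (textOutput == null) {
            textOutput = new TextOutputFormat<String, String>();
        }
        // TextOutputFormat creates the file under the task attempt's own output
        // directory via the FileSystem API, so tasks running on different
        // workers never append to the same file.
        return textOutput.getRecordWriter(fs, job, name, progress);
    }
}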