I am using the code below to read a large XML file (several GB) in a Hadoop RecordReader using XMLStreamReader:
public class RecordReader {
    private XMLStreamReader reader;
    int progressCount = 0;

    public RecordReader() throws IOException {
        XMLInputFactory factory = XMLInputFactory.newInstance();
        FSDataInputStream fdDataInputStream = fs.open(file); // HDFS file
        try {
            reader = factory.createXMLStreamReader(fdDataInputStream);
        } catch (XMLStreamException exception) {
            throw new RuntimeException("XMLStreamException exception : ", exception);
        }
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return progressCount;
    }
}
My question is how to get the reading progress of the file with XMLStreamReader, since it does not expose any start or end position from which to calculate a progress percentage. I have referred to "How do I keep track of parsing progress of large files in StAX?", but I cannot use a FilterReader. Please help me here.
You could wrap the InputStream by extending FilterInputStream.
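import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

public interface InputStreamListener {
    void onBytesRead(long totalBytes);
}

public class PublishingInputStream extends FilterInputStream {
    private final InputStreamListener listener;
    private long totalBytes = 0;
    public PublishingInputStream(InputStream in, InputStreamListener listener) {
        super(in);
        this.listener = listener;
    }

    @Override
    public int read() throws IOException {
        int b = super.read();
        if (b != -1) publish(1); // -1 means end of stream, nothing was read
        return b;
    }
    // FilterInputStream.read(byte[]) delegates to this method, so it is counted as well
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        int count = super.read(b, off, len);
        if (count > 0) publish(count);
        return count;
    }

    private void publish(int count) {
        totalBytes += count;
        listener.onBytesRead(totalBytes);
    }
}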
Usage
XMLInputFactory factory = XMLInputFactory.newInstance();
InputStream in = fs.open(file);
// someHadoopService is a placeholder; with a Hadoop FileSystem, fs.getFileStatus(file).getLen() returns the length
final long fileSize = someHadoopService.getFileLength(file);
InputStreamListener listener = new InputStreamListener() {
    @Override
    public void onBytesRead(long totalBytes) {
        System.out.println(String.format("Read %d of %d bytes", totalBytes, fileSize));
    }
};
InputStream publishingIn = new PublishingInputStream(in, listener);
try {
    reader = factory.createXMLStreamReader(publishingIn);
    // etc
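
To connect this back to getProgress(): Hadoop expects a value between 0.0 and 1.0 there, so the byte count can be divided by the file length. Below is a rough, self-contained sketch of that idea. It uses an in-memory byte array instead of an HDFS file so it runs without Hadoop, and the names ProgressSketch, CountingInputStream, fraction and parseAll are made up for illustration. Note that the XML parser reads from the stream in buffered chunks, so the reported progress is only approximate and advances in steps.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

class ProgressSketch {

    /** Hypothetical helper: counts the bytes the parser has pulled from the stream. */
    static class CountingInputStream extends FilterInputStream {
        private long bytesRead = 0;

        CountingInputStream(InputStream in) {
            super(in);
        }

        @Override
        public int read() throws IOException {
            int b = super.read();
            if (b != -1) bytesRead++;
            return b;
        }

        // read(byte[]) in FilterInputStream delegates here, so it is covered too
        @Override
        public int read(byte[] buf, int off, int len) throws IOException {
            int count = super.read(buf, off, len);
            if (count > 0) bytesRead += count;
            return count;
        }

        long getBytesRead() {
            return bytesRead;
        }
    }

    /** Fraction in [0, 1]; roughly what getProgress() could return. */
    static float fraction(long bytesRead, long totalBytes) {
        if (totalBytes <= 0) return 0f;
        return Math.min(1.0f, (float) bytesRead / totalBytes);
    }

    /** Parses the whole document and returns the progress fraction at the end. */
    static float parseAll(byte[] xml) throws XMLStreamException {
        CountingInputStream counting = new CountingInputStream(new ByteArrayInputStream(xml));
        XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(counting);
        while (reader.hasNext()) {
            reader.next();
        }
        reader.close();
        return fraction(counting.getBytesRead(), xml.length);
    }

    public static void main(String[] args) throws XMLStreamException {
        byte[] xml = "<records><r>1</r><r>2</r></records>".getBytes(StandardCharsets.UTF_8);
        System.out.println("progress after parsing: " + parseAll(xml));
    }
}
```

In a real RecordReader you would keep the counting stream as a field, wrap the stream returned by fs.open(file) with it, and have getProgress() return fraction(counting.getBytesRead(), fileLength). If the reader handles only one split of the file, the split's start offset and length would be used instead of the whole file length.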