I am new to Avro Format. I am trying to collect Avro messages from a JMS Queue using Storm-Jms spout and send them to hdfs using hdfs bolt.
Queue is sending avro but i am not able to get them in avro format using the HDFS BOLT.
How to properly collect the avro message and send them downstream without encoding errors in hdfs.
The existing HDFS Bolt does not support Writing avro Files we need to overcome this by making the following changes. In this sample Code i am using the getting JMS Messages from my spout and the converting those JMS bytes message to AVRO and emmiting them to HDFS.
This code can serve as a sample for modifying the methods in AbstractHdfsBolt.
public void execute(Tuple tuple) {
try {
long length = bytesMessage.getBodyLength();
byte[] bytes = new byte[(int)length];
///////////////////////////////////////
bytesMessage.readBytes(bytes);
String replyMessage = new String(bytes, "UTF-8");
datumReader = new SpecificDatumReader<IndexedRecord>(schema);
decoder = DecoderFactory.get().binaryDecoder(bytes, null);
result = datumReader.read(null, decoder);
synchronized (this.writeLock) {
dataFileWriter.append(result);
dataFileWriter.sync();
this.offset += bytes.length;
if (this.syncPolicy.mark(tuple, this.offset)) {
if (this.out instanceof HdfsDataOutputStream) {
((HdfsDataOutputStream) this.out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
} else {
this.out.hsync();
this.out.flush();
}
this.syncPolicy.reset();
}
dataFileWriter.flush();
}
if(this.rotationPolicy.mark(tuple, this.offset)){
rotateOutputFile(); // synchronized
this.offset = 0;
this.rotationPolicy.reset();
}
} catch (IOException | JMSException e) {
LOG.warn("write/sync failed.", e);
this.collector.fail(tuple);
}
}
@Override
void closeOutputFile() throws IOException {
this.out.close();
}
@Override
Path createOutputFile() throws IOException {
Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
this.out = this.fs.create(path);
dataFileWriter.create(schema, out);
return path;
}
@Override
void doPrepare(Map conf, TopologyContext topologyContext,OutputCollector collector) throws IOException {
// TODO Auto-generated method stub
LOG.info("Preparing HDFS Bolt...");
try {
schema = new Schema.Parser().parse(new File("/home/*******/********SchemafileName.avsc"));
} catch (IOException e1) {
e1.printStackTrace();
}
this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
datumWriter = new SpecificDatumWriter<IndexedRecord>(schema);
dataFileWriter = new DataFileWriter<IndexedRecord>(datumWriter);
JMSAvroUtils JASV = new JMSAvroUtils();
}