Search code examples
hadoopjmsmessage-queueapache-stormavro

Storm-JMS spout collecting Avro messages and sending them downstream?


I am new to Avro Format. I am trying to collect Avro messages from a JMS Queue using Storm-Jms spout and send them to hdfs using hdfs bolt.

The queue is sending Avro messages, but I am not able to receive them in Avro format using the HDFS bolt.

How do I properly collect the Avro messages and send them downstream to HDFS without encoding errors?


Solution

  • The existing HDFS bolt does not support writing Avro files, so we need to overcome this by making the following changes. In this sample code I am getting JMS messages from my spout, converting those JMS bytes messages to Avro, and emitting them to HDFS.

    This code can serve as a sample for modifying the methods in AbstractHdfsBolt.

    /**
     * Reads the body of the pending JMS BytesMessage, decodes it as an Avro
     * record with the schema loaded in doPrepare(), appends it to the open
     * Avro container file, and applies the bolt's sync/rotation policies.
     *
     * On success the tuple is acked; on any I/O or JMS failure it is failed
     * so the spout can replay it (at-least-once semantics).
     */
    public void execute(Tuple tuple) {
        try {
            // Copy the raw JMS bytes-message body into a byte array.
            // NOTE(review): bytesMessage is a field set elsewhere — confirm it
            // corresponds to this tuple.
            long length = bytesMessage.getBodyLength();
            byte[] bytes = new byte[(int) length];
            bytesMessage.readBytes(bytes);

            // Decode the payload as an Avro record. (The reader could be
            // created once in doPrepare(); kept here to match the original
            // field assignments.)
            datumReader = new SpecificDatumReader<IndexedRecord>(schema);
            decoder = DecoderFactory.get().binaryDecoder(bytes, null);
            result = datumReader.read(null, decoder);

            synchronized (this.writeLock) {
                dataFileWriter.append(result);
                dataFileWriter.sync();
                this.offset += bytes.length;
                if (this.syncPolicy.mark(tuple, this.offset)) {
                    // Flush client-side buffers before forcing data to the
                    // datanodes; the original flushed after hsync(), which
                    // left buffered bytes unsynced.
                    this.out.flush();
                    if (this.out instanceof HdfsDataOutputStream) {
                        ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
                    } else {
                        this.out.hsync();
                    }
                    this.syncPolicy.reset();
                }
                dataFileWriter.flush();
            }

            if (this.rotationPolicy.mark(tuple, this.offset)) {
                rotateOutputFile(); // synchronized
                this.offset = 0;
                this.rotationPolicy.reset();
            }

            // Ack on success — without this every tuple times out and is
            // replayed by the spout.
            this.collector.ack(tuple);
        } catch (IOException | JMSException e) {
            LOG.warn("write/sync failed.", e);
            this.collector.fail(tuple);
        }
    }
    
    /**
     * Closes the current output file.
     *
     * Closing the DataFileWriter (rather than only the raw stream) flushes
     * any buffered Avro blocks and writes the container footer; it also
     * closes the underlying HDFS stream it wraps. Closing only {@code out},
     * as the original did, truncates the Avro file and corrupts it.
     */
    @Override
    void closeOutputFile() throws IOException {
        dataFileWriter.close(); // flushes, writes footer, closes this.out
    }
    
    /**
     * Creates the next rotation file on HDFS and opens a fresh Avro
     * container on it using the schema loaded in doPrepare().
     *
     * @return the path of the newly created output file
     * @throws IOException if the file cannot be created or the Avro header
     *     cannot be written
     */
    @Override
    Path createOutputFile() throws IOException {
        final String fileName =
                this.fileNameFormat.getName(this.rotation, System.currentTimeMillis());
        final Path filePath = new Path(this.fileNameFormat.getPath(), fileName);
        this.out = this.fs.create(filePath);
        // Write the Avro container header (schema + sync marker) to the new file.
        dataFileWriter.create(schema, out);
        return filePath;
    }
    
    /**
     * One-time bolt setup: loads the Avro schema, connects to HDFS, and
     * constructs the Avro writer chain used by execute()/createOutputFile().
     *
     * Schema-parse failures are propagated (the method already declares
     * {@code throws IOException}) instead of being swallowed with
     * printStackTrace(), which previously left {@code schema} null and
     * caused NullPointerExceptions later in execute().
     *
     * @throws IOException if the schema file cannot be parsed or the HDFS
     *     filesystem cannot be reached
     */
    @Override
    void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
        LOG.info("Preparing HDFS Bolt...");
        // TODO(review): make the schema path configurable instead of hard-coded.
        schema = new Schema.Parser().parse(new File("/home/*******/********SchemafileName.avsc"));
        this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
        datumWriter = new SpecificDatumWriter<IndexedRecord>(schema);
        dataFileWriter = new DataFileWriter<IndexedRecord>(datumWriter);
    }