Tags: apache-kafka, avro, apache-apex

Can I get example code to consume Avro messages from Kafka?


I just set up the DataTorrent RTS (Apache Apex) platform and ran the pi demo. I want to consume Avro messages from Kafka, then aggregate the data and store it in HDFS. Can I get example code for this, or for Kafka?


Solution

  • Here is code for a complete working application that uses the new Kafka input operator and the file output operator from Apex Malhar. It converts the byte arrays to Strings and writes them to HDFS using rolling files with a bounded size (1K in this example); until a file reaches that bound, it has a temporary name with a .tmp extension. You can interpose additional operators between these two, as suggested by DevT in https://stackoverflow.com/a/36666388 (a sketch of an Avro-decoding operator appears after the code):

    package com.example.myapexapp;
    
    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;
    
    import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
    import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
    import org.apache.hadoop.conf.Configuration;
    
    import com.datatorrent.api.annotation.ApplicationAnnotation;
    import com.datatorrent.api.StreamingApplication;
    import com.datatorrent.api.DAG;
    
    import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
    
    @ApplicationAnnotation(name="MyFirstApplication")
    public class KafkaApp implements StreamingApplication
    {
    
      @Override
      public void populateDAG(DAG dag, Configuration conf)
      {
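        // configure Kafka input: one initial partition, topic "test",
        // start from the earliest offset, and connect to the broker below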
        KafkaSinglePortInputOperator in = dag.addOperator("in", new KafkaSinglePortInputOperator());
        in.setInitialPartitionCount(1);
        in.setTopics("test");
        in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
        //in.setClusters("localhost:2181");
        in.setClusters("localhost:9092");   // NOTE: need broker address, not zookeeper
    
        LineOutputOperator out = dag.addOperator("out", new LineOutputOperator());
        out.setFilePath("/tmp/FromKafka");
        out.setFileName("test");
        out.setMaxLength(1024);        // max size of rolling output file
    
        // create stream connecting input adapter to output adapter
        dag.addStream("data", in.outputPort, out.input);
      }
    }
    
    /**
     * Converts each tuple to a string and writes it as a new line to the output file
     */
    class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
    {
      private static final String NL = System.lineSeparator();
      private static final Charset CS = StandardCharsets.UTF_8;
      private String fileName;
    
      @Override
      public byte[] getBytesForTuple(byte[] t) { return (new String(t, CS) + NL).getBytes(CS); }
    
      @Override
      protected String getFileName(byte[] tuple) { return fileName; }
    
      public String getFileName() { return fileName; }
      public void setFileName(final String v) { fileName = v; }
    }
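
  • Since the question asks specifically about Avro, here is a minimal sketch of a decoding operator that could be interposed between the Kafka input and file output operators above. Treat it as an illustration rather than tested code: the operator name (AvroBytesDecoder) and its schemaString property are invented for this example, and it assumes the Kafka payloads are plain Avro binary written with a known schema (no schema-registry framing). It uses Avro's GenericDatumReader to decode each byte array and emits the record's JSON form as UTF-8 bytes, so it plugs directly into the LineOutputOperator above:

    package com.example.myapexapp;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DecoderFactory;

    import com.datatorrent.api.Context.OperatorContext;
    import com.datatorrent.api.DefaultInputPort;
    import com.datatorrent.api.DefaultOutputPort;
    import com.datatorrent.common.util.BaseOperator;

    /**
     * Hypothetical example: decodes Avro-encoded byte arrays into GenericRecords
     * and emits each record's JSON form as UTF-8 bytes. Assumes plain Avro binary
     * payloads and a writer schema supplied via configuration.
     */
    public class AvroBytesDecoder extends BaseOperator
    {
      private String schemaString;              // writer schema as JSON text
      private transient GenericDatumReader<GenericRecord> reader;
      private transient BinaryDecoder decoder;  // reused across tuples

      public final transient DefaultOutputPort<byte[]> output = new DefaultOutputPort<>();

      public final transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>()
      {
        @Override
        public void process(byte[] message)
        {
          try {
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            GenericRecord record = reader.read(null, decoder);
            output.emit(record.toString().getBytes(StandardCharsets.UTF_8));  // toString() yields JSON
          } catch (IOException e) {
            throw new RuntimeException("failed to decode Avro message", e);
          }
        }
      };

      @Override
      public void setup(OperatorContext context)
      {
        reader = new GenericDatumReader<>(new Schema.Parser().parse(schemaString));
      }

      public String getSchemaString() { return schemaString; }
      public void setSchemaString(String v) { schemaString = v; }
    }

    To wire it in, replace the single stream in populateDAG with two, for example:

        AvroBytesDecoder decode = dag.addOperator("decode", new AvroBytesDecoder());
        decode.setSchemaString("...");   // your record's Avro schema as JSON
        dag.addStream("raw", in.outputPort, decode.input);
        dag.addStream("lines", decode.output, out.input);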