Search code examples
javaapache-kafkaapache-stormapache-kafka-connect

How to I parse tuple value as an Person Object?


I have implemented a Logger Bolt in storm, the input of the tuple is coming from Kafka Topic. I am using Kafka Connect to listen to changes to mySQL database.

public class LoggerBolt extends BaseBasicBolt {

  private static final long serialVersionUID = 1L;
  private static final Logger LOG = Logger.getLogger(LoggerBolt.class);


  public void execute(Tuple input, BasicOutputCollector collector) {
     System.out.println(input.getValue(0));
  }

  public void declareOutputFields(OutputFieldsDeclarer declarer) {
  }
}

When run on local cluster below gets printed.

Q�%Buckley, Rose RoseBuckley"BuckleyR@univ.edu"963.555.6855x5018963.777.5233Curator Q� Stanton, Kathie KathieStanton"StantonK@univ.edu963.555.7095963.777.1015Professor Q�Banks, Shannon Shannon BanksBanksS@univ.edu963.555.7198963.777.6979Professor Q�/Barnes, Cleo CleoBarnes BarnesC@univ.edu"963.555.7463x7335963.777.1583$Research Professor

I want to cast these details to Person Object, which is a model class? How do we parse the Tuple input into an object?

I tried input.getValues(0) , input.getFields(0) and other method, none seems to work.


Solution

  • If you're using storm-kafka-client, it assumes Strings by default. You can choose something else by doing e.g. kafkaSpoutConfig.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);. The class you set just has to implement the Kafka Deserializer interface https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Deserializer.html.

    There's an equivalent setting for setting the key deserializer.