I have implemented a logger bolt in Storm; its input tuples come from a Kafka topic. I am using Kafka Connect to listen for changes to a MySQL database.
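import org.apache.log4j.Logger; // assumed: log4j 1.x API, which provides getLogger(Class)
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
public class LoggerBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = Logger.getLogger(LoggerBolt.class);
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Print the first field of each incoming tuple.
        System.out.println(input.getValue(0));
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt emits nothing, so no output fields are declared.
    }
}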
When I run it on a local cluster, the following is printed:
Q�%Buckley, Rose RoseBuckley"BuckleyR@univ.edu"963.555.6855x5018963.777.5233Curator Q� Stanton, Kathie KathieStanton"StantonK@univ.edu963.555.7095963.777.1015Professor Q�Banks, Shannon Shannon BanksBanksS@univ.edu963.555.7198963.777.6979Professor Q�/Barnes, Cleo CleoBarnes BarnesC@univ.edu"963.555.7463x7335963.777.1583$Research Professor
I want to convert these details into a Person object, which is a model class. How do I parse the Tuple input into an object?
I tried input.getValues(0), input.getFields(0), and other methods, but none of them seems to work.
If you're using storm-kafka-client, it assumes Strings by default. You can choose something else by setting, e.g., kafkaSpoutConfig.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
The class you set just has to implement the Kafka Deserializer interface: https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Deserializer.html
There's an equivalent setting for the key deserializer.
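To illustrate what such a deserializer might do, here is a minimal sketch of the decoding step only, written in plain Java so it runs without Kafka on the classpath. The Person fields, the PersonDecoder name, and the pipe-delimited UTF-8 wire format are all assumptions made for illustration. The real payload's encoding depends on the converter configured in Kafka Connect, so the decoding logic would need to match that format instead.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical model class; the field names are assumptions, not taken from the question.
final class Person {
    final String name;
    final String email;
    final String phone;
    final String title;

    Person(String name, String email, String phone, String title) {
        this.name = name;
        this.email = email;
        this.phone = phone;
        this.title = title;
    }
}

// Hypothetical helper holding the logic that would go into
// Deserializer<Person>.deserialize(String topic, byte[] data).
final class PersonDecoder {
    // Assumed wire format (for illustration only): UTF-8 text "name|email|phone|title".
    static Person decode(byte[] data) {
        if (data == null) {
            // Kafka's Deserializer contract allows null data; return null rather than throw.
            return null;
        }
        // A limit of -1 keeps trailing empty fields instead of dropping them.
        String[] parts = new String(data, StandardCharsets.UTF_8).split("\\|", -1);
        if (parts.length != 4) {
            throw new IllegalArgumentException("Expected 4 fields but got " + parts.length);
        }
        return new Person(parts[0], parts[1], parts[2], parts[3]);
    }
}
```

In a real topology, a class implementing Kafka's Deserializer<Person> would call this logic from deserialize(String topic, byte[] data) and be registered via VALUE_DESERIALIZER_CLASS_CONFIG. Assuming the default record translator is in use, the bolt could then likely read the object with (Person) input.getValueByField("value") rather than parsing anything itself.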