Tags: json, apache-kafka, apache-storm, spout

JSON Kafka spout in Apache Storm


I am building a Storm topology with a Kafka spout. I am consuming JSON messages from Kafka (without Zookeeper), and Storm should output them.
How can I define the proper schema for the JSON data? Currently I have the following basic spout implementation:

import org.apache.storm.{Config, LocalCluster}
import org.apache.storm.kafka.spout.{KafkaSpout, KafkaSpoutConfig}
import org.apache.storm.topology.TopologyBuilder

val cluster = new LocalCluster()
val bootstrapServers = "localhost:9092"
val topologyBuilder = new TopologyBuilder()

// Spout config for topic "test", using the defaults for deserialization and tuple construction
val spoutConfig = KafkaSpoutConfig.builder(bootstrapServers, "test").build()

topologyBuilder.setSpout("kafka_spout", new KafkaSpout(spoutConfig), 1)

val config = new Config()
cluster.submitTopology("kafkaTest", config, topologyBuilder.createTopology())

cluster.shutdown()

I am new to Apache Storm, so I would be glad for any advice.


Solution

You can do a couple of things:

  • You can define a RecordTranslator. This interface allows you to define how the spout will construct the tuple from the ConsumerRecord it has read from Kafka.

    The default implementation, DefaultRecordTranslator, looks like this:

        import java.util.List;

        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.storm.kafka.spout.RecordTranslator;
        import org.apache.storm.tuple.Fields;
        import org.apache.storm.tuple.Values;

        public class DefaultRecordTranslator<K, V> implements RecordTranslator<K, V> {
            public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");

            // Build one tuple per record: topic, partition, offset, key and value.
            @Override
            public List<Object> apply(ConsumerRecord<K, V> record) {
                return new Values(record.topic(),
                        record.partition(),
                        record.offset(),
                        record.key(),
                        record.value());
            }

            // The same fields are declared for every output stream.
            @Override
            public Fields getFieldsFor(String stream) {
                return FIELDS;
            }
        }

    As you can see, you are given a ConsumerRecord, a type from the underlying Kafka client library, and have to turn it into a List<Object> that becomes the tuple values. This is the place to do anything more involved with the record before it is emitted; for example, you could stuff the key, value and offset into a single data structure and emit that instead. You plug the translator in with KafkaSpoutConfig.builder(bootstrapServers, "test").setRecordTranslator(myTranslator).build().
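
    For example, a minimal sketch of a translator that emits just the record value as a single "json" field could look like the following (the class name JsonValueTranslator and the field name "json" are only illustrative, not part of Storm's API):

        import java.util.List;

        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.storm.kafka.spout.RecordTranslator;
        import org.apache.storm.tuple.Fields;
        import org.apache.storm.tuple.Values;

        // Illustrative translator: emits one field, "json", containing the raw record value.
        public class JsonValueTranslator implements RecordTranslator<String, String> {
            private static final Fields FIELDS = new Fields("json");

            @Override
            public List<Object> apply(ConsumerRecord<String, String> record) {
                return new Values(record.value());
            }

            @Override
            public Fields getFieldsFor(String stream) {
                return FIELDS;
            }
        }

    It would then be wired in with setRecordTranslator(new JsonValueTranslator()) on the builder, as shown above.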

  • A better alternative, if you only want to deserialize the key or value into one of your own data classes, is to implement a Deserializer. This lets you define how the key/value bytes read from Kafka are turned into your own type, e.g. deserializing the value into a data class of yours.

    The default StringDeserializer does this:

        @Override
        public String deserialize(String topic, byte[] data) {
            try {
                if (data == null)
                    return null;
                else
                    return new String(data, encoding);
            } catch (UnsupportedEncodingException e) {
                throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
            }
        }
    

    Once you've created your own Deserializer, you use it by doing something like KafkaSpoutConfig.builder(bootstrapServers, "test").setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, YourDeserializer.class).build(). There's a similar consumer property for setting the value deserializer.
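
    For instance, assuming the JSON values should map to a hypothetical POJO called MyData and that a JSON library such as Jackson is available on the classpath (neither is part of Storm or Kafka, so treat this purely as a sketch), a value deserializer could look roughly like this:

        import java.io.IOException;
        import java.util.Map;

        import com.fasterxml.jackson.databind.ObjectMapper;
        import org.apache.kafka.common.errors.SerializationException;
        import org.apache.kafka.common.serialization.Deserializer;

        // Illustrative deserializer: turns raw JSON bytes from Kafka into a MyData instance.
        public class MyDataDeserializer implements Deserializer<MyData> {
            private final ObjectMapper mapper = new ObjectMapper();

            @Override
            public void configure(Map<String, ?> configs, boolean isKey) {
                // No configuration needed for this sketch.
            }

            @Override
            public MyData deserialize(String topic, byte[] data) {
                if (data == null) {
                    return null;
                }
                try {
                    return mapper.readValue(data, MyData.class);
                } catch (IOException e) {
                    throw new SerializationException("Error deserializing JSON value for topic " + topic, e);
                }
            }

            @Override
            public void close() {
                // Nothing to release.
            }
        }

    You would then register it for the value side with something like KafkaSpoutConfig.builder(bootstrapServers, "test").setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyDataDeserializer.class).build(), and the tuples emitted by the spout would carry MyData objects in their "value" field.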