Hi I am new to Storm and Kafka. I am using storm 1.0.1 and kafka 0.10.0 we have a kafkaspout that would receive java bean from kafka topic. I have spent several hours digging to find the right approach for that. Found few articles which are useful but none of the approaches worked for me so far.
Following is my codes:
StormTopology:
public class StormTopology {
public static void main(String[] args) throws Exception {
//Topo test /zkroot test
if (args.length == 4) {
System.out.println("started");
BrokerHosts hosts = new ZkHosts("localhost:2181");
SpoutConfig kafkaConf1 = new SpoutConfig(hosts, args[1], args[2],
args[3]);
kafkaConf1.zkRoot = args[2];
kafkaConf1.useStartOffsetTimeIfOffsetOutOfRange = true;
kafkaConf1.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
kafkaConf1.scheme = new SchemeAsMultiScheme(new KryoScheme());
KafkaSpout kafkaSpout1 = new KafkaSpout(kafkaConf1);
System.out.println("started");
ShuffleBolt shuffleBolt = new ShuffleBolt(args[1]);
AnalysisBolt analysisBolt = new AnalysisBolt(args[1]);
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("kafkaspout", kafkaSpout1, 1);
//builder.setBolt("counterbolt2", countbolt2, 3).shuffleGrouping("kafkaspout");
//This is for field grouping in bolt we need two bolt for field grouping or it wont work
topologyBuilder.setBolt("shuffleBolt", shuffleBolt, 3).shuffleGrouping("kafkaspout");
topologyBuilder.setBolt("analysisBolt", analysisBolt, 5).fieldsGrouping("shuffleBolt", new Fields("trip"));
Config config = new Config();
config.registerSerialization(VehicleTrip.class, VehicleTripKyroSerializer.class);
config.setDebug(true);
config.setNumWorkers(1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(args[0], config, topologyBuilder.createTopology());
// StormSubmitter.submitTopology(args[0], config,
// builder.createTopology());
} else {
System.out
.println("Insufficent Arguements - topologyName kafkaTopic ZKRoot ID");
}
}
}
I am serializing the data at kafka using kryo
KafkaProducer:
public class StreamKafkaProducer {
private static Producer producer;
private final Properties props = new Properties();
private static final StreamKafkaProducer KAFKA_PRODUCER = new StreamKafkaProducer();
private StreamKafkaProducer(){
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.abc.serializer.MySerializer");
producer = new org.apache.kafka.clients.producer.KafkaProducer(props);
}
public static StreamKafkaProducer getStreamKafkaProducer(){
return KAFKA_PRODUCER;
}
public void produce(String topic, VehicleTrip vehicleTrip){
ProducerRecord<String,VehicleTrip> producerRecord = new ProducerRecord<>(topic,vehicleTrip);
producer.send(producerRecord);
//producer.close();
}
public static void closeProducer(){
producer.close();
}
}
Kyro Serializer:
public class DataKyroSerializer extends Serializer<Data> implements Serializable {
@Override
public void write(Kryo kryo, Output output, VehicleTrip vehicleTrip) {
output.writeLong(data.getStartedOn().getTime());
output.writeLong(data.getEndedOn().getTime());
}
@Override
public Data read(Kryo kryo, Input input, Class<VehicleTrip> aClass) {
Data data = new Data();
data.setStartedOn(new Date(input.readLong()));
data.setEndedOn(new Date(input.readLong()));
return data;
}
I need to get the data back to the Data bean.
As per few articles I need to provide with a custom scheme and make it part of topology but till now I have no luck
Code for Bolt and Scheme
Scheme:
public class KryoScheme implements Scheme {
private ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() {
protected Kryo initialValue() {
Kryo kryo = new Kryo();
kryo.addDefaultSerializer(Data.class, new DataKyroSerializer());
return kryo;
};
};
@Override
public List<Object> deserialize(ByteBuffer ser) {
return Utils.tuple(kryos.get().readObject(new ByteBufferInput(ser.array()), Data.class));
}
@Override
public Fields getOutputFields( ) {
return new Fields( "data" );
}
}
and bolt:
public class AnalysisBolt implements IBasicBolt {
/**
*
*/
private static final long serialVersionUID = 1L;
private String topicname = null;
public AnalysisBolt(String topicname) {
this.topicname = topicname;
}
public void prepare(Map stormConf, TopologyContext topologyContext) {
System.out.println("prepare");
}
public void execute(Tuple input, BasicOutputCollector collector) {
System.out.println("execute");
Fields fields = input.getFields();
try {
JSONObject eventJson = (JSONObject) JSONSerializer.toJSON((String) input
.getValueByField(fields.get(1)));
String StartTime = (String) eventJson.get("startedOn");
String EndTime = (String) eventJson.get("endedOn");
String Oid = (String) eventJson.get("_id");
int V_id = (Integer) eventJson.get("vehicleId");
//call method getEventForVehicleWithinTime(Long vehicleId, Date startTime, Date endTime)
System.out.println("==========="+Oid+"| "+V_id+"| "+StartTime+"| "+EndTime);
} catch (Exception e) {
e.printStackTrace();
}
}
but if I submit the storm topology i am getting error:
java.lang.IllegalStateException: Spout 'kafkaspout' contains a
non-serializable field of type com.abc.topology.KryoScheme$1, which
was instantiated prior to topology creation.
com.minda.iconnect.topology.KryoScheme$1 should be instantiated within
the prepare method of 'kafkaspout at the earliest.
Appreciate help to debug the issue and guide to right path.
Thanks
Your ThreadLocal is not Serializable. The preferable solution would be to make your serializer both Serializable and threadsafe. If this is not possible, then I see 2 alternatives since there is no prepare method as you would get in a bolt.