apache-kafka, apache-storm, kryo, bigdata

Why does the Kryo serialization framework in Apache Storm overwrite data when the bolt gets the values?


Most developers probably use Avro as the serialization framework for their Kafka and Apache Storm schemes. But I need to handle more complex data, so I tried the Kryo serialization framework and successfully integrated it into our project, which runs on Kafka and Apache Storm. However, when I went a step further, I ran into a strange situation.

I sent 5 messages to Kafka, and the Storm job read all 5 messages and deserialized them successfully. But the next bolt gets the wrong values: it prints the same value as the last message for every tuple. So I added a log statement right after the deserialization code completes, and it actually prints 5 different messages. Why doesn't the next bolt get those values? See my code below:

KryoScheme.java

// Imports assume the pre-1.0 Storm (backtype.storm) API used elsewhere in this post.
import java.io.ByteArrayInputStream;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;

import backtype.storm.spout.Scheme;
import backtype.storm.topology.FailedException;
import backtype.storm.tuple.Values;

public abstract class KryoScheme<T> implements Scheme {

    private static final long serialVersionUID = 6923985190833960706L;

    private static final Logger logger = LoggerFactory.getLogger(KryoScheme.class);

    private Class<T> clazz;
    private Serializer<T> serializer;

    public KryoScheme(Class<T> clazz, Serializer<T> serializer) {
        this.clazz = clazz;
        this.serializer = serializer;
    }

    @Override
    public List<Object> deserialize(byte[] buffer) {
        // Kryo is not thread-safe, so a fresh instance is created per call.
        Kryo kryo = new Kryo();
        kryo.register(clazz, serializer);
        T scheme = null;
        try {
            scheme = kryo.readObject(new Input(new ByteArrayInputStream(buffer)), this.clazz);
            logger.info("{}", scheme);
        } catch (Exception e) {
            String errMsg = String.format("Kryo Scheme failed to deserialize data from Kafka to %s. Raw: %s",
                    clazz.getName(),
                    new String(buffer));
            logger.error(errMsg, e);
            throw new FailedException(errMsg, e);
        }

        return new Values(scheme);
    }
}
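
As a side note, this deserialize creates a new Kryo instance on every call, which is relatively expensive. Kryo instances are not thread-safe, so they cannot simply be shared either; a minimal sketch of a per-thread instance (an assumption of mine, not code from this project), reusing the clazz and serializer fields above:

private transient ThreadLocal<Kryo> kryos;

private Kryo kryo() {
    if (kryos == null) {
        // Recreated lazily after Storm ships this Scheme to workers via Java
        // serialization; a race here is benign (at worst one extra Kryo is built).
        kryos = ThreadLocal.withInitial(() -> {
            Kryo k = new Kryo();
            k.register(clazz, serializer);
            return k;
        });
    }
    return kryos.get();
}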

PrintFunction.java

// Imports assume the pre-1.0 Storm (backtype.storm / storm.trident) API used in this post.
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class PrintFunction extends BaseFunction {

    private static final Logger logger = LoggerFactory.getLogger(PrintFunction.class);

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {

        List<Object> data = tuple.getValues();

        if (data != null) {
            logger.info("Scheme data size: {}", data.size());
            for (Object value : data) {
                PrintOut out = (PrintOut) value;
                logger.info("{}.{}--value: {}",
                        Thread.currentThread().getName(),
                        Thread.currentThread().getId(),
                        out.toString());

                collector.emit(new Values(out));
            }
        }
    }
}
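
Since the topology below declares a single output field named "act", the value could equally be fetched by field name instead of iterating getValues(); a one-line alternative:

PrintOut out = (PrintOut) tuple.getValueByField("act");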

StormLocalTopology.java

public class StormLocalTopology {

    public static void main(String[] args) {

        ........

        BrokerHosts zk = new ZkHosts("xxxxxx");
        Config stormConf = new Config();
        stormConf.put(Config.TOPOLOGY_DEBUG, false);
        stormConf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 1000 * 5);
        stormConf.put(Config.TOPOLOGY_WORKERS, 1);
        stormConf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 5);
        stormConf.put(Config.TOPOLOGY_TASKS, 1);

        TridentKafkaConfig actSpoutConf = new TridentKafkaConfig(zk, topic);
        actSpoutConf.fetchSizeBytes = 5 * 1024 * 1024;
        actSpoutConf.bufferSizeBytes = 5 * 1024 * 1024;
        actSpoutConf.scheme = new SchemeAsMultiScheme(scheme);

        actSpoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

        TridentTopology topology = new TridentTopology();
        TransactionalTridentKafkaSpout actSpout = new TransactionalTridentKafkaSpout(actSpoutConf);

        topology.newStream(topic, actSpout).parallelismHint(4).shuffle()
                .each(new Fields("act"), new PrintFunction(), new Fields());

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(topic + "Topology", stormConf, topology.build());
    }
}
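
For context, the producer side has to Kryo-serialize each event into a byte[] before sending it to Kafka. A minimal sketch of that step (how the PrintOut event is built and which Kafka producer API ships the payload are omitted and assumed):

// PrintOut and KryoSerializer are the classes shown elsewhere in this post.
Kryo kryo = new Kryo();
kryo.register(PrintOut.class, new KryoSerializer<PrintOut>(new PrintOut()));

ByteArrayOutputStream bos = new ByteArrayOutputStream();
Output output = new Output(bos);
kryo.writeObject(output, event);     // 'event' is the PrintOut instance to send
output.close();

byte[] payload = bos.toByteArray();  // hand this byte[] to any Kafka producer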

There is also another problem: why can the Kryo scheme only read one message per buffer? Is there another way to get a buffer of multiple messages, so the data can be sent to the next bolt in batches?
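
In principle, Kryo's Input can read several objects back-to-back from one buffer until EOF, so a scheme could drain the whole buffer in a loop. A sketch of such a loop inside deserialize (this only helps if the producer packs several events into a single Kafka message, because the spout calls deserialize once per message):

Kryo kryo = new Kryo();
kryo.register(clazz, serializer);

List<Object> events = new ArrayList<>();
Input input = new Input(new ByteArrayInputStream(buffer));
while (!input.eof()) {                        // stop at the end of the buffer
    events.add(kryo.readObject(input, clazz));
}
input.close();
return new Values(events);                    // the batch travels as one tuple value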

Also, if I send only one message, the full flow seems to succeed.

But sending two messages goes wrong. The printed output looks like this:

56157 [Thread-18-spout0] INFO  s.s.a.s.s.c.KryoScheme - 2016-02-05T17:20:48.122+0800,T6mdfEW@N5pEtNBW
56160 [Thread-20-b-0] INFO  s.s.a.s.s.PrintFunction - Scheme data size: 1
56160 [Thread-18-spout0] INFO  s.s.a.s.s.c.KryoScheme - 2016-02-05T17:20:48.282+0800,T(o2KnFxtGB0Tlp8
56161 [Thread-20-b-0] INFO  s.s.a.s.s.PrintFunction - Thread-20-b-0.99--value: 2016-02-05T17:20:48.282+0800,T(o2KnFxtGB0Tlp8
56162 [Thread-20-b-0] INFO  s.s.a.s.s.PrintFunction - Scheme data size: 1
56162 [Thread-20-b-0] INFO  s.s.a.s.s.PrintFunction - Thread-20-b-0.99--value: 2016-02-05T17:20:48.282+0800,T(o2KnFxtGB0Tlp8

Solution

  • I'm sorry, this was my mistake. I just found a bug in my Kryo serializer class: it kept the deserialized event in a class-scope field, so in a multi-threaded environment each new read overwrote the object that earlier tuples still referenced. After removing the class-scope field, the code runs well.

    Reference code below:

    public class KryoSerializer<T extends BasicEvent> extends Serializer<T> implements Serializable {

        private static final long serialVersionUID = -4684340809824908270L;

        // Wrong: a class-scope field shared between reads. Every emitted tuple
        // ended up referencing this same object, so each new read overwrote the
        // value that earlier tuples still pointed at.
        // private T event;

        public KryoSerializer(T event) {
            // The event is no longer stored; a fresh instance is created per read.
        }

        @Override
        public void write(Kryo kryo, Output output, T event) {
            event.write(output);
        }

        @Override
        public T read(Kryo kryo, Input input, Class<T> type) {
            // Create a new instance for every read instead of reusing a shared
            // field; kryo.newInstance(type) requires a no-arg constructor on T.
            T event = kryo.newInstance(type);
            event.read(input);
            return event;
        }
    }
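
    For completeness, this is roughly how the fixed serializer plugs into the KryoScheme from the question (the PrintOut type and the "act" field name are taken from the code above; KryoScheme stays abstract, so an anonymous subclass supplies getOutputFields):

    Scheme scheme = new KryoScheme<PrintOut>(PrintOut.class,
            new KryoSerializer<PrintOut>(new PrintOut())) {
        @Override
        public Fields getOutputFields() {
            // the field name the topology binds in newStream(...).each(...)
            return new Fields("act");
        }
    };
    actSpoutConf.scheme = new SchemeAsMultiScheme(scheme);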