Search code examples
apache-kafkaapache-kafka-streams

Kafka Streams stops if I use persistentKeyValueStore but works fine with inMemoryKeyValueStore (running in Docker container)


I'm obviously a beginner with kafka/kafka streams. I just need to read given messages from a few topics, given their id. While our actual topology is fairly complex, this Stream app just needs to achieve this single simple goal

This is how a store is created :

final StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.table(
        topic,
        Materialized.<String, String>as( persistentKeyValueStore(storeNameOf(topic)))
                    .withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
                    .withCachingDisabled());
//      Materialized.<String, String>as( inMemoryKeyValueStore(storeNameOf(topic)))
//                  .withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
//                  .withCachingDisabled());
);
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), new Properties() {{ /** config items go here**/ }})
kafkaStreams.start();
//logic for awaiting kafkaStreams to reach `RUNNING` state as well as InvalidStateStoreException handling (by retrying) is ommited for simplicity :   
ReadOnlyKeyValueStore<String, String> replyStore = kafkaStreams.store(storeNameOf(topicName), QueryableStoreTypes.keyValueStore());

So, when using the commented inMemoryKeyValueStore materialization replyStore is sucessfully created and I can query the values within without a problem

With persistentKeyValueStore the last line fails with java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR. Note that I do check that KafkaStreams is in state RUNNING before the store call; the ERROR state is reached somehow within the call rather.

Do you think i might have missed anything when setting up the persistent store? Debugging hints would also greatly help, i'm quite stuck here I must confess Thanks !

Edit : The execution happens under a docker container. This was quite relevant but I ommited to add initialy


Solution

  • As Matthias J. Sax pointed out in comment form, to debug the problem the uncaughtExceptionHandler registration helped greatly .

    The actual issue was due to an incompatibility between RocksDB and the docker image I was using (so changed from openjdk:8-jdk-alpine to anapsix/alpine-java:8 )

    Related : https://issues.apache.org/jira/browse/KAFKA-4988 UnsatisfiedLinkError: /tmp/snappy-1.1.4-libsnappyjava.so Error loading shared library ld-linux-x86-64.so.2: No such file or directory