spring-boot, apache-kafka-streams

KafkaStreams is not running. State is ERROR


I am getting the exception below while running an application that uses Kafka Streams.

'handleRemaining' is not implemented by this handler
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.ethoca.bouncer.service.IssuerInputConsumer.onMessage(org.apache.avro.specific.SpecificRecord,byte[],int)' threw exception; nested exception is java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.; nested exception is java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2695)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2665)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2625)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2552)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2433)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2311)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1982)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1366)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1357)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:750)
    Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363)
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92)
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645)
Caused by: java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
    at org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:373)
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1600)

The issue appears only when the following lines of code are added:

KStream<String, RuleConfig> rulconfigstream =
        builder.stream(ruletopic, Consumed.with(stringSerde, ruleConfigSerde));

rulconfigstream.groupBy((key, value) -> value.getScheme(),
        Grouped.with(Serdes.String(), ruleConfigSerde))
        .aggregate(ArrayList::new,
                (key, value, list) -> {
                    list.removeIf(p -> p.getOrder().equals(value.getOrder()));
                    list.add(value);
                    return list.stream()
                            .sorted(Comparator.comparing(RuleConfig::getOrder))
                            .collect(Collectors.toList());
                },
                Materialized.<String, List<RuleConfig>, KeyValueStore<Bytes, byte[]>>as(RULE_STORE) /* table/store name */
                        .withKeySerde(stringSerde)
                        .withValueSerde(listSerde)
        );
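To reason about the aggregator separately from Kafka, its body can be pulled out into a plain function and exercised directly. This is only a minimal sketch: `RuleConfig` here is a hypothetical stand-in with just a scheme and an order (the real class is not shown in the question), and `AggregatorSketch.upsert` is an illustrative name, not part of any library.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for the real RuleConfig; only the fields used here.
class RuleConfig {
    private final String scheme;
    private final Integer order;

    RuleConfig(String scheme, Integer order) {
        this.scheme = scheme;
        this.order = order;
    }

    String getScheme() { return scheme; }
    Integer getOrder() { return order; }
}

class AggregatorSketch {
    // Mirrors the aggregate(...) lambda from the question: drop any entry
    // with the same order, add the new value, return the list sorted by order.
    // Note that value.getOrder() throws NullPointerException if value is null.
    static List<RuleConfig> upsert(List<RuleConfig> list, RuleConfig value) {
        list.removeIf(p -> p.getOrder().equals(value.getOrder()));
        list.add(value);
        return list.stream()
                .sorted(Comparator.comparing(RuleConfig::getOrder))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<RuleConfig> rules = new ArrayList<>();
        rules = upsert(rules, new RuleConfig("visa", 2));
        rules = upsert(rules, new RuleConfig("visa", 1));
        // Same order as the first record, so it replaces that entry.
        rules = upsert(rules, new RuleConfig("visa", 2));
        for (RuleConfig r : rules) {
            System.out.println(r.getScheme() + " order=" + r.getOrder());
        }
    }
}
```

Both the key selector and this aggregator dereference `value`, so neither tolerates a record whose value is null.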

The rest of the code creates GlobalKTables. I am not sure whether combining a KTable (created above) with GlobalKTables in the same topology is causing the issue. Can someone please provide some insight?

I am using kafka.streams.version: 3.1.2

I expect the topology to include the store named "RULE_STORE" and Kafka Streams to start without any exceptions.


Solution

  • This issue is fixed. There were some null values in "ruletopic"; adding a filter when creating the KStream resolved it. Updated code:

    KStream<String, RuleConfig> rulconfigstream =
            builder.stream(ruletopic, Consumed.with(stringSerde, ruleConfigSerde))
                    .filter((k, v) -> Objects.nonNull(k) && Objects.nonNull(v));
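A likely explanation, stated as an assumption because the question does not include the root-cause exception: the `groupBy` key selector calls `value.getScheme()`, which throws a `NullPointerException` for a record with a null value, and an uncaught exception in a stream thread can leave the client in the ERROR state. The plain-Java sketch below reproduces only that mechanism, without Kafka; `RuleConfig` and `NullFilterSketch` are hypothetical stand-ins, not the original classes.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical stand-in for the real RuleConfig; only the fields used here.
class RuleConfig {
    private final String scheme;
    private final Integer order;

    RuleConfig(String scheme, Integer order) {
        this.scheme = scheme;
        this.order = order;
    }

    String getScheme() { return scheme; }
    Integer getOrder() { return order; }
}

class NullFilterSketch {
    // Same predicate as the filter(...) in the fix above.
    static boolean keep(String key, RuleConfig value) {
        return Objects.nonNull(key) && Objects.nonNull(value);
    }

    // Mirrors the groupBy key selector; throws NullPointerException
    // when value is null.
    static String selectKey(String key, RuleConfig value) {
        return value.getScheme();
    }

    // Applies the filter first, then the key selector, as in the fixed code.
    static List<String> schemes(List<Map.Entry<String, RuleConfig>> records) {
        return records.stream()
                .filter(r -> keep(r.getKey(), r.getValue()))
                .map(r -> selectKey(r.getKey(), r.getValue()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, RuleConfig>> records = new ArrayList<>();
        records.add(new SimpleEntry<String, RuleConfig>("a", new RuleConfig("visa", 1)));
        records.add(new SimpleEntry<String, RuleConfig>("b", null)); // null value
        records.add(new SimpleEntry<String, RuleConfig>("c", new RuleConfig("amex", 2)));
        System.out.println(schemes(records)); // prints [visa, amex]
    }
}
```

One design consideration: the filter silently drops null-valued records, so if those records are meant as deletes, they will no longer reach the aggregation.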