I am facing below exception while running an application running with kafka streams.
'handleRemaining' is not implemented by this handler#012org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.ethoca.bouncer.service.IssuerInputConsumer.onMessage(org.apache.avro.specific.SpecificRecord,byte[],int)' threw exception; nested exception is java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.; nested exception is java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.#012#011at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2695)#012#011at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2665)#012#011at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2625)#012#011at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2552)#012#011at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2433)#012#011at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2311)#012#011at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1982)#012#011at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1366)#012#011at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1357)#012#011at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252)#012#011at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)#012#011at java.util.concurrent.FutureTask.run(FutureTask.java:266)#012#011at java.lang.Thread.run(Thread.java:750)#012#011Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace#012#011#011at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363)#012#011#011at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92)#012#011#011at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53)#012#011#011at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645)#012Caused by: java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.#012#011at org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:373)#012#011at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1600)
This issue is only seen when below lines of code are added:
KStream<String , RuleConfig> rulconfigstream =
builder.stream(ruletopic,Consumed.with(stringSerde, ruleConfigSerde));
rulconfigstream.groupBy((key, value) -> value.getScheme(),
Grouped.with(Serdes.String(),ruleConfigSerde))
.aggregate(ArrayList::new,
(key,value,list) ->
{
list.removeIf(p->p.getOrder().equals(value.getOrder()));
list.add(value);
return list.stream().sorted(Comparator.comparing(RuleConfig :: getOrder))
.collect(Collectors.toList());
},
Materialized.<String, List<RuleConfig>, KeyValueStore<Bytes,byte[]>> as
(RULE_STORE)/* table/store name */
.withKeySerde(stringSerde).withValueSerde(listSerde)
);
The other part of the code contains creating Global K tables. Not sure if adding a Ktable (created above) and Global ktables in a topology is creating the issue. Can someone please provide some insights here.
I am using kafka.streams.version: 3.1.2
I am expecting topology to have this store named "RULE_STORE" and for Kafka streams to start without any exceptions.
This issue is fixed. I had some null values in "ruletopic". Adding filter while creating kstream fixed this issue. Below is updated code: KStream<String , RuleConfig> rulconfigstream = builder.stream(ruletopic,Consumed.with(stringSerde, ruleConfigSerde)).filter((k,v)-> Objects.nonNull(k) && Objects.nonNull(v));