Kafka Streams Processor has no access to StateStore null as the store is not connected to the processor


I have the code in A. After I call builtTopology = builder.build(), the call to new org.apache.kafka.streams.TopologyTestDriver(builtTopology, properties) fails with the error in B. I've combed through the code and verified that the state store is properly connected to the Processor. As far as I can tell, that should allow the Processor to store and retrieve data as intended.

  • Builder Configuration: The state store is registered with the builder using builder.addStateStore, and it's explicitly connected to the processor by passing the state store name when calling the process method.
  • ProcessorContext and State Store: During the initialization (init) of the processor, the state store is accessed using the processor context and the state store name, ensuring the processor has access to the store.

What am I missing here?

B.

Caused by: org.apache.kafka.streams.errors.StreamsException: Processor KSTREAM-PROCESSOR-0000000011 has no access to StateStore null as the store is not connected to the processor.

If you add stores manually via '.addStateStore()'
make sure to connect the added store to the processor by providing the processor name to '.addStateStore()'
or connect them via '.connectProcessorAndStateStores()'.

DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()'
to connect the store to the corresponding operator,
or they can provide a StoreBuilder by implementing the stores() method on the Supplier itself.
If you do not add stores manually, please file a bug report at https://issues.apache.org/jira/projects/KAFKA.
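Judging by the messages in this post, the error text includes whatever name was passed to the lookup, so "StateStore null" says more about the argument given to getStateStore(...) than about the wiring itself. The sketch below is a deliberately simplified, hypothetical model of that check. StoreWiringModel, connect and getStateStore are illustrative names, not Kafka's internals, and the real implementation is more involved; the point is only that a null name can never match a connected store, so it is reported exactly like an unconnected one.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of per-processor store wiring.
// It is NOT Kafka's implementation; it only mimics the shape of the error message.
final class StoreWiringModel {
    // processor name -> names of the stores connected to that processor
    private final Map<String, Set<String>> connections = new HashMap<>();

    // Plays the role of passing store names to stream.process(supplier, "store-name")
    void connect(String processorName, String... storeNames) {
        connections.computeIfAbsent(processorName, k -> new HashSet<>())
                   .addAll(Arrays.asList(storeNames));
    }

    // Plays the role of context.getStateStore(name) inside init()
    String getStateStore(String processorName, String storeName) {
        Set<String> stores = connections.getOrDefault(processorName, Collections.emptySet());
        // A null name never matches a connected store, so it fails like an unconnected one
        if (storeName == null || !stores.contains(storeName)) {
            throw new IllegalStateException("Processor " + processorName
                    + " has no access to StateStore " + storeName
                    + " as the store is not connected to the processor.");
        }
        return storeName; // stand-in for the real store object
    }
}
```

For example, after connect("KSTREAM-PROCESSOR-0000000011", "join-store"), a lookup with "join-store" succeeds, while a lookup with null throws a message containing "has no access to StateStore null", the same shape as error B, even though the store really is connected.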

A.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

import java.time.Duration;
import java.util.Properties;

public class StreamProcessingApp {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        StreamsBuilder builder = new StreamsBuilder();
        createTopology(builder);

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    public static void createTopology(StreamsBuilder builder) {
        KStream<String, String> stream1 = builder.stream("input-topic-1");
        KStream<String, String> stream2 = builder.stream("input-topic-2");

        // Define and register the state store
        final String stateStoreName = "join-store";
        builder.addStateStore(
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(stateStoreName),
                Serdes.String(),
                Serdes.String()
            )
        );

        KStream<String, String> joinedStream = stream1.outerJoin(
                stream2,
                (value1, value2) -> {
                    if (value1 == null) {
                        return "null-" + value2;
                    } else if (value2 == null) {
                        return value1 + "-null";
                    }
                    return value1 + "-" + value2;
                },
                JoinWindows.of(Duration.ofMinutes(5)),
                StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
        );

        joinedStream.to("output-topic");

        // Process the joined stream and utilize the state store
        joinedStream.process(() -> new AbstractProcessor<String, String>() {
            private KeyValueStore<String, String> stateStore;

            @Override
            public void init(ProcessorContext context) {
                super.init(context);
                this.stateStore = (KeyValueStore<String, String>) context.getStateStore(stateStoreName);
                context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                    // Check for values that did not join; try-with-resources closes the iterator even if processing throws
                    try (KeyValueIterator<String, String> iterator = this.stateStore.all()) {
                        while (iterator.hasNext()) {
                            var entry = iterator.next();
                            if (entry.value.endsWith("-null") || entry.value.startsWith("null-")) {
                                System.out.println("Unmatched record found: Key=" + entry.key + ", Value=" + entry.value);
                                // Custom handling logic for unmatched records
                            }
                        }
                    }
                });
            }

            @Override
            public void process(String key, String value) {
                // Store each joined record in the state store
                this.stateStore.put(key, value);
            }

            @Override
            public void close() {
                // Cleanup
            }
        }, stateStoreName);  // Connect the state store to the processor
    }
}
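The punctuator's unmatched-record check does not depend on Kafka at all, so one option is to pull it into a pure function that can be unit-tested without a broker or TopologyTestDriver. This is a sketch under the assumption that a plain Map is an acceptable stand-in for iterating KeyValueStore.all(); UnmatchedScan and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: the punctuator's check as pure functions.
// A Map stands in for the entries returned by KeyValueStore.all().
final class UnmatchedScan {
    // Mirrors the ValueJoiner above: "null-" prefix means no left value, "-null" suffix means no right value
    static boolean isUnmatched(String joinedValue) {
        return joinedValue.startsWith("null-") || joinedValue.endsWith("-null");
    }

    // Returns the keys whose joined value looks unmatched, in the map's iteration order
    static List<String> unmatchedKeys(Map<String, String> store) {
        List<String> keys = new ArrayList<>();
        for (Map.Entry<String, String> entry : store.entrySet()) {
            if (isUnmatched(entry.getValue())) {
                keys.add(entry.getKey());
            }
        }
        return keys;
    }
}
```

Inside the punctuator, the if condition would then become UnmatchedScan.isUnmatched(entry.value). Note that this string-marker encoding cannot distinguish a missing side from a real value that happens to end in "-null" or start with "null-".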

UPDATE 2024.08.02 @ 3:22pm

I did some initial research on this issue. Most answers suggest passing the state store name when adding the Processor (stream.process(..., "storeName")). I've definitely done that, and the same error still appears.

Caused by: org.apache.kafka.streams.errors.StreamsException: Processor KSTREAM-TRANSFORM-0000000002 has no access to StateStore my-store as the store is not connected to the processor...

Solution

  • I finally sorted this out. My code had a typo that caused a null store name to be passed to context.getStateStore(stateStoreName).

    Apart from that, the code works as expected.
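One way to make this kind of typo fail loudly is to keep the store name in a single constant and validate it before it reaches getStateStore(...), so the failure points at the bad argument instead of surfacing as "StateStore null". This is a minimal sketch; StoreNames, JOIN_STORE and require are hypothetical names, not part of the Kafka API.

```java
import java.util.Objects;

// Hypothetical helper: one shared constant for the store name plus a fail-fast check.
final class StoreNames {
    // Use this single constant in addStateStore(...), process(..., name) and getStateStore(name)
    static final String JOIN_STORE = "join-store";

    private StoreNames() {}

    // Rejects null or blank names at the call site; returns the name unchanged otherwise
    static String require(String storeName) {
        Objects.requireNonNull(storeName, "state store name must not be null");
        if (storeName.trim().isEmpty()) {
            throw new IllegalArgumentException("state store name must not be blank");
        }
        return storeName;
    }
}
```

In init(), the lookup would then read context.getStateStore(StoreNames.require(stateStoreName)), which throws a NullPointerException naming the problem if the field was never set.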