java  apache-kafka  apache-kafka-streams  kafka-producer-api

How to use a GlobalKTable and a StateStore on the same topic?


Just to clarify, I'm new to Kafka, so sorry if my question seems poorly researched. I am reading tutorials, docs, and everything I can to understand this.

I am trying to read all values from a GlobalStore in order to update them, and then use the StateStore that already exists to put the updated values.

I'm trying to do this because when I call:

this.stateStore.all();

I only get 1/10th of the data. If I understood correctly, this is because I have 10 partitions and the state store only reads one of them (although I don't exactly understand why).
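
The behavior described above can be illustrated with a small in-memory model. This is only a sketch in plain Java, not Kafka code: the class `PartitionedStoreDemo`, the method `partitionFor`, and the key names are hypothetical, and `String.hashCode()` stands in for Kafka's real partitioner. It assumes one task per partition, each with its own shard of the store, which is why `all()` on a regular state store returns only the keys of the partition that task owns, while a global store holds every partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionedStoreDemo {
    // Assumption: 10 partitions, as in the question.
    static final int PARTITIONS = 10;

    // Simplified stand-in for a partitioner: maps each key to exactly one partition.
    static int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), PARTITIONS);
    }

    // Builds one local store per partition; each record lands only in its own partition's store.
    static List<Map<String, String>> buildLocalStores(Map<String, String> topic) {
        List<Map<String, String>> stores = new ArrayList<>();
        for (int p = 0; p < PARTITIONS; p++) {
            stores.add(new HashMap<>());
        }
        topic.forEach((k, v) -> stores.get(partitionFor(k)).put(k, v));
        return stores;
    }

    public static void main(String[] args) {
        Map<String, String> topic = new HashMap<>();
        for (int i = 0; i < 1000; i++) {
            topic.put("key-" + i, "value-" + i);
        }

        List<Map<String, String>> localStores = buildLocalStores(topic);
        // A global store is modeled here as a full copy of the topic.
        Map<String, String> globalStore = new HashMap<>(topic);

        // The task that owns partition 0 sees only its own share of the keys.
        System.out.println("local store of task 0: " + localStores.get(0).size() + " entries");
        System.out.println("global store: " + globalStore.size() + " entries");
    }
}
```

In this model, the sizes of the ten local stores add up to the size of the global store, so any single task sees only a fraction of the data.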

This is my GlobalKTable:

    public StreamsBuilder declareTopology(StreamsBuilder builder) {

        logger.debug("Building topology : input topic ~ {} ; output topics ~ {}, {}",
                getInputTopic(),
                getDataTopic(),
                getToEsTopic());

        builder.globalTable(
                getDataTopic(),
                Consumed.with(Serdes.String(), fooSerdes)
                        .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST),
                Materialized.<String, Foo, KeyValueStore<Bytes, byte[]>>as(
                        "foosktable")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(fooSerdes)
                        .withLoggingEnabled(new HashMap<>()));
    ...

And this is the addStateStore call, which I can't remove because it's used elsewhere in the code:

       ...

       builder.addStateStore(
            Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("foosktable"),
                    Serdes.String(),
                    fooSerdes));
    ...

    return builder;
}

So, in theory, what I was thinking of doing was to remove the StateStore, which also uses the same topic, and put my data using one of my data.process topics. The problem is that this processor does other things with this StateStore, so I can't nuke it.

I'm lost here; any light would help a lot. Thanks!


Solution

  • It's a little unclear what you are actually trying to achieve. However, here are a few high-level explanations:

    A GlobalKTable has only one purpose: to read data from a topic without modification, so that you can either do a KStream-GlobalKTable join or query the store via "interactive queries".

    Hence, you cannot really do what you want, as copying data from a global store to another store is not possible in the way you intend. You will need to duplicate the input topic and read it twice: (1) as a GlobalKTable and (2) as a regular KStream, to modify the data before you put it into a store. For (2) you could use transform().

    Hope this helps.
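
The two reads described above could be wired up roughly as follows. This is a minimal sketch and not the asker's actual topology. It assumes Kafka Streams 3.3 or newer and uses `process()` from the newer Processor API rather than `transform()`. The topic names (`foos`, `foos-copy`), the store names (`foos-global`, `foos-local`), the class `UpdatingProcessor`, and the use of `String` values in place of `Foo` are all hypothetical. It also assumes the duplicated topic is populated separately, and that the global store and the regular state store have different names.

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class TwoReadsTopology {

    // Hypothetical names, chosen for illustration only.
    static final String DATA_TOPIC = "foos";
    static final String DATA_TOPIC_COPY = "foos-copy";
    static final String GLOBAL_STORE = "foos-global";
    static final String LOCAL_STORE = "foos-local";

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        Serde<String> strings = Serdes.String();

        // (1) Read-only view of the whole topic, available for joins and interactive queries.
        builder.globalTable(
                DATA_TOPIC,
                Consumed.with(strings, strings),
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(GLOBAL_STORE));

        // (2) Regular, writable state store, sharded by partition.
        builder.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore(LOCAL_STORE), strings, strings));

        // Read the duplicated topic as a KStream and modify each record before storing it.
        ProcessorSupplier<String, String, Void, Void> supplier = UpdatingProcessor::new;
        builder.stream(DATA_TOPIC_COPY, Consumed.with(strings, strings))
               .process(supplier, LOCAL_STORE);

        return builder.build();
    }

    // Writes a modified copy of each incoming value into the local store.
    static class UpdatingProcessor implements Processor<String, String, Void, Void> {
        private KeyValueStore<String, String> store;

        @Override
        public void init(ProcessorContext<Void, Void> context) {
            store = context.getStateStore(LOCAL_STORE);
        }

        @Override
        public void process(Record<String, String> record) {
            if (record.value() == null) {
                // Treat a null value as a delete.
                store.delete(record.key());
            } else {
                // Placeholder modification; the real update logic goes here.
                store.put(record.key(), record.value().toUpperCase());
            }
        }
    }
}
```

Because the local store is still sharded by partition, each instance continues to see only its own share of keys there; the global store remains the place to look up the full data set.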