Tags: java, apache-kafka, apache-kafka-streams, spring-kafka

Materialize Kafka Stream after a flatMap


I want to consume from two Kafka topics with Kafka Streams, backed by Spring Kafka. The topics have different key and value types. I want to map the key and value of the second topic and merge it into the first one via the method .merge(KStream<X,Y> otherStream).

Here is an example:

    // Block 1
    KStream<MyKey, MyValue> stream2 = streamsBuilder.stream(
        "second-topic",
        consumedAs(OtherKey.class, OtherValue.class, AllowEmpty.NONE) // Provides default json Serde
    ).flatMap(
        (key, value) -> { // key/value are OtherKey/OtherValue here
            List<KeyValue<MyKey, MyValue>> list = new ArrayList<>();
            // Do stuff and fill the list with re-keyed records
            return list;
        });

    // Block 2 ("stream" is the KStream<MyKey, MyValue> consumed from the first topic)
    KStream<MyKey, MyValue>[] branches = stream
        .merge(stream2)
        ... business stuff

With this solution I get a ClassCastException with the reason that MyKey cannot be cast to MyKey. The cause is that the classes are provided by different modules and class loaders. The error happens during serialization, inside the merge block. With transform(...) I got the same behavior. If I append .through("tmp-topic"), everything works fine. It seems that materializing via a topic returns a properly serializable object, unlike the flatMap(...) alone.
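One way to make the through("tmp-topic") step robust is to pass explicit Serdes for the new key/value types via Produced, so serialization does not fall back on the configured defaults. A minimal, compilable sketch of that idea (String serdes and the topic name stand in for the real JSON Serdes of MyKey/MyValue):

```java
import java.util.Collections;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class ThroughAfterFlatMap {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // flatMap changes the key, so the stream is flagged for repartitioning.
        KStream<String, String> rekeyed = builder
            .<String, String>stream("second-topic")
            .flatMap((key, value) ->
                Collections.singletonList(KeyValue.pair(value, key)));

        // Materialize explicitly via through(), with Serdes that match the
        // *new* key/value types, instead of relying on the default Serdes.
        KStream<String, String> materialized = rekeyed.through(
            "tmp-topic",
            Produced.with(Serdes.String(), Serdes.String()));

        // Downstream stateful work no longer needs an internal
        // "...-repartition" topic: through() already redistributed the data.
        materialized.groupByKey().count();

        System.out.println(builder.build().describe());
    }
}
```

As a side note, in later Kafka Streams releases (2.6+) through() was deprecated in favor of repartition(), which manages the intermediate topic for you; on the 2.0.1 version used here, through() is the available option.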

I found the following API doc in groupByKey:

... If a key changing operator was used before this operation (e.g., selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper), or transform(TransformerSupplier, String...)), and no data redistribution happened afterwards (e.g., via through(String)) an internal repartitioning topic will be created in Kafka. This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "XXX" is an internally generated name, and "-repartition" is a fixed suffix. You can retrieve all generated internal topic names via Topology.describe(). For this case, all data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting KGroupedStream is partitioned correctly on its key...

As you can see, the documented best practice after key-changing operations like flatMap(...) seems to be writing the stream to a topic, because of serialization and repartitioning.
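As the quoted Javadoc notes, Topology.describe() makes the generated repartition topic visible without running the application. A small sketch (String types stand in for the real key/value classes):

```java
import java.util.Collections;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;

public class ShowRepartition {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // flatMap is key-changing; grouping afterwards forces Kafka Streams
        // to insert an internal "...-repartition" topic automatically.
        builder.<String, String>stream("second-topic")
            .flatMap((key, value) ->
                Collections.singletonList(KeyValue.pair(value, key)))
            .groupByKey()
            .count();

        // Prints the topology, including the generated internal topic
        // (a name ending in "-repartition"); at runtime it is prefixed
        // with the application id, as the Javadoc describes.
        System.out.println(builder.build().describe());
    }
}
```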

What do you think about using through("topic") to get it to work? Does anyone know whether it is possible to materialize after a flatMap(...) without writing to a topic?

Versions
Spring Kafka Version: 2.2.5.RELEASE
Apache Kafka Client: 2.0.1
Apache Kafka Streams: 2.0.1


Solution

  • Just for some context: whenever you use a key-changing operation, any downstream processor that relies on the new key triggers the creation of a repartition topic. The repartition topic makes sure the new key ends up on the correct partition. I know you probably know this already, but I'm just restating the point here for clarity.

    With that in mind, it's perfectly acceptable to perform a through() operation after you have modified the key because that's what Kafka Streams will do under the covers anyway.

    So having flatMap(...).through(someTopic) works fine.

    Additionally, by doing so, you also prevent the possibility of having multiple repartitions if you re-use the KStream instance with the modified key in other operations (joins, aggregations) downstream.
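    To illustrate that last point, here is a sketch (hypothetical topic names, String serdes as stand-ins) where the stream materialized via through() is reused by both an aggregation and a join, and neither triggers its own internal repartition topic:

```java
import java.util.Collections;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class ReuseAfterThrough {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> table = builder.table("lookup-topic");

        KStream<String, String> rekeyed = builder
            .<String, String>stream("second-topic")
            .flatMap((key, value) ->
                Collections.singletonList(KeyValue.pair(value, key)))
            .through("rekeyed-topic",
                Produced.with(Serdes.String(), Serdes.String()));

        // Both downstream operations read the already-repartitioned data,
        // so neither creates its own internal repartition topic.
        rekeyed.groupByKey().count();
        rekeyed.join(table, (streamValue, tableValue) -> streamValue + tableValue);

        System.out.println(builder.build().describe());
    }
}
```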

    HTH,

    Bill