apache-kafka, apache-kafka-streams, spring-cloud-stream, spring-cloud-stream-binder-kafka

Can a changelog topic be reused?


I intend to share data aggregated by one stream with another stream in order to reduce (re)processing time when restarting services or rebuilding aggregates. The following stream creates the store to which the changelog topic belongs:

    @Bean
    fun wirtschaftseinheiten() = Consumer<KStream<String, WirtschaftseinheitAggregat>> {
        it.toTable(Materialized.`as`(wirtschaftseinheitTableStoreSupplier))
    }
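
The joining service then has to receive this changelog topic (named <application.id>-<store name>-changelog by default) as a regular KTable input. Below is a minimal sketch of that wiring, assuming the Spring Cloud Stream Kafka Streams binder accepts a two-input BiConsumer and that the second input binding's destination is set to the changelog topic; the bean name and shape are placeholders, not my actual configuration:

    import java.util.function.BiConsumer
    import org.apache.kafka.streams.kstream.KStream
    import org.apache.kafka.streams.kstream.KTable
    import org.springframework.context.annotation.Bean

    // Sketch only: the destination of the second input binding would be the
    // store's changelog topic; the leftJoin extension shown next does the join.
    @Bean
    fun projekte() =
        BiConsumer<KStream<String, ProjektEvent>, KTable<String, WirtschaftseinheitAggregat>> { events, wirtschaftseinheiten ->
            events.leftJoin(wirtschaftseinheiten)
        }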

And this is how I join the changelog topic:

    fun KStream<String, ProjektEvent>.leftJoin(wirtschaftseinheiten: KTable<String, WirtschaftseinheitAggregat>): KTable<String, ProjektAggregat> =
        mapValues { _, v -> ProjektAggregat(projekt = v, projektErstelltAm = v.metaInfo.createdAt) }
            .groupByKey()
            // take the earliest date which should be from event with ACTION = CREATE_REQUEST
            .reduce { prev, next -> if (next.projektErstelltAm?.isAfter(prev.projektErstelltAm) == true) next.copy(projektErstelltAm = prev.projektErstelltAm) else next }
            .toStream()
            .toTable(Materialized.`as`(preliminaryProjektStoreSupplier))
            .leftJoin(
                wirtschaftseinheiten,
                { projektAggregat -> projektAggregat.projekt?.projekt?.technischerPlatz?.take(7) },
                { projektAggregat, wirtschaftseinheit ->
                    if (wirtschaftseinheit != null) {
                        projektAggregat + wirtschaftseinheit
                    } else {
                        logger().error("No wirtschaftseinheit found for $projektAggregat")
                        projektAggregat
                    }
                },
                Materialized.`as`(projektWirtschaftseinheitJoinStoreSupplier)
            )

Unfortunately, no match is ever found: the right side of the join is always null.

If I join the topic directly, it of course works. But due to migrations I have to rebuild topics, which also means re-consuming the topic declared in wirtschaftseinheitTableStoreSupplier, and that is time-consuming.

Hence a general question: is this a feasible approach at all? If not, is there a better one?


Solution

  • Switching from KTable to GlobalKTable solved the issue:

    fun KStream<String, ProjektEvent>.leftJoin(wirtschaftseinheiten: GlobalKTable<String, WirtschaftseinheitAggregat>): KTable<String, ProjektAggregat> =
            mapValues { _, v -> ProjektAggregat(projekt = v, projektErstelltAm = v.metaInfo.createdAt) }
                .groupByKey()
                // take the earliest date which should be from event with ACTION = CREATE_REQUEST
                .reduce(
                    { current, next ->
                        if (next.projektErstelltAm?.isAfter(current.projektErstelltAm) == true)
                            next.copy(projektErstelltAm = current.projektErstelltAm)
                        else
                            next
                    },
                    Materialized.`as`(preliminaryProjektStoreSupplier)
                )
                .toStream()
                .leftJoin(
                    wirtschaftseinheiten,
                    { _, projektAggregat -> projektAggregat.projekt?.projekt?.technischerPlatz?.take(7) },
                    { projektAggregat, wirtschaftseinheit ->
                        if (wirtschaftseinheit != null) {
                            projektAggregat + wirtschaftseinheit
                        } else {
                            logger().error("No wirtschaftseinheit found for $projektAggregat")
                            projektAggregat
                        }
                    },
                )
                .toTable(Materialized.`as`(projektWirtschaftseinheitJoinStoreSupplier))
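
For completeness, a sketch of how the GlobalKTable input could be wired, again assuming a two-input BiConsumer binding with placeholder names rather than the actual configuration. Unlike a KTable input, a GlobalKTable is bootstrapped in full on every application instance, so the KStream-GlobalKTable join performs local lookups and does not require the two topics to be co-partitioned:

    import java.util.function.BiConsumer
    import org.apache.kafka.streams.kstream.GlobalKTable
    import org.apache.kafka.streams.kstream.KStream
    import org.springframework.context.annotation.Bean

    // Sketch only: the second input is now bound as a GlobalKTable, which the
    // binder materializes in full on each instance before the join runs.
    @Bean
    fun projekte() =
        BiConsumer<KStream<String, ProjektEvent>, GlobalKTable<String, WirtschaftseinheitAggregat>> { events, wirtschaftseinheiten ->
            events.leftJoin(wirtschaftseinheiten)
        }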