I intend to share data aggregated by one stream with another one to reduce (re)processing time when restarting services or rebuilding aggregates. This stream creates a store which the changelog topic belongs to:
@Bean
fun wirtschaftseinheiten() = Consumer<KStream<String, WirtschaftseinheitAggregat>> {
it.toTable(Materialized.`as`(wirtschaftseinheitTableStoreSupplier))
}
And this is how I join the changelog topic:
fun KStream<String, ProjektEvent>.leftJoin(wirtschaftseinheiten: KTable<String, WirtschaftseinheitAggregat>): KTable<String, ProjektAggregat> =
mapValues { _, v -> ProjektAggregat(projekt = v, projektErstelltAm = v.metaInfo.createdAt) }
.groupByKey()
// take the earliest date which should be from event with ACTION = CREATE_REQUEST
.reduce { prev, next -> if (next.projektErstelltAm?.isAfter(prev.projektErstelltAm) == true) next.copy(projektErstelltAm = prev.projektErstelltAm) else next }
.toStream()
.toTable(Materialized.`as`(preliminaryProjektStoreSupplier))
.leftJoin(
wirtschaftseinheiten,
{ projektAggregat -> projektAggregat.projekt?.projekt?.technischerPlatz?.take(7) },
{ projektAggregat, wirtschaftseinheit ->
if (wirtschaftseinheit != null) {
projektAggregat + wirtschaftseinheit
} else {
logger().error("No wirtschaftseinheit found for $projektAggregat")
projektAggregat
}
},
Materialized.`as`(projektWirtschaftseinheitJoinStoreSupplier)
)
but unfortunately no match will be found as the right side is always null.
If I directly join the topic, then it of course works, but du to migrations I have to rebuild topics which also means consuming the topic declared in wirtschaftseinheitTableStoreSupplier and this is time-consuming.
So therefore a general question: is this a feasible way? If not, is there a better one?
Switching from KTable to GlobalKTable solved the issue
fun KStream<String, ProjektEvent>.leftJoin(wirtschaftseinheiten: GlobalKTable<String, WirtschaftseinheitAggregat>): KTable<String, ProjektAggregat> =
mapValues { _, v -> ProjektAggregat(projekt = v, projektErstelltAm = v.metaInfo.createdAt) }
.groupByKey()
// take the earliest date which should be from event with ACTION = CREATE_REQUEST
.reduce(
{ current, next ->
if (next.projektErstelltAm?.isAfter(current.projektErstelltAm) == true)
next.copy(projektErstelltAm = current.projektErstelltAm)
else
next
},
Materialized.`as`(preliminaryProjektStoreSupplier)
)
.toStream()
.leftJoin(
wirtschaftseinheiten,
{ _, projektAggregat -> projektAggregat.projekt?.projekt?.technischerPlatz?.take(7) },
{ projektAggregat, wirtschaftseinheit ->
if (wirtschaftseinheit != null) {
projektAggregat + wirtschaftseinheit
} else {
logger().error("No wirtschaftseinheit found for $projektAggregat")
projektAggregat
}
},
)
.toTable(Materialized.`as`(projektWirtschaftseinheitJoinStoreSupplier))