The durchfuehrungen
are in a few cases null
even if the records are correct and the cogrouping with them works elsewhere.
In the history of the resulting affected ProjektAggregat
records no single event contains the depending durchfuehrungen
, they’re always null and couldn’t figure it out why.
Function { projekte: KStream<String, ProjektEvent> ->
Function { projektstatus: KStream<String, ProjektStatusEvent> ->
Function { befunde: KStream<String, ProjektBefundAggregat> ->
Function { aufgaben: KStream<String, ProjektAufgabeAggregat> ->
Function { wirtschaftseinheiten: KTable<String, WirtschaftseinheitAggregat> ->
Function { durchfuehrungen: KStream<String, ProjektDurchfuehrungAggregat> ->
Function { gruppen: KStream<String, ProjektGruppeAggregat> ->
Function { mietobjekte: KTable<String, MietobjektAggregat> ->
projekte
.leftJoin(wirtschaftseinheiten)
.leftJoin(mietobjekte)
.cogroup { _, base, current: ProjektAggregat ->
current.copy(
projekt = base.projekt,
wirtschaftseinheit = base.wirtschaftseinheit,
mietobjekt = base.mietobjekt,
projektErstelltAm = base.projektErstelltAm
)
}
.cogroup(projektstatus.groupByKey()) { _, projektstatusEvent, aggregat -> aggregat + projektstatusEvent }
.cogroup(befunde.groupByKey()) { _, befundAggregat, aggregat -> aggregat + befundAggregat }
.cogroup(aufgaben.groupByKey()) { _, aufgabeAggregat, aggregat -> aggregat + aufgabeAggregat }
.cogroup(durchfuehrungen.groupByKey()) { _, durchfuehrungAggregat, aggregat -> aggregat + durchfuehrungAggregat }
.cogroup(gruppen.groupByKey()) { _, gruppeAggregat, aggregat -> aggregat + gruppeAggregat }
.aggregate({ ProjektAggregat() }, Materialized.`as`(projektStoreSupplier))
.toStream()
.filterNot { _, projektAggregat -> projektAggregat.projekt == null }
.transform({ EventTypeHeaderTransformer() })
}
}
}
}
}
}
}
}
Event if I change the stream to use leftJoins
instead of cogrouops
there are missing durchfuehrungen
:
Function { projektstatus: KStream<String, ProjektStatusEvent> ->
Function { befunde: KStream<String, ProjektBefundAggregat> ->
Function { aufgaben: KStream<String, ProjektAufgabeAggregat> ->
Function { wirtschaftseinheiten: GlobalKTable<String, WirtschaftseinheitAggregat> ->
Function { durchfuehrungen: KStream<String, ProjektDurchfuehrungAggregat> ->
Function { gruppen: KStream<String, ProjektGruppeAggregat> ->
Function { mietobjekte: GlobalKTable<String, MietobjektAggregat> ->
projekte
.filterNot { _, projektEvent -> projektEvent.action == CREATE_REQUEST }
.leftJoin(wirtschaftseinheiten)
.leftJoin(mietobjekte)
.leftJoin(projektstatus.toTable()) { aggregat, projektstatusEvent ->
projektstatusEvent?.let { aggregat + projektstatusEvent } ?: aggregat
}
.leftJoin(befunde.toTable()) { aggregat, befundAggregat -> befundAggregat?.let { aggregat + befundAggregat } ?: aggregat }
.leftJoin(aufgaben.toTable()) { aggregat, aufgabeAggregat -> aufgabeAggregat?.let { aggregat + aufgabeAggregat } ?: aggregat }
.leftJoin(durchfuehrungen.toTable()) { aggregat, durchfuehrungAggregat ->
durchfuehrungAggregat?.let { aggregat + durchfuehrungAggregat } ?: aggregat
}
.leftJoin(
gruppen.toTable(),
{ aggregat, gruppeAggregat -> gruppeAggregat?.let { aggregat + gruppeAggregat } ?: aggregat },
Materialized.`as`(ProjektStore.NAME)
)
.toStream()
.filterNot { _, projektAggregat -> projektAggregat.projekt == null }
.process(ProcessorSupplier { EventTypeHeaderProcessor() })
}
}
}
}
}
}
}
}
The cause is a bug in the event type filtering mechanism of Spring Cloud Stream which causes several matching records to not to arrive at the stream. After deactivating the feature all desired records arrive. https://github.com/spring-cloud/spring-cloud-stream/issues/2627