Search code examples
apache-kafka-streamsspring-cloud-streamspring-cloud-stream-binder-kafka

Missing aggregate components


The durchfuehrungen are in a few cases null even if the records are correct and the cogrouping with them works elsewhere. In the history of the resulting affected ProjektAggregat records no single event contains the depending durchfuehrungen, they’re always null and couldn’t figure it out why.

Function { projekte: KStream<String, ProjektEvent> ->
            Function { projektstatus: KStream<String, ProjektStatusEvent> ->
                Function { befunde: KStream<String, ProjektBefundAggregat> ->
                    Function { aufgaben: KStream<String, ProjektAufgabeAggregat> ->
                        Function { wirtschaftseinheiten: KTable<String, WirtschaftseinheitAggregat> ->
                            Function { durchfuehrungen: KStream<String, ProjektDurchfuehrungAggregat> ->
                                Function { gruppen: KStream<String, ProjektGruppeAggregat> ->
                                    Function { mietobjekte: KTable<String, MietobjektAggregat> ->
                                        projekte
                                            .leftJoin(wirtschaftseinheiten)
                                            .leftJoin(mietobjekte)
                                            .cogroup { _, base, current: ProjektAggregat ->
                                                current.copy(
                                                    projekt = base.projekt,
                                                    wirtschaftseinheit = base.wirtschaftseinheit,
                                                    mietobjekt = base.mietobjekt,
                                                    projektErstelltAm = base.projektErstelltAm
                                                )
                                            }
                                            .cogroup(projektstatus.groupByKey()) { _, projektstatusEvent, aggregat -> aggregat + projektstatusEvent }
                                            .cogroup(befunde.groupByKey()) { _, befundAggregat, aggregat -> aggregat + befundAggregat }
                                            .cogroup(aufgaben.groupByKey()) { _, aufgabeAggregat, aggregat -> aggregat + aufgabeAggregat }
                                            .cogroup(durchfuehrungen.groupByKey()) { _, durchfuehrungAggregat, aggregat -> aggregat + durchfuehrungAggregat }
                                            .cogroup(gruppen.groupByKey()) { _, gruppeAggregat, aggregat -> aggregat + gruppeAggregat }
                                            .aggregate({ ProjektAggregat() }, Materialized.`as`(projektStoreSupplier))
                                            .toStream()
                                            .filterNot { _, projektAggregat -> projektAggregat.projekt == null }
                                            .transform({ EventTypeHeaderTransformer() })
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }

Event if I change the stream to use leftJoins instead of cogrouops there are missing durchfuehrungen:

    Function { projektstatus: KStream<String, ProjektStatusEvent> ->
                Function { befunde: KStream<String, ProjektBefundAggregat> ->
                    Function { aufgaben: KStream<String, ProjektAufgabeAggregat> ->
                        Function { wirtschaftseinheiten: GlobalKTable<String, WirtschaftseinheitAggregat> ->
                            Function { durchfuehrungen: KStream<String, ProjektDurchfuehrungAggregat> ->
                                Function { gruppen: KStream<String, ProjektGruppeAggregat> ->
                                    Function { mietobjekte: GlobalKTable<String, MietobjektAggregat> ->
                                        projekte
                                            .filterNot { _, projektEvent -> projektEvent.action == CREATE_REQUEST }
                                            .leftJoin(wirtschaftseinheiten)
                                            .leftJoin(mietobjekte)
                                            .leftJoin(projektstatus.toTable()) { aggregat, projektstatusEvent ->
                                                projektstatusEvent?.let { aggregat + projektstatusEvent } ?: aggregat
                                            }
                                            .leftJoin(befunde.toTable()) { aggregat, befundAggregat -> befundAggregat?.let { aggregat + befundAggregat } ?: aggregat }
                                            .leftJoin(aufgaben.toTable()) { aggregat, aufgabeAggregat -> aufgabeAggregat?.let { aggregat + aufgabeAggregat } ?: aggregat }
                                            .leftJoin(durchfuehrungen.toTable()) { aggregat, durchfuehrungAggregat ->
                                                durchfuehrungAggregat?.let { aggregat + durchfuehrungAggregat } ?: aggregat
                                            }
                                            .leftJoin(
                                                gruppen.toTable(),
                                                { aggregat, gruppeAggregat -> gruppeAggregat?.let { aggregat + gruppeAggregat } ?: aggregat },
                                                Materialized.`as`(ProjektStore.NAME)
                                            )
                                            .toStream()
                                            .filterNot { _, projektAggregat -> projektAggregat.projekt == null }
                                            .process(ProcessorSupplier { EventTypeHeaderProcessor() })
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }

Solution

  • The cause is a bug in the event type filtering mechanism of Spring Cloud Stream which causes several matching records to not to arrive at the stream. After deactivating the feature all desired records arrive. https://github.com/spring-cloud/spring-cloud-stream/issues/2627