apache-kafka-streams, spring-cloud-stream

Dynamically materialize KTable aggregate to different state stores


Is there a way to dynamically choose which state store a KTable aggregate is materialized to? Currently the name is MonthlyAggregates.NAME, but I would like each month's aggregate to be materialized into a state store named after that month, e.g. 2020-JULY. However, since the topology is read and constructed once at system startup, only the state store configured during initialisation is ever used.

    @Bean
    fun leistung() =
        Consumer<KStream<ByteArray, ByteArray>> {
            it
                .transform(TransformerSupplier { EventTypeAwareTransformer(EVENT_TYPE_MAPPING, objectMapper) })
                .map { _, v -> KeyValue(createKey(v.payload as VerrechenbareLeistungPerformedEvent), v.payload) }
                .peek { key, value -> validateTechnicalLocation(key, value) }
                .transform(TransformerSupplier { DuplicateFilteringTransformer() }, LeistungEvents.NAME)
                .groupByKey(Grouped.with(Serdes.StringSerde(), JsonSerde(VerrechenbareLeistungPerformedEvent::class.java)))
                .aggregate(
                    { ItemAggregator() },
                    { _, event, aggregator -> aggregator.addItem(event) },
                    Named.`as`("aggregate"),
                    Materialized.`as`<String, ItemAggregator, KeyValueStore<Bytes, ByteArray>>(MonthlyAggregates.NAME)
                        .withKeySerde(Serdes.String())
                        .withValueSerde(JsonSerde(ItemAggregator::class.java))
                )
        }
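
In other words, even computing the name when the bean is defined does not help, because that expression runs exactly once, while Spring builds the topology at startup (a sketch; monthlyStoreName is a hypothetical helper):

    import java.time.YearMonth

    // Evaluated exactly once, when Spring builds the topology at startup, so
    // Materialized.as(monthlyStoreName()) would always point at the same store.
    fun monthlyStoreName(): String {
        val now = YearMonth.now()
        return "${now.year}-${now.month}"   // e.g. "2020-JULY"
    }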

I also tried adding state stores dynamically by leveraging the Processor API:

class DuplicateFilteringProcessor(private val topology: Topology) : Processor<String, VerrechenbareLeistungPerformedEvent> {
    private lateinit var processorContext: ProcessorContext
    private lateinit var stateStore: KeyValueStore<String, VerrechenbareLeistungPerformedEvent>

    override fun init(context: ProcessorContext) {
        processorContext = context
        val storeBuilder = KeyValueStoreBuilder<String, VerrechenbareLeistungPerformedEvent>(
            Stores.persistentKeyValueStore(LeistungEvents.NAME),
            Serdes.StringSerde(),
            JsonSerde(VerrechenbareLeistungPerformedEvent::class.java),
            Time.SYSTEM
        )
        // Attempt to add and connect the store at runtime, i.e. after the topology
        // has already been built
        topology.addStateStore(storeBuilder, "filter_duplicates")
        topology.connectProcessorAndStateStores("filter_duplicates", LeistungEvents.NAME)
        val store = storeBuilder.build()
        processorContext.register(store) { _, _ -> }
        stateStore = processorContext.getStateStore(LeistungEvents.NAME) as KeyValueStore<String, VerrechenbareLeistungPerformedEvent>
    }

    override fun process(key: String, value: VerrechenbareLeistungPerformedEvent) {
        val eventInStateStore = stateStore.get(value.businessId)
        if (eventInStateStore == null) {
            stateStore.putIfAbsent(value.businessId, value)
            processorContext.forward(key, value)
        } else {
            logger().error("""Event with businessId ${value.businessId} has already been processed
                            Event in state store: $eventInStateStore
                            Event just received:  $value""".trimIndent())
        }
//        processorContext.commit()
    }

    override fun close() {
        // stateStore closing will be managed by Kafka/Spring
    }

}

But in this case the state store never transitions to the open state, and any request against it results in an NPE.
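
For reference, the fixed-name wiring that the first snippet relies on registers the store while the topology is being built and only looks it up in init(), roughly like the sketch below. DedupTransformerSketch is a simplified stand-in for DuplicateFilteringTransformer, and how the StoreBuilder reaches the builder (with the Spring Cloud Stream Kafka Streams binder typically a StoreBuilder bean or a StreamsBuilderFactoryBean customizer) is an assumption here:

    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.streams.KeyValue
    import org.apache.kafka.streams.StreamsBuilder
    import org.apache.kafka.streams.kstream.Transformer
    import org.apache.kafka.streams.processor.ProcessorContext
    import org.apache.kafka.streams.state.KeyValueStore
    import org.apache.kafka.streams.state.Stores
    import org.springframework.kafka.support.serializer.JsonSerde

    // Register the store while the topology is being built, not from inside init().
    fun registerDuplicateStore(builder: StreamsBuilder) {
        builder.addStateStore(
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(LeistungEvents.NAME),
                Serdes.String(),
                JsonSerde(VerrechenbareLeistungPerformedEvent::class.java)
            )
        )
    }

    // The transformer only looks the store up by name; it never creates or connects it.
    class DedupTransformerSketch :
        Transformer<String, VerrechenbareLeistungPerformedEvent, KeyValue<String, VerrechenbareLeistungPerformedEvent>?> {

        private lateinit var store: KeyValueStore<String, VerrechenbareLeistungPerformedEvent>

        @Suppress("UNCHECKED_CAST")
        override fun init(context: ProcessorContext) {
            store = context.getStateStore(LeistungEvents.NAME) as KeyValueStore<String, VerrechenbareLeistungPerformedEvent>
        }

        override fun transform(
            key: String,
            value: VerrechenbareLeistungPerformedEvent
        ): KeyValue<String, VerrechenbareLeistungPerformedEvent>? {
            // Forward only the first event per businessId, drop duplicates.
            return if (store.putIfAbsent(value.businessId, value) == null) KeyValue(key, value) else null
        }

        override fun close() {}
    }

The store name is then passed to transform(...), exactly as in the first snippet, so the store is connected at build time rather than at runtime.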


Solution

  • That is not supported: the store name is fixed when the topology is built, and a state store cannot be renamed at runtime.
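
A common way to still get per-month aggregates out of one fixed store is to put the month into the record key instead of the store name, and then query that single store with a per-month key range. A minimal sketch building on the pipeline above; the eventDate property used to derive the month is hypothetical:

    import java.time.YearMonth
    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.common.utils.Bytes
    import org.apache.kafka.streams.KeyValue
    import org.apache.kafka.streams.kstream.Grouped
    import org.apache.kafka.streams.kstream.KStream
    import org.apache.kafka.streams.kstream.Materialized
    import org.apache.kafka.streams.state.KeyValueStore
    import org.springframework.kafka.support.serializer.JsonSerde

    // Put the month into the key so that every month shares the one fixed store,
    // e.g. "2020-JULY|<originalKey>". eventDate is a hypothetical event property.
    fun monthlyKey(originalKey: String, event: VerrechenbareLeistungPerformedEvent): String {
        val month = YearMonth.from(event.eventDate)
        return "${month.year}-${month.month}|$originalKey"
    }

    fun aggregatePerMonth(stream: KStream<String, VerrechenbareLeistungPerformedEvent>) {
        stream
            .map { key, event -> KeyValue(monthlyKey(key, event), event) }
            .groupByKey(Grouped.with(Serdes.String(), JsonSerde(VerrechenbareLeistungPerformedEvent::class.java)))
            .aggregate(
                { ItemAggregator() },
                { _, event, aggregator -> aggregator.addItem(event) },
                Materialized.`as`<String, ItemAggregator, KeyValueStore<Bytes, ByteArray>>(MonthlyAggregates.NAME)
                    .withKeySerde(Serdes.String())
                    .withValueSerde(JsonSerde(ItemAggregator::class.java))
            )
    }

A single month can then be read back from the one store through an interactive query, e.g. a range scan such as store.range("2020-JULY|", "2020-JULY|\uFFFF") on the ReadOnlyKeyValueStore, which covers the whole month prefix for ordinary alphanumeric keys.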