Search code examples
apache-kafka, apache-kafka-streams, spring-kafka, rocksdb

How to replace RocksDB with an in-memory store just for integration tests?


Since RocksDB still doesn't support Apple Silicon, currently only x86_64 JDKs running via Rosetta can be used, and these are 5 times slower than a native JDK. Therefore I'd like to replace RocksDB with an in-memory key-value store. How can Kafka be configured to use such an in-memory store by default?


Solution

  • It's similar to jego's answer, but I work with suppliers. This is how I configured them:

    /**
     * Provides the persistent store supplier for the store named [ProjektanhangStore.NAME].
     *
     * Active only when the `prod`, `stage` or `test` Spring profile is enabled.
     */
    @Profile("prod || stage || test")
    @Configuration
    class PersistentStoreConfiguration {
        /** Builds the supplier through [Stores.persistentKeyValueStore]. */
        @Bean
        fun projektanhangStoreSupplier(): KeyValueBytesStoreSupplier {
            return Stores.persistentKeyValueStore(ProjektanhangStore.NAME)
        }
    }
    
    /**
     * Provides the in-memory store supplier for the store named [ProjektanhangStore.NAME].
     *
     * Active only when the `it` or `dev` Spring profile is enabled.
     */
    @Profile("it || dev")
    @Configuration
    class ProjektInMemoryStoreConfiguration {
        /** Builds the supplier through [Stores.inMemoryKeyValueStore]. */
        @Bean
        fun projektanhangStoreSupplier(): KeyValueBytesStoreSupplier {
            return Stores.inMemoryKeyValueStore(ProjektanhangStore.NAME)
        }
    }
    

    And this is where the supplier, selected according to the active Spring profile, is injected and used. Pay attention to the @Bean and @Configuration class names.

    /**
     * Declares the stream consumer that aggregates [AnhangEvent]s per project.
     *
     * The state store backing the aggregation comes from the injected
     * [KeyValueBytesStoreSupplier], so this class does not decide which store
     * implementation is used.
     */
    @Configuration
    class ProjektAnhangStreamConfiguration {
        /** Store supplier injected by the container. */
        @Inject
        private lateinit var projektanhangStoreSupplier: KeyValueBytesStoreSupplier

        /**
         * Returns a consumer that re-keys each incoming event by `anhang.projektId`,
         * groups by that key and folds the events into a [ProjektanhangAggregator],
         * dispatching on the event's `action` (CREATE / DELETE / UPDATE).
         */
        @Bean
        fun projektanhaenge() = Consumer<KStream<String, AnhangEvent>> { events ->
            // Re-key and group first; this happens before the store supplier is read.
            val eventsByProjekt = events
                .map { _, event -> KeyValue(event.anhang.projektId, event) }
                .groupByKey(Grouped.with(Serdes.StringSerde(), JsonSerde(AnhangEvent::class.java)))

            // Describe how the aggregate is materialized in the injected store.
            val materializedStore = Materialized
                .`as`<String, ProjektanhangAggregator>(projektanhangStoreSupplier)
                .withKeySerde(Serdes.String())
                .withValueSerde(JsonSerde(ProjektanhangAggregator::class.java))

            eventsByProjekt.aggregate(
                { ProjektanhangAggregator() },
                { _, event, current ->
                    val anhang = event.anhang
                    when (event.action) {
                        CREATE -> current.add(anhang)
                        DELETE -> current.remove(anhang)
                        UPDATE -> current.update(anhang)
                    }
                },
                materializedStore
            )
        }
    }