Since RocksDB still doesn't support Apple Silicon, currently only x86_64 JDKs running through Rosetta can be used, which are about five times slower than a native JDK. I'd therefore like to replace RocksDB with an in-memory key-value store. How can Kafka Streams be configured to use such an in-memory store by default?
It's similar to jego's answer, but I work with suppliers. This is how I configured them:
@Profile("prod | stage | test")
@Configuration
class PersistentStoreConfiguration {

    @Bean
    fun projektanhangStoreSupplier(): KeyValueBytesStoreSupplier =
        Stores.persistentKeyValueStore(ProjektanhangStore.NAME)
}
@Profile("it | dev")
@Configuration
class ProjektInMemoryStoreConfiguration {

    @Bean
    fun projektanhangStoreSupplier(): KeyValueBytesStoreSupplier =
        Stores.inMemoryKeyValueStore(ProjektanhangStore.NAME)
}
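Which of the two configurations is active is decided by the Spring profile at startup. The profile names here (prod, stage, test, it, dev) are just the ones from my setup; a minimal sketch of activating the in-memory variant locally would be:

```yaml
# application.yml — activates the "dev" profile, i.e. the in-memory store configuration
spring:
  profiles:
    active: dev
```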
And this is where the supplier selected by the active Spring profile gets injected and used. Note that both @Configuration classes have distinct names but declare their @Bean factory method under the same name, projektanhangStoreSupplier, so exactly one supplier bean exists per profile and can be injected by type:
@Configuration
class ProjektAnhangStreamConfiguration {

    @Inject
    private lateinit var projektanhangStoreSupplier: KeyValueBytesStoreSupplier

    @Bean
    fun projektanhaenge() = Consumer<KStream<String, AnhangEvent>> {
        it.map { _, v -> KeyValue(v.anhang.projektId, v) }
            .groupByKey(Grouped.with(Serdes.String(), JsonSerde(AnhangEvent::class.java)))
            .aggregate(
                { ProjektanhangAggregator() },
                { _, anhangEvent, aggregator ->
                    when (anhangEvent.action) {
                        CREATE -> aggregator.add(anhangEvent.anhang)
                        DELETE -> aggregator.remove(anhangEvent.anhang)
                        UPDATE -> aggregator.update(anhangEvent.anhang)
                    }
                },
                Materialized
                    .`as`<String, ProjektanhangAggregator>(projektanhangStoreSupplier)
                    .withKeySerde(Serdes.String())
                    .withValueSerde(JsonSerde(ProjektanhangAggregator::class.java))
            )
    }
}
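As an aside, if you don't want to wire a supplier per store: newer Kafka Streams versions (3.2+, KIP-591) have a global switch for the default DSL store type. If your kafka-streams version supports it (check before relying on this), setting one streams property makes every DSL state store in-memory without any supplier beans:

```properties
# Kafka Streams configuration property (3.2+); valid values: rocksDB, in_memory
default.dsl.store=in_memory
```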