I'm getting the below rocksdb error when I try to use groupByKey
feature of kafka streams in a function. whereas a simple consumer function works fine.
Env: confluent v1.30.0
(confluent running on single-node development environment)
OS: Apple m1 mac - Big Sur v11.5.1 with rosetta
Java: openjdk 11.0.12 2021-07-20 LTS
Error Trace:
Exception in thread "streamApp-c9d33475-adca-4567-8ec1-8db1fe4bc4a9-StreamThread-1" java.lang.UnsatisfiedLinkError: /private/var/folders/2g/x44n4hwx19v_m7l09fq_r6k80000gn/T/librocksdbjni2764289432609839437.jnilib: dlopen(/private/var/folders/2g/x44n4hwx19v_m7l09fq_r6k80000gn/T/librocksdbjni2764289432609839437.jnilib, 1): no suitable image found. Did find:
/private/var/folders/2g/x44n4hwx19v_m7l09fq_r6k80000gn/T/librocksdbjni2764289432609839437.jnilib: mach-o, but wrong architecture
/private/var/folders/2g/x44n4hwx19v_m7l09fq_r6k80000gn/T/librocksdbjni2764289432609839437.jnilib: mach-o, but wrong architecture
at java.base/java.lang.ClassLoader$NativeLibrary.load0(Native Method)
at java.base/java.lang.ClassLoader$NativeLibrary.load(ClassLoader.java:2442)
at java.base/java.lang.ClassLoader$NativeLibrary.loadLibrary(ClassLoader.java:2498)
at java.base/java.lang.ClassLoader.loadLibrary0(ClassLoader.java:2694)
at java.base/java.lang.ClassLoader.loadLibrary(ClassLoader.java:2627)
at java.base/java.lang.Runtime.load0(Runtime.java:768)
at java.base/java.lang.System.load(System.java:1837)
at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64)
at org.rocksdb.RocksDB.<clinit>(RocksDB.java:35)
at org.rocksdb.DBOptions.<clinit>(DBOptions.java:21)
at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:128)
at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:253)
at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:54)
at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:74)
at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$1(MeteredKeyValueStore.java:120)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:120)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:201)
at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:103)
at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:210)
at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:473)
at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:754)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:636)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:564)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:523)
problematic code
@Bean
public Function<KStream<String, String>, KStream<String, String>> streamApp() {
return kstream -> kstream
.peek((key, value) -> logger.info("streamApp triggered with key {}, value {}", key, value))
.groupByKey()
.reduce((s, v1) -> s + ", " + v1)
.toStream()
.flatMap((ignoredWindowedKey, value) -> getChanges(value));
}
normal working code:
@Bean
public Function<KStream<String, String>, KStream<String, String>> streamApp() {
return kstream -> kstream
.peek((key, value) -> logger.info("streamApp triggered with key {}, value {}", key, value))
.flatMap((ignoredWindowedKey, value) -> getChanges(value));
}
rocksDB does not support Apple Silicon natively yet.
Development of a version of rocksDB for Apple Silicon is in progress, but as of today (2021-08-07) not completed: https://github.com/facebook/rocksdb/issues/7720. In this GitHub issue, you can find a link to the code - also included here - of a working version of rocksDB on Apple Silicon, which you can use to build rocksDB yourself: https://github.com/adamretter/rocksdb/tree/macos-multi-arch
You could also run your code using an x86 JDK and the Rosetta emulation, where rocksDB should run properly. This StackOverflow answer might help you with that: https://stackoverflow.com/a/66779308/7821823