Tags: java, apache-kafka-streams, confluent-platform, apple-m1, rocksdb

Kafka Streams groupByKey not working due to RocksDB on an Apple M1 Mac


I'm getting the RocksDB error below when I try to use the groupByKey feature of Kafka Streams in a function, whereas a simple consumer function works fine.

Env: Confluent v1.30.0 (running as a single-node development environment)

OS: Apple M1 Mac, macOS Big Sur v11.5.1 with Rosetta

Java: openjdk 11.0.12 2021-07-20 LTS

Error Trace:

Exception in thread "streamApp-c9d33475-adca-4567-8ec1-8db1fe4bc4a9-StreamThread-1" java.lang.UnsatisfiedLinkError: /private/var/folders/2g/x44n4hwx19v_m7l09fq_r6k80000gn/T/librocksdbjni2764289432609839437.jnilib: dlopen(/private/var/folders/2g/x44n4hwx19v_m7l09fq_r6k80000gn/T/librocksdbjni2764289432609839437.jnilib, 1): no suitable image found.  Did find:
    /private/var/folders/2g/x44n4hwx19v_m7l09fq_r6k80000gn/T/librocksdbjni2764289432609839437.jnilib: mach-o, but wrong architecture
    /private/var/folders/2g/x44n4hwx19v_m7l09fq_r6k80000gn/T/librocksdbjni2764289432609839437.jnilib: mach-o, but wrong architecture
    at java.base/java.lang.ClassLoader$NativeLibrary.load0(Native Method)
    at java.base/java.lang.ClassLoader$NativeLibrary.load(ClassLoader.java:2442)
    at java.base/java.lang.ClassLoader$NativeLibrary.loadLibrary(ClassLoader.java:2498)
    at java.base/java.lang.ClassLoader.loadLibrary0(ClassLoader.java:2694)
    at java.base/java.lang.ClassLoader.loadLibrary(ClassLoader.java:2627)
    at java.base/java.lang.Runtime.load0(Runtime.java:768)
    at java.base/java.lang.System.load(System.java:1837)
    at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
    at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
    at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64)
    at org.rocksdb.RocksDB.<clinit>(RocksDB.java:35)
    at org.rocksdb.DBOptions.<clinit>(DBOptions.java:21)
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:128)
    at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:253)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:54)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:74)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$1(MeteredKeyValueStore.java:120)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:120)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:201)
    at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:103)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:210)
    at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:473)
    at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:754)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:636)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:564)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:523)

Problematic code:

@Bean
public Function<KStream<String, String>, KStream<String, String>> streamApp() {
    return kstream -> kstream
            .peek((key, value) -> logger.info("streamApp triggered with key {}, value {}", key, value))
            // groupByKey() + reduce() materialize a key-value state store (RocksDB by default)
            .groupByKey()
            .reduce((s, v1) -> s + ", " + v1)
            .toStream()
            .flatMap((ignoredWindowedKey, value) -> getChanges(value));
}

Normal working code:

@Bean
public Function<KStream<String, String>, KStream<String, String>> streamApp() {
    return kstream -> kstream
            .peek((key, value) -> logger.info("streamApp triggered with key {}, value {}", key, value))
            .flatMap((ignoredWindowedKey, value) -> getChanges(value));
}
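The only difference between the two functions is the stateful groupByKey().reduce() step. reduce() materializes a key-value state store, and Kafka Streams backs such stores with RocksDB by default, which is why the native library is only loaded in the first version (note RocksDBStore.openDB in the trace). As a minimal sketch, assuming local development where an in-memory store is acceptable and Kafka Streams 2.1 or later (for Grouped), the store can be swapped through Materialized and Stores.inMemoryKeyValueStore. The class name, the store name "reduce-store" and the topics "input-topic"/"output-topic" are hypothetical, and the question's getChanges() step is left out.

```java
import java.util.function.Function;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.Stores;

public class InMemoryReduceSketch {

    // Same shape as the question's streamApp(), without Spring and without the
    // getChanges() step: concatenate all values seen so far for each key.
    static final Function<KStream<String, String>, KStream<String, String>> STREAM_APP =
            kstream -> kstream
                    .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                    .reduce(
                            (aggregate, next) -> aggregate + ", " + next,
                            // Back the reduce() state with an in-memory key-value
                            // store instead of the default RocksDB-backed one.
                            Materialized.<String, String>as(Stores.inMemoryKeyValueStore("reduce-store"))
                                    .withKeySerde(Serdes.String())
                                    .withValueSerde(Serdes.String()))
                    .toStream();

    // Wires the function between two placeholder topics; building the
    // topology does not require a running broker.
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input =
                builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));
        STREAM_APP.apply(input).to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    public static void main(String[] args) {
        // Prints the processors and the state stores they use.
        System.out.println(buildTopology().describe());
    }
}
```

In-memory stores keep their contents on the heap; with change logging left at its default, Kafka Streams restores them from the changelog topic after a restart. This should avoid loading the RocksDB native library for this step, at the cost of memory and restore time, so it is a development workaround rather than a fix for RocksDB on Apple Silicon.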

Solution

  • RocksDB does not support Apple Silicon natively yet.

    Work on Apple Silicon support in RocksDB is in progress but, as of today (2021-08-07), not yet complete: https://github.com/facebook/rocksdb/issues/7720. That GitHub issue links to a branch with a working Apple Silicon version of RocksDB, which you can use to build RocksDB yourself: https://github.com/adamretter/rocksdb/tree/macos-multi-arch

    Alternatively, you can run your code on an x86 JDK under Rosetta emulation, where RocksDB should run properly. This Stack Overflow answer might help you with that: https://stackoverflow.com/a/66779308/7821823
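Because the failure is a mismatch between the JVM's architecture and the native library's, it can help to confirm which kind of JDK actually runs the application before and after switching to an x86 JDK. Below is a minimal sketch that uses only the standard os.name and os.arch system properties; the class and method names are hypothetical, and it assumes, as the "wrong architecture" message suggests, that the bundled librocksdbjni contains only x86_64 code.

```java
public class JvmArchCheck {

    // True when the JVM itself runs natively on Apple Silicon.
    // On macOS an arm64 JDK reports os.arch "aarch64", while an x86_64 JDK
    // (running under Rosetta on an M1) reports "x86_64".
    static boolean isNativeAppleSilicon(String osName, String osArch) {
        return osName.startsWith("Mac") && osArch.equals("aarch64");
    }

    public static void main(String[] args) {
        String osName = System.getProperty("os.name");
        String osArch = System.getProperty("os.arch");
        System.out.println("os.name=" + osName + ", os.arch=" + osArch);

        if (isNativeAppleSilicon(osName, osArch)) {
            // Corresponds to the "mach-o, but wrong architecture" failure when
            // the bundled RocksDB JNI library only contains x86_64 code.
            System.out.println("arm64 JVM: an x86_64-only RocksDB JNI library will fail to load.");
        } else {
            System.out.println("Not an arm64 JVM on macOS.");
        }
    }
}
```

Run it with the same java binary your application uses (for example the one under JAVA_HOME); with an x86 JDK under Rosetta it should report os.arch=x86_64.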