Search code examples
Tags: apache-kafka, spring-kafka, apache-kafka-streams, spring-cloud-stream-binder-kafka

Sporadic ConcurrentModificationException when num.stream.threads > 1


A stream occasionally runs into a ConcurrentModificationException like this:

[wrwks-bef-projekt-aggregat-wirtschaftseinheitAndMietobjektUpdater-2577dae3-7c43-4782-bdd8-e51669a18469-StreamThread-6] ERROR o.a.k.s.p.internals.TaskManager - stream-thread [wrwks-bef-projekt-aggregat-wirtschaftseinheitAndMietobjektUpdater-2577dae3-7c43-4782-bdd8-e51669a18469-StreamThread-6] Failed to process stream task 1_2 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_2, processor=KSTREAM-SOURCE-0000000011, topic=wrw-technischerplatz-mietobjekt-aggregat-oeffentlich-1, partition=2, offset=7263, stacktrace=java.util.ConcurrentModificationException
    at java.base/java.util.ArrayList$Itr.checkForComodification(Unknown Source)
    at java.base/java.util.ArrayList$Itr.next(Unknown Source)
    at org.apache.kafka.common.header.internals.RecordHeaders$FilterByKeyIterator.makeNext(RecordHeaders.java:184)
    at org.apache.kafka.common.header.internals.RecordHeaders$FilterByKeyIterator.makeNext(RecordHeaders.java:171)
    at org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
    at org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
    at org.springframework.cloud.stream.binder.kafka.streams.AbstractKafkaStreamsBinderProcessor$3.process(AbstractKafkaStreamsBinderProcessor.java:640)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:152)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1296)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:784)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)

    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:758)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1296)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:784)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)
Caused by: java.util.ConcurrentModificationException: null
    at java.base/java.util.ArrayList$Itr.checkForComodification(Unknown Source)
    at java.base/java.util.ArrayList$Itr.next(Unknown Source)
    at org.apache.kafka.common.header.internals.RecordHeaders$FilterByKeyIterator.makeNext(RecordHeaders.java:184)
    at org.apache.kafka.common.header.internals.RecordHeaders$FilterByKeyIterator.makeNext(RecordHeaders.java:171)
    at org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
    at org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
    at org.springframework.cloud.stream.binder.kafka.streams.AbstractKafkaStreamsBinderProcessor$3.process(AbstractKafkaStreamsBinderProcessor.java:640)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:152)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
    ... 4 common frames omitted

I'm not sure whether it's a bug in the Kafka Streams client or the app code:

/**
 * Spring configuration declaring the stream function that enriches project aggregates
 * with data from the Mietobjekt and Wirtschaftseinheit tables.
 */
@Configuration
class WirtschaftseinheitAndMietobjektUpdaterStreamConfiguration {
    /**
     * Builds a curried function over three [KTable] inputs (projects, Mietobjekte,
     * Wirtschaftseinheiten) and returns the resulting stream.
     *
     * Steps, in order:
     * 1. Keep only project aggregates whose `action` is `AGGREGATE`.
     * 2. Left-join with `mietobjekte`, looked up by the project's `technischerPlatz`.
     *    The Mietobjekt is merged in (and the action set to `MO_WE_UPDATE`) only when
     *    its `tplnr` is longer than `WIRTSCHAFTSEINHEIT_LENGTH`.
     * 3. Left-join with `wirtschaftseinheiten`, looked up by the first
     *    `WIRTSCHAFTSEINHEIT_LENGTH` characters of `technischerPlatz`. Any match is
     *    merged in and the action is set to `MO_WE_UPDATE`.
     * 4. Convert to a stream, drop everything that was not marked `MO_WE_UPDATE`,
     *    and pass the remainder through `EventTypeHeaderTransformer`.
     *
     * NOTE(review): the nested `Function` shape looks like the multi-input binding
     * convention of the Kafka Streams binder — confirm against the binding config.
     */
    @Bean
    fun wirtschaftseinheitAndMietobjektUpdater() =
        Function { projekte: KTable<String, ProjektAggregat> ->
            Function { mietobjekte: KTable<String, MietobjektAggregat> ->
                Function { wirtschaftseinheiten: KTable<String, WirtschaftseinheitAggregat> ->
                    projekte
                        .filter { _, aggregat -> aggregat.action == AGGREGATE }
                        .leftJoin(
                            mietobjekte,
                            { aggregat -> aggregat.projekt?.projekt?.technischerPlatz },
                            { aggregat, mietobjekt ->
                                when {
                                    // No join partner: pass the aggregate through unchanged.
                                    mietobjekt == null -> aggregat
                                    // Only identifiers longer than a Wirtschaftseinheit key are merged.
                                    mietobjekt.tplnr.length > WIRTSCHAFTSEINHEIT_LENGTH ->
                                        (aggregat + mietobjekt).copy(action = MO_WE_UPDATE)
                                    else -> aggregat
                                }
                            },
                        )
                        .leftJoin(
                            wirtschaftseinheiten,
                            { aggregat -> aggregat.projekt?.projekt?.technischerPlatz?.take(WIRTSCHAFTSEINHEIT_LENGTH) },
                            { aggregat, wirtschaftseinheit ->
                                when (wirtschaftseinheit) {
                                    null -> aggregat
                                    else -> (aggregat + wirtschaftseinheit).copy(action = MO_WE_UPDATE)
                                }
                            },
                        )
                        .toStream()
                        // The value may be null at this point, hence the safe call.
                        .filter { _, aggregat -> aggregat?.action == MO_WE_UPDATE }
                        .transform({ EventTypeHeaderTransformer() })
                }
            }
        }
}

Any useful hint would be highly appreciated.


Solution

  • Thanks to the hint by @MatthiasJSax, I was able to narrow down the issue. I solved it by modifying a copy of the record headers instead of modifying the headers of the incoming record directly:

        /**
         * Forwards [record] downstream with its `EVENT_TYPE_HEADER` replaced by the
         * event type of the record's value, then requests a commit.
         *
         * The header change is applied to the record returned by `withHeaders`, not to
         * the headers of the incoming [record].
         */
        override fun process(record: Record<K, V>) {
            // withHeaders yields a new record; per the original author it carries a copy of the headers.
            val outbound = record.withHeaders(record.headers())
            val outboundHeaders = outbound.headers()

            // Drop any existing event-type header before adding the current one.
            outboundHeaders.remove(EVENT_TYPE_HEADER)
            outboundHeaders.add(EVENT_TYPE_HEADER, record.value().eventType.toByteArray())

            context.forward(outbound)
            // NOTE(review): a commit is requested for every record — confirm this is intended.
            context.commit()
        }