
Random failure in a Kafka Streams stateful application


Hi, here is a problem I have been stumbling upon for a few days, and I can't find the answer by myself.

I am using the Scala Streams API v2.0.0.

I have an incoming stream, branched over two handlers for segregation; each handler declares a Transformer, and both Transformers use a common StateStore.

As a quick overview, it looks like this:

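import org.apache.kafka.streams.scala.ImplicitConversions._ // implicit Consumed/Produced derived from the Serdes in scope
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.state.Stores

def buildStream(builder: StreamsBuilder, config: Config): StreamsBuilder = {
    val store = Stores.keyValueStoreBuilder[String, AggregatedState](Stores.persistentKeyValueStore(config.storeName), ...)
    builder.addStateStore(store)

    // Handler is assumed to be a common trait of Handler1 and Handler2 exposing accepts/handle
    val handlers: List[Handler] = List(handler1, handler2)

    builder
      .stream[String, Event](config.topic)
      .branch(handlers.map(h => h.accepts _): _*)     // Dispatch each event to the first handler accepting it
      .zip(handlers)                                  // Array[(KStream[String, Event], Handler)]
      .map { case (stream, h) => h.handle(stream) }   // Process each branch with its own handler
      .reduce((s1, s2) => s1.merge(s2))               // Merge them back, as they all return the same type
      .to(config.output)

    builder
}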
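To make the "first handler accepting it" comment concrete, here is a plain-Scala model (no Kafka dependency) of the routing that KStream#branch performs: each record goes to the first predicate that returns true, and a record that matches no predicate is dropped. BranchModel, route and branch are hypothetical names used only for illustration; this is a sketch of the semantics, not the Kafka API.

```scala
// Plain-Scala model of KStream#branch routing (hypothetical names, no Kafka):
// a record goes to the FIRST predicate that accepts it; if none does, it is dropped.
object BranchModel {
  type Predicate[K, V] = (K, V) => Boolean

  /** Index of the branch that would receive (key, value), or None if the record is dropped. */
  def route[K, V](predicates: Seq[Predicate[K, V]])(key: K, value: V): Option[Int] = {
    val i = predicates.indexWhere(p => p(key, value))
    if (i >= 0) Some(i) else None
  }

  /** Splits records into one bucket per predicate, preserving input order. */
  def branch[K, V](records: Seq[(K, V)], predicates: Seq[Predicate[K, V]]): Vector[Vector[(K, V)]] = {
    val empty = Vector.fill(predicates.size)(Vector.empty[(K, V)])
    records.foldLeft(empty) { case (buckets, (k, v)) =>
      route(predicates)(k, v) match {
        case Some(i) => buckets.updated(i, buckets(i) :+ ((k, v)))
        case None    => buckets // no predicate matched: the record is dropped
      }
    }
  }
}
```

Because only the first match counts, the order of the handlers list matters whenever two accepts predicates can both return true for the same event.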

Each of my handlers looks the same: take an event, do some operations, then pass it through the transform() method to derive a state and emit an aggregate:

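class Handler1(config: Config) {
    def accepts(key: String, value: Event): Boolean = ???  // Implementation not needed

    def handle(stream: KStream[String, Event]): KStream[String, AggregatedState] = {
        stream
          // (join/map/filter) steps elided
          .transform(new Transformer1(config.storeName), config.storeName) // the store name connects the StateStore to this transformer
    }
}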


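class Handler2(config: Config) {
    def accepts(key: String, value: Event): Boolean = ???  // Implementation not needed

    def handle(stream: KStream[String, Event]): KStream[String, AggregatedState] = {
        stream
          // (join/map/filter) steps elided
          .transform(new Transformer2(config.storeName), config.storeName) // the store name connects the StateStore to this transformer
    }
}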

The transformers use the same StateStore with the following logic: for a new event, check whether its aggregate exists. If it does, update it, store it and emit the new aggregate; otherwise, build the aggregate, store it and emit it.
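The get/update/store/emit logic described above can be sketched without Kafka by using a mutable.Map as a stand-in for the KeyValueStore. Everything in this sketch is an assumption for illustration: Event, the fields of AggregatedState, the merge rule in updateWith and the isDuplicate rule (the aggregate did not change) are hypothetical, since the question does not show them. Returning None plays the role of returning null from transform(), i.e. nothing is forwarded downstream.

```scala
import scala.collection.mutable

// Hypothetical event type: the question does not show the real Event.
final case class Event(id: String, amount: Int, terminal: Boolean)

// Hypothetical aggregate: a running total plus the ids already folded in.
final case class AggregatedState(total: Int, seen: Set[String], isTerminal: Boolean) {
  // Assumed merge rule: an event id that was already applied leaves the state unchanged.
  def updateWith(e: Event): AggregatedState =
    if (seen.contains(e.id)) this
    else AggregatedState(total + e.amount, seen + e.id, e.terminal)
}

object AggregateLogic {
  def fromEvent(e: Event): AggregatedState = AggregatedState(e.amount, Set(e.id), e.terminal)

  // Assumed duplicate rule: applying the event did not change the stored aggregate.
  def isDuplicate(existing: Option[AggregatedState], agg: AggregatedState): Boolean =
    existing.contains(agg)

  /** One transform() call: Some(key -> agg) to emit, None to forward nothing. */
  def step(store: mutable.Map[String, AggregatedState], key: String, event: Event): Option[(String, AggregatedState)] = {
    val existing = store.get(key)                       // current aggregate for this key, if any
    val agg = existing.map(_.updateWith(event)).getOrElse(fromEvent(event))
    store.put(key, agg)                                 // store the new aggregate...
    if (agg.isTerminal) store.remove(key)               // ...and drop it again once it is terminal
    if (isDuplicate(existing, agg)) None else Some(key -> agg)
  }
}
```

Replaying the same event id for a key yields None (nothing emitted), and a terminal event emits its final aggregate while leaving no entry behind in the store.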

import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

class Transformer1(storeName: String) extends Transformer[String, Event, (String, AggregatedState)] {
    private var store: KeyValueStore[String, AggregatedState] = _

    override def init(context: ProcessorContext): Unit = {
        store = context.getStateStore(storeName).asInstanceOf[KeyValueStore[String, AggregatedState]]
    }

    override def transform(key: String, value: Event): (String, AggregatedState) = {
        val existing: Option[AggregatedState] = Option(store.get(key))
        val agg = existing.map(_.updateWith(value)).getOrElse(new AggregatedState(value))

        store.put(key, agg)
        if (agg.isTerminal) {
          store.delete(key)
        }
        if (isDuplicate(existing, agg)) {
            null                              // Returning null forwards nothing: we have a duplicate
        } else {
            (key, agg)                        // Emit the new aggregate
        }
    }

    override def close(): Unit = ()
}


class Transformer2(storeName: String) extends Transformer[String, Event, (String, AggregatedState)] {
    private var store: KeyValueStore[String, AggregatedState] = _

    override def init(context: ProcessorContext): Unit = {
        store = context.getStateStore(storeName).asInstanceOf[KeyValueStore[String, AggregatedState]]
    }

    override def transform(key: String, value: Event): (String, AggregatedState) = {
        val existing: Option[AggregatedState] = Option(store.get(key))
        val agg = existing.map(_.updateWith(value)).getOrElse(new AggregatedState(value))

        store.put(key, agg)
        if (agg.isTerminal) {
          store.delete(key)
        }
        if (isDuplicate(existing, agg)) {
            null                              // Returning null forwards nothing: we have a duplicate
        } else {
            (key, agg)                        // Emit the new aggregate
        }
    }

    override def close(): Unit = ()
}

Transformer2 is the same; only the business logic changes (how a new event is merged into an aggregated state).

The problem is that on stream startup I get either a normal startup or the following exception:

15:07:23,420 ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks  - stream-thread [job-tracker-prod-5ba8c2f7-d7fd-48b5-af4a-ac78feef71d3-StreamThread-1] Failed to commit stream task 1_0 due to the following error:
org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000003
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:198)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:406)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:380)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:368)
    at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:362)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:352)
    at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:401)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1035)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:845)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:161)
    at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:66)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
    at com.mycompany.streamprocess.Transformer1.transform(Transformer1.scala:49) // Line with store.put(key, agg)

I already searched and found results saying "The transformer uses a Factory Pattern", which I believe is what happens here (.transform takes the transformer and creates a TransformerSupplier under the hood). As the error is pseudo-random (I can only reproduce it sometimes), I guess it could be a race condition on startup, but I found nothing conclusive. Is it because I use the same state store in different transformers?


Solution

  • I assume you are hitting https://issues.apache.org/jira/browse/KAFKA-7250

    It is fixed in versions 2.0.1 and 2.1.0.

    If you cannot upgrade, you need to pass in the TransformerSupplier explicitly, because the Scala API constructs the supplier incorrectly in 2.0.0:

    .transform(() => new Transformer1(config.storeName))
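To illustrate why a supplier that wraps one pre-built instance is not a factory, here is a plain-Scala model with no Kafka dependency. As KAFKA-7250 describes it, the 2.0.0 Scala transform wraps the single Transformer instance you pass in, so every task's get() returns that same object. Each task's init() then overwrites the instance's fields with its own task's objects, so a record processed in one task can be written through another task's store and context; that would be consistent with the "timestamp() should only be called while a record is processed" error above. Supplier, Ctx, StatefulTransformer and the helper functions are hypothetical names; this is a sketch of the failure mode, not Kafka's code.

```scala
// Plain-Scala model of the shared-instance problem (hypothetical names, no Kafka).
object SupplierModel {
  // Stand-in for the per-task ProcessorContext.
  final case class Ctx(taskId: String)

  // Stand-in for a Transformer: init() keeps the task's context in a mutable field.
  final class StatefulTransformer {
    private var ctx: Option[Ctx] = None
    def init(c: Ctx): Unit = { ctx = Some(c) }
    def currentTask: String = ctx.map(_.taskId).getOrElse("<not initialized>")
  }

  // Stand-in for TransformerSupplier.
  trait Supplier { def get(): StatefulTransformer }

  // Shared instance: every get() returns the same object (the KAFKA-7250 situation).
  def sharing(instance: StatefulTransformer): Supplier = new Supplier {
    def get(): StatefulTransformer = instance
  }

  // Real factory: every get() builds a new object.
  def factory(make: () => StatefulTransformer): Supplier = new Supplier {
    def get(): StatefulTransformer = make()
  }

  /** Asks the supplier for one transformer per task, inits each, and reports the task each one sees. */
  def tasksSeen(supplier: Supplier, tasks: Seq[String]): Seq[String] = {
    val perTask = tasks.map { t =>
      val tr = supplier.get()
      tr.init(Ctx(t))
      tr
    }
    perTask.map(_.currentTask)
  }
}
```

With sharing, the transformer handed to task 0_0 ends up holding task 1_0's context; with factory, each task keeps its own. Passing () => new Transformer1(...) rather than a pre-built instance gives the factory behaviour.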