
Deprecated KStreams TransformerSupplier to ProcessorSupplier


Given that flatTransform is deprecated, I'm trying to replace it with process, as suggested.

My previous TransformerSupplier looked like this:

public class MyTransformerSupplier implements TransformerSupplier<A, B, Iterable<KeyValue<C, D>>> {

  @Override
  public Transformer<A, B, Iterable<KeyValue<C, D>>> get() {
    return new MyTransformer();
  }

  @Override
  public Set<StoreBuilder<?>> stores() {
    // Declare a state store
    return Set.of(someStore);
  }
}

My new ProcessorSupplier looks like this:

public class MyProcessorSupplier implements ProcessorSupplier<A, B, C, D> {

  @Override
  public Processor<A, B, C, D> get() {
    return new MyProcessor();
  }

  @Override
  public Set<StoreBuilder<?>> stores() {
    // Unmodified declaration of the state store
    return Set.of(someStore);
  }
}

The only significant difference between my previous Transformer and my new ContextualProcessor is that, instead of returning a List, the processor makes several calls to context().forward(...).

The problem is that context() returns null in my new ContextualProcessor.
Could you help me understand why?


Solution

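  • Override the init() method and store the ProcessorContext that Kafka Streams passes to it in a field. If your class extends ContextualProcessor and overrides init(), also call super.init(context); otherwise the context is never stored and context() keeps returning null.

    In Scala, you can do something like this: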

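    import org.apache.kafka.streams.processor.api.ProcessorContext

    // C and D are the output key and value types from the question
    private var context: ProcessorContext[C, D] = _

    override def init(context: ProcessorContext[C, D]): Unit = {
      this.context = context
    }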
    

    You then use that variable to forward records.
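
To illustrate why context() can be null, here is a minimal Java sketch of the mechanism that runs without Kafka on the classpath. It does not use the Kafka Streams classes: MiniContext, MiniContextualProcessor, BrokenProcessor, FixedProcessor and ContextDemo are hypothetical stand-ins. They assume only what ContextualProcessor is documented to do, which is to store the context in init() and return it from context(). Whether this is the exact cause in the question depends on code that was not posted, so treat it as one likely explanation.

```java
import java.util.ArrayList;
import java.util.List;

final class ContextDemo {
    // Plays the role of the runtime: init() once, then process() per record.
    static List<String> run(MiniContextualProcessor processor, String input) {
        MiniContext context = new MiniContext();
        processor.init(context);
        processor.process(input);
        return context.forwarded;
    }

    public static void main(String[] args) {
        System.out.println(run(new FixedProcessor(), "a,b,c"));
        try {
            run(new BrokenProcessor(), "a,b,c");
        } catch (NullPointerException e) {
            System.out.println("context() was null");
        }
    }
}

// Hypothetical stand-in for a processor context: it only collects forwarded values.
final class MiniContext {
    final List<String> forwarded = new ArrayList<>();

    void forward(String value) {
        forwarded.add(value);
    }
}

// Hypothetical stand-in that mirrors ContextualProcessor:
// init() stores the context and context() returns whatever was stored.
abstract class MiniContextualProcessor {
    private MiniContext context;

    void init(MiniContext context) {
        this.context = context;
    }

    protected final MiniContext context() {
        return context;
    }

    abstract void process(String value);
}

// Overrides init() but never calls super.init(), so context() stays null.
final class BrokenProcessor extends MiniContextualProcessor {
    @Override
    void init(MiniContext context) {
        // Custom setup only; the context is dropped here.
    }

    @Override
    void process(String value) {
        context().forward(value); // throws NullPointerException
    }
}

// Delegates to super.init() first, so context() is available in process().
final class FixedProcessor extends MiniContextualProcessor {
    @Override
    void init(MiniContext context) {
        super.init(context);
        // Custom setup goes here.
    }

    @Override
    void process(String value) {
        // One input may produce several outputs, as with flatTransform.
        for (String part : value.split(",")) {
            context().forward(part);
        }
    }
}
```

In this model, calling context() from a constructor or field initializer would return null for the same reason: init() has not run yet at that point.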