Given that flatTransform is deprecated, I'm trying to replace it by process as suggested.
My previous TransformerSupplier looked like this:
public class MyTransformerSupplier implements TransformerSupplier<A, B, Iterable<C, D>> {
@Override
public Transformer<A, B, Iterable<C, D>> get() {
return new MyTransformer();
}
@Override
public Set<StoreBuilder<?>> stores() {
// Declare a state store
return Set.of(someStore);
}
}
My actual ProcessorSupplier looks like this:
public class MyProcessorSupplier implements ProcessorSupplier<A, B, C, D> {
@Override
public Processor<A, B, C, D> get() {
return new MyProcessor();
}
@Override
public Set<StoreBuilder<?>> stores() {
// Unmodified declaration of the state store
return Set.of(someStore);
}
}
And the only significant difference between my previous Transformer and my new ContextualProcessor is that instead of returning a List, several calls to context().forward(...)
are executed internally.
The error is precisely that context()
is null for my new ContextualProcessor.
Could you help me understand why?
You should override method init()
, there you can initialize a variable context that will be supplied automatically by Kafka streams.
In Scala, you can do something like this
private var context: ProcessorContext = _
override def init(context: ProcessorContext): Unit = {
this.context = context
}
You then use that variable to forward records.