
Can't initialize field on custom DataSink (Flink CEP)


I have a problem with the Apache Flink Streaming API.

I managed to set up the whole CEP environment with a custom DataSource, and when I use a standard sink on that source, such as "print()", everything works fine.

This is what my sink looks like now:

@RequiredArgsConstructor
public class EventDataConsumer extends RichSinkFunction<EventData> {

    private final transient Consumer<EventData> consumer;

    @Override
    public void invoke(EventData eventData) throws Exception {
        consumer.accept(eventData);
    }
}

What I am trying to achieve is to pass a method reference to this SinkFunction, which should be executed for each element in my DataStream.

This is how I initialize the SinkFunction:

EventDataConsumer consumer = new EventDataConsumer(someService::handleEventData);
outStream.addSink(consumer);

My problem is that when I set a breakpoint in the "invoke" method of my custom sink, the consumer appears to be null, even though I explicitly call the constructor, which assigns the consumer.


Solution

  • Because the sink is distributed to as many instances as its parallelism, it must be serializable. When the job runs on a cluster, the sink is serialized, sent to the TaskManagers, and deserialized there.

    In your example the consumer field is transient, so it is skipped during serialization and is null after deserialization.
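
The effect can be reproduced without Flink. The following is a minimal sketch that uses plain Java serialization as a stand-in for shipping the sink to a TaskManager. All names in it (TransientSinkDemo, SerializableConsumer, TransientSink, SerializableSink, roundTrip) are hypothetical and only serve the illustration; they are not part of the Flink API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Consumer;

public class TransientSinkDemo {

    /** Hypothetical helper type: a Consumer whose lambdas can be serialized. */
    public interface SerializableConsumer<T> extends Consumer<T>, Serializable {}

    /** Mirrors the sink from the question: the callback field is transient. */
    public static class TransientSink implements Serializable {
        private static final long serialVersionUID = 1L;
        private final transient Consumer<String> consumer;

        public TransientSink(Consumer<String> consumer) { this.consumer = consumer; }
        public boolean hasConsumer() { return consumer != null; }
    }

    /** Variant that keeps the callback as a regular, serializable field. */
    public static class SerializableSink implements Serializable {
        private static final long serialVersionUID = 1L;
        private final SerializableConsumer<String> consumer;

        public SerializableSink(SerializableConsumer<String> consumer) { this.consumer = consumer; }
        public boolean hasConsumer() { return consumer != null; }
        public void invoke(String value) { consumer.accept(value); }
    }

    /** Serializes an object to bytes and reads it back, like shipping it to another JVM. */
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        // The transient field is skipped when writing, so it is null after reading.
        TransientSink lossy = roundTrip(new TransientSink(value -> System.out.println(value)));
        System.out.println("transient field survives: " + lossy.hasConsumer());

        // A non-transient field of a serializable type is restored.
        SerializableSink intact =
            roundTrip(new SerializableSink(value -> System.out.println("got " + value)));
        System.out.println("serializable field survives: " + intact.hasConsumer());
        intact.invoke("event");
    }
}
```

Simply removing transient from the original sink is probably not enough, because a plain java.util.function.Consumer lambda is not serializable. Note also that a bound method reference such as someService::handleEventData captures someService, so that object would have to be serializable as well. If it is not, one option is to create or look up the service inside the sink's open() method, which runs on the TaskManager after deserialization.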