apache-flink flink-streaming

Flink Collector.collect(T) takes more than 150 sec


I am extending Apache Flink's KeyedProcessFunction to define workflows. My workflow consists of around 10-15 processors. In every other processor, collector.collect(T) finishes within 1 sec, but in one particular process function it takes more than 150 sec in the worst case. This process function emits a payload of the same type as the other process functions, and the payload size is also very similar. I also call keyBy() after each KeyedProcessFunction. The keyBy() definition is the same for every process function and relies on the same attribute throughout the workflow.

How do I debug/resolve the issue of collector.collect taking so much time?

I am using Flink 1.8.0.

public class AProcessFunction extends KeyedProcessFunction<String, Foo, Foo> {

    @Override
    public void processElement(Foo foo, Context ctx, Collector<Foo> out) {

        try {
            if(Contant.Foo.equals(foo.y)) {
                collect(foo, out);
                return;
            }
            work(foo);
            collectEventTimePayload(foo, out);
        } catch (Exception e) {
            log.error("error occurred while processing {} with exception", foo, e);
        }
    }

    @Timed(aspect = "ProcessFunctionWork")
    private void work(Foo foo) {
        // Some business logic. In the worst case, the time taken is 400 ms.
    }

    @Timed(aspect = "AProcessFunctionCollector")
    private void collect(Foo foo, Collector<Foo> out) {
        out.collect(foo);
    }


    @Timed(aspect = "AProcessFunctionEventTimeCollector")
    private void collectEventTimePayload(Foo foo, Collector<Foo> out) {
        if(CollectionUtils.isNotEmpty(foo.ids())){
            collect(foo, out);
        }
    }
}
public class BProcessFunction extends KeyedProcessFunction<String, Foo, Foo> {
    private final ProviderWorker providerWorker;
    @Override
    public void processElement(Foo foo, Context ctx, Collector<Foo> out) {
        try {
            if(!handleResourceIdExceptions(foo, out)) {
                Optional<Foo> workedFoo = providerWorker.get(foo.getEventType())
                    .work(foo);
                if (workedFoo.isPresent()) {
                    collectorCollect(workedFoo.get(), out);
                    return;
                }
            }
            collectorCollect(foo, out);
        } catch (Exception e) {
            log.error("error occurred while processing {} with exception", foo, e);
        }
    }



    @Timed(aspect = "BProcessFunctionCollector")
    private void collectorCollect(Foo foo, Collector<Foo> out) {
        out.collect(foo);
    }
}

AProcessFunction.collect() takes 150 sec in the worst case, while BProcessFunction takes < 30 ms. My workflow is:

dataStream
    .keyBy(fooKeyByFunction).process(someOtherProcessFunction)
    .keyBy(fooKeyByFunction).process(aProcessFunction)
    .keyBy(fooKeyByFunction).process(bProcessFunction)
    .keyBy(fooKeyByFunction).process(cProcessFunction)
    .keyBy(fooKeyByFunction).sink(sink);

What exactly does the collector.collect method do? Does its duration cover only the time until the messages are written to buffers, or does it also include the time until the next task's input buffers are filled?


Solution

  • Collector.collect writes the data in a blocking fashion into buffers that are sent asynchronously over the network to the respective task. So the time needed is the serialization time plus the waiting time for a free buffer if all buffers are in use. A buffer only becomes available again once it has been sent over the network to the downstream task. If that task is a bottleneck, the buffer cannot be sent immediately and the sender is backpressured.
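
    To make the blocking behaviour concrete, here is a minimal sketch in plain Java. It is not Flink code: a bounded ArrayBlockingQueue stands in for the pool of network buffers, and the class name BackpressureSketch and the method worstPutMillis are made up for illustration. It only shows that a producer's write call stalls as soon as a slow consumer leaves no free slot.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for the buffer pool between two tasks (not Flink's implementation).
final class BackpressureSketch {

    /**
     * Puts `records` items into a bounded queue while a consumer thread drains it,
     * sleeping `consumerDelayMs` after each item. Returns the longest time in
     * milliseconds that a single put() call took on the producer side.
     */
    static long worstPutMillis(int capacity, int records, long consumerDelayMs) {
        BlockingQueue<Integer> buffers = new ArrayBlockingQueue<>(capacity);

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < records; i++) {
                    buffers.take();                 // downstream task reads one record
                    Thread.sleep(consumerDelayMs);  // and is slow to process it
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        long worstNanos = 0;
        try {
            for (int i = 0; i < records; i++) {
                long start = System.nanoTime();
                buffers.put(i); // blocks while the queue is full, like collect() waiting for a free buffer
                worstNanos = Math.max(worstNanos, System.nanoTime() - start);
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while producing", e);
        }
        return worstNanos / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("slow consumer, worst put: " + worstPutMillis(2, 20, 50) + " ms");
        System.out.println("fast consumer, worst put: " + worstPutMillis(2, 20, 0) + " ms");
    }
}
```

    With the slow consumer, the worst put() is roughly as long as the consumer's per-record delay, even though the producer itself does no expensive work. That is the same pattern as a fast process function whose collect() timer looks bad because something downstream is slow.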

    In your case, I'd suspect that you do have backpressure (easy to see in the Web UI) and that the buffers take a long time to become available. There are two common cases of backpressure:

    • From sink: if it takes longer to write the data than to produce the data, eventually your whole DAG is backpressured and the whole processing speed becomes the writing speed of the sink. The solution might be to use a different sink or beef up the target system.
    • From data skew: one of the keyBy calls might use a key where a few values are dominant. That means that your effective parallelism goes down to these few values, because each key has to be processed by exactly one subtask (consistency guarantees). That subtask is then overloaded while the other subtasks of that particular process function are more or less idle. The solution here is to use a different key or an aggregation that supports pre-aggregation.
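
    A quick way to check for the skew case is to take a sample of your key values and see how they would spread over the subtasks. The sketch below is plain Java and makes two assumptions: the class and method names (KeySkewSketch, subtaskFor, hottestShare) are invented for illustration, and the routing uses String.hashCode modulo the parallelism, which is a simplification and not the hash that Flink applies internally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for estimating key skew from a sample of keys (not part of Flink).
final class KeySkewSketch {

    /** Simplified key-to-subtask routing; Flink's real assignment uses its own hashing. */
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Counts how many sampled records each subtask would receive. */
    static long[] recordsPerSubtask(List<String> keys, int parallelism) {
        long[] counts = new long[parallelism];
        for (String key : keys) {
            counts[subtaskFor(key, parallelism)]++;
        }
        return counts;
    }

    /** Fraction of all records handled by the busiest subtask (0.0 for an empty sample). */
    static double hottestShare(long[] counts) {
        long total = 0;
        long max = 0;
        for (long c : counts) {
            total += c;
            max = Math.max(max, c);
        }
        return total == 0 ? 0.0 : (double) max / total;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 90; i++) keys.add("hot-customer"); // one dominant key value
        for (int i = 0; i < 10; i++) keys.add("customer-" + i); // a few rare key values
        long[] counts = recordsPerSubtask(keys, 4);
        System.out.println("share of busiest subtask: " + hottestShare(counts));
    }
}
```

    With perfectly even keys and a parallelism of 4, the busiest subtask would see about a quarter of the records. A share close to 1.0 means that one subtask does almost all the work, no matter how high the parallelism is set.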

    In both cases, the starting point is to narrow down the issue with the Web UI. Happy to help with more information.

    Note: from your sources, I don't see the need for keyBy at all. Without keyBy, you would probably achieve better parallelism, and the job should be much faster.