I am extending Apache Flink's KeyedProcessFunction to define workflows. My workflow consists of around 10-15 process functions. In all of the others, collector.collect(T) finishes within 1 second, while one particular process function takes more than 150 seconds in the worst case. That process function emits a payload of the same type as the others, and the payload size is also very similar. I also apply keyBy() after each KeyedProcessFunction; keyBy() has the same definition everywhere and keys on the same attribute throughout the workflow.
How do I debug/resolve the issue of collector.collect taking so much time?
I am using Flink 1.8.0.
public class AProcessFunction extends KeyedProcessFunction<String, Foo, Foo> {
    @Override
    public void processElement(Foo foo, Context ctx, Collector<Foo> out) {
        try {
            if (Contant.Foo.equals(foo.y)) {
                collect(foo, out);
                return;
            }
            work(foo);
            collectEventTimePayload(foo, out);
        } catch (Exception e) {
            log.error("error occurred while processing {} with exception", foo, e);
        }
    }

    @Timed(aspect = "ProcessFunctionWork")
    private void work(Foo foo) {
        // some business logic; in the worst case, time taken is 400 ms.
    }

    @Timed(aspect = "AProcessFunctionCollector")
    private void collect(Foo foo, Collector<Foo> out) {
        out.collect(foo);
    }

    @Timed(aspect = "AProcessFunctionEventTimeCollector")
    private void collectEventTimePayload(Foo foo, Collector<Foo> out) {
        if (CollectionUtils.isNotEmpty(foo.ids())) {
            collect(foo, out);
        }
    }
}
public class BProcessFunction extends KeyedProcessFunction<String, Foo, Foo> {
    private final ProviderWorker providerWorker;

    @Override
    public void processElement(Foo foo, Context ctx, Collector<Foo> out) {
        try {
            if (!handleResourceIdExceptions(foo, out)) {
                Optional<Foo> workedFoo = providerWorker.get(foo.getEventType())
                        .work(foo);
                if (workedFoo.isPresent()) {
                    collectorCollect(workedFoo.get(), out);
                    return;
                }
            }
            collectorCollect(foo, out);
        } catch (Exception e) {
            log.error("error occurred while processing {} with exception", foo, e);
        }
    }

    @Timed(aspect = "BProcessFunctionCollector")
    private void collectorCollect(Foo foo, Collector<Foo> out) {
        out.collect(foo);
    }
}
AProcessFunction.collect() takes 150 sec in the worst case, while BProcessFunction takes < 30 ms. My workflow is:
dataStream.keyBy(fooKeyByFunction)
    .process(someOtherProcessFunction)
    .keyBy(fooKeyByFunction)
    .process(aProcessFunction)
    .keyBy(fooKeyByFunction)
    .process(bProcessFunction)
    .keyBy(fooKeyByFunction)
    .process(cProcessFunction)
    .keyBy(fooKeyByFunction)
    .addSink(sink);
What exactly does the Collector.collect method do? Does the time it takes cover only writing messages to output buffers, or does it also include waiting until the next task's input buffers have space?
Collector.collect writes the data in a blocking fashion into buffers that are sent asynchronously over the network to the respective downstream task. So the time needed is the serialization time plus the waiting time for a free buffer if all buffers are in use. A buffer only becomes available again once it has been sent over the network to the downstream task. If that task is bottlenecked, the buffer cannot be sent immediately and the sender is backpressured.
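To make this concrete, here is a small, self-contained Java sketch (plain JDK, not Flink; the class name, buffer count, and timings are invented for illustration). A bounded queue plays the role of the network buffer pool: the "upstream" emit call does almost no work itself, yet it blocks whenever the slow "downstream" consumer has not yet freed a buffer, which is exactly why a cheap collect() can appear to take a long time under backpressure.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BackpressureDemo {

    // Runs the simulation and returns how long the emit loop was blocked (ms).
    static long emitAll() throws InterruptedException {
        // Tiny "buffer pool": the sender blocks once 2 buffers are in flight.
        BlockingQueue<String> buffers = new ArrayBlockingQueue<>(2);

        // Slow "downstream task": drains one buffer every 100 ms.
        Thread downstream = new Thread(() -> {
            try {
                for (int i = 0; i < 6; i++) {
                    buffers.take();
                    Thread.sleep(100);
                }
            } catch (InterruptedException ignored) {
            }
        });
        downstream.start();

        // Fast "upstream task": emits 6 records as fast as it can.
        long start = System.nanoTime();
        for (int i = 0; i < 6; i++) {
            buffers.put("record-" + i); // blocks while the pool is exhausted
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        downstream.join();
        return elapsedMs;
    }

    public static void main(String[] args) throws InterruptedException {
        // put() itself is trivial, but most of its time is spent waiting
        // for the slow consumer -- analogous to collect() under backpressure.
        System.out.println("emit loop took ~" + BackpressureDemo.emitAll() + " ms");
    }
}
```

In the same spirit, timing collect() with @Timed measures the serialization plus the buffer wait, so a 150 s reading points at whatever is downstream, not at the function doing the collecting.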
In your case, I'd suspect that you indeed have backpressure (easy to see in the Web UI) and that buffers take a long time to become available. There are two common causes of backpressure: either a downstream task processes records too slowly to keep up, or the network itself is saturated.
In both cases, the starting point is to narrow down the issue with the Web UI. Happy to help with more information.
Note: from your sources, I don't see the need for keyBy at all. Without keyBy, you probably achieve better parallelism and it should be much faster.
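A sketch of what that could look like, reusing the names from the question (note this is an assumption about your job: it only works if the intermediate functions don't rely on keyed state or timers, since a KeyedProcessFunction that no longer follows a keyBy would have to be rewritten as a plain ProcessFunction):

```java
dataStream
    .keyBy(fooKeyByFunction)
    .process(someOtherProcessFunction)  // still keyed, if it needs keyed state
    .process(aProcessFunction)          // rewritten as plain ProcessFunction
    .process(bProcessFunction)
    .process(cProcessFunction)
    .addSink(sink);
```

Consecutive operators without an intervening keyBy can be chained into the same task, so records are handed over by method call instead of being serialized into network buffers between every stage, which removes most of the places where backpressure can build up.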