Search code examples
apache-flink

A query on data exchange in Apache Flink


Going through a book on Apache Flink, and it has following:

If the sender task and receiver task run in the same TaskManager process, the sender task serializes the outgoing records into a byte buffer and puts the buffer into a queue once it is filled. The receiving task takes the buffer from the queue and deserializes the incoming records. Hence, no network communication is involved.

Why do we need serialization if both sender and receiver are in same JVM process?


Solution

  • There are a couple of reasons, but let's step back a bit and talk about operator chains.

    An operator chain can be used if two successive operators have the same degree of parallelism, have only one input/output, share a task slot, and also expose certain properties (e.g., they say they can be chained). The easiest chain is just a series of maps. But it could also be source -> map -> sink that can be completely chained. Records within a chain are not serialized but copied (unless enableObjectReuse has been set). I guess that is what you had in mind when you were surprised about the serialization.

    An operator chain is for example broken if you have more than one input or an explicit network shuffle. Both is true for joins. So the data is serialized. However, you could still have two successive sub-tasks on the same TM. That's where the local input channels come into play that correspond to what you have found. The serialization at local input channels makes handling of backpressure and checkpointing much easier than if at a given step some records are serialized and others are not.