Tags: apache-flink, flink-streaming, snapshot, fault-tolerance

Can the snapshot mechanism consume more and more memory in Apache Flink?


I'm learning how the snapshot mechanism works in Flink.

As I understand it, the JobManager inserts barriers into each Data Source at a fixed interval, and each operator takes a snapshot once it has received the nth barrier from all of its data sources.

If I'm right, it seems that this mechanism may use more and more memory in some cases.

Here is an example:

Say there are two Data Sources, Source 1 and Source 2, and one Operator.

Source 1 -----\
               ------ Operator
Source 2 -----/

Source 1 is generating the integer stream: 1, 2, 3, 4, 5...

Source 2 is generating the character stream: a, b, c, d, e...

The Operator does this: it takes two inputs from Source 1 and one input from Source 2 to generate an output: 1a2, 3b4, 5c6, 7d8...

Let's say the JobManager inserts the barriers into the two Data Sources as below:

1, BARRIER A, 2, BARRIER B, 3, BARRIER C, 4, BARRIER D, 5...
a, BARRIER A, b, BARRIER B, c, BARRIER C, d, BARRIER D, e...

Now let's begin.

When the two "BARRIER A"s from Source 1 and Source 2 enter the Operator, Flink takes a snapshot of the Operator, whose current state is 1 and a, because 1 and a had already arrived at the Operator when BARRIER A entered it.

Then, when the two "BARRIER B"s enter the Operator, the Operator has finished its first task, generating 1a2, and Flink takes another snapshot: NA, b. NA means there is currently no new input from Source 1.

At the same time, each snapshot is stored in RAM, a filesystem, or RocksDB (depending on how we configure Flink).

If I'm right, I think Flink's snapshots will keep growing in this example, because Source 1 is always consumed twice as fast as Source 2.

Did I misunderstand something?


Solution

  • Interesting thought experiment.

    If you limit yourself to using only the standard parts of the Flink API, there's no way to implement a user function that will read two inputs from Source 1 for every input read from Source 2. When implementing a CoProcessFunction, for example, you are at the mercy of the Flink runtime, which will supply events from either stream according to its own internal logic. The two streams will race against each other, possibly running in different threads or even in different processes. When the streams converge, if the events from the two inputs aren't provided in the order you would prefer, you have to buffer them in Flink state until you are ready to process them.
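
    To make that concrete, here is a minimal, hypothetical sketch of the buffering a CoProcessFunction would need for your 2:1 pairing (the class and state names are invented, and it assumes the two connected streams have been keyed, e.g. by a constant key, so that keyed ListState is available):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
    import org.apache.flink.util.Collector;

    // Hypothetical sketch: pair two integers from stream 1 with one character
    // from stream 2, emitting "1a2", "3b4", ... Whichever side runs ahead is
    // parked in Flink state until its counterparts arrive.
    public class PairingFunction extends CoProcessFunction<Integer, String, String> {

        private transient ListState<Integer> bufferedInts;  // ints waiting for a char
        private transient ListState<String> bufferedChars;  // chars waiting for two ints

        @Override
        public void open(Configuration parameters) {
            bufferedInts = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("buffered-ints", Integer.class));
            bufferedChars = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("buffered-chars", String.class));
        }

        @Override
        public void processElement1(Integer value, Context ctx, Collector<String> out)
                throws Exception {
            bufferedInts.add(value);
            tryEmit(out);
        }

        @Override
        public void processElement2(String value, Context ctx, Collector<String> out)
                throws Exception {
            bufferedChars.add(value);
            tryEmit(out);
        }

        // Emit "<int><char><int>" whenever two ints and one char are buffered.
        private void tryEmit(Collector<String> out) throws Exception {
            List<Integer> ints = new ArrayList<>();
            Iterable<Integer> bufferedIntsIt = bufferedInts.get();
            if (bufferedIntsIt != null) {   // some backends return null when empty
                bufferedIntsIt.forEach(ints::add);
            }
            List<String> chars = new ArrayList<>();
            Iterable<String> bufferedCharsIt = bufferedChars.get();
            if (bufferedCharsIt != null) {
                bufferedCharsIt.forEach(chars::add);
            }

            while (ints.size() >= 2 && !chars.isEmpty()) {
                out.collect("" + ints.remove(0) + chars.remove(0) + ints.remove(0));
            }
            bufferedInts.update(ints);
            bufferedChars.update(chars);
        }
    }

    Whichever input happens to run ahead simply accumulates in bufferedInts or bufferedChars; that buffered state is included in each checkpoint, which is exactly where the growth you're worried about would show up.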

    A common case where this can lead to large demands for buffering is when implementing an event time join where one of the streams is well ahead of the other in terms of their timestamps (e.g., joining financial transactions with foreign exchange rates, using the exchange rate in effect at the time of the transaction, if the exchange rate stream lags behind). But this buffering can be done in RocksDB, and doesn't have to put pressure on memory.
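
    As a rough illustration of that join, here is a deliberately naive, hypothetical sketch, keyed by currency, where a transaction is a Tuple2 of (amount, event time) and a rate update is a Tuple2 of (rate, valid-from time). A real implementation would use watermarks or timers to know when a rate is final; here a buffered transaction is only converted once the next rate update arrives and proves which rate was in effect:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
    import org.apache.flink.util.Collector;

    // Naive event-time join sketch: a rate is treated as "in effect" from its
    // timestamp until the next rate's timestamp, so a buffered transaction is
    // converted only once the next rate update has arrived.
    public class FxJoin extends KeyedCoProcessFunction<
            String, Tuple2<Double, Long>, Tuple2<Double, Long>, Double> {

        private transient ListState<Tuple2<Double, Long>> pendingTxns;
        private transient ValueState<Tuple2<Double, Long>> currentRate;

        @Override
        public void open(Configuration parameters) {
            pendingTxns = getRuntimeContext().getListState(new ListStateDescriptor<>(
                    "pending-txns", Types.TUPLE(Types.DOUBLE, Types.LONG)));
            currentRate = getRuntimeContext().getState(new ValueStateDescriptor<>(
                    "current-rate", Types.TUPLE(Types.DOUBLE, Types.LONG)));
        }

        @Override
        public void processElement1(Tuple2<Double, Long> txn, Context ctx,
                                    Collector<Double> out) throws Exception {
            // Can't convert yet: a rate update with an earlier timestamp than
            // this transaction may still be on its way. Park it in state.
            pendingTxns.add(txn);
        }

        @Override
        public void processElement2(Tuple2<Double, Long> newRate, Context ctx,
                                    Collector<Double> out) throws Exception {
            Tuple2<Double, Long> prev = currentRate.value();
            if (prev != null) {
                // prev was in effect for event times in [prev.f1, newRate.f1):
                // convert every buffered transaction in that window.
                List<Tuple2<Double, Long>> stillWaiting = new ArrayList<>();
                Iterable<Tuple2<Double, Long>> buffered = pendingTxns.get();
                if (buffered != null) {
                    for (Tuple2<Double, Long> txn : buffered) {
                        if (txn.f1 < newRate.f1) {
                            out.collect(txn.f0 * prev.f0);
                        } else {
                            stillWaiting.add(txn);
                        }
                    }
                }
                pendingTxns.update(stillWaiting);
            }
            currentRate.update(newRate);
        }
    }

    The longer the rate stream lags, the more transactions sit in pendingTxns; with the RocksDB state backend, that buffer lives on disk rather than on the heap.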

    Note that this state buffering is happening entirely in your application -- Flink does not have flexible network buffers that can balloon during backpressure.

    Another point is that snapshots are never stored in a local filesystem or in RocksDB. If you choose to use the RocksDB state backend, then the active, working state of each task manager will be stored in a local RocksDB instance, but the state backups (the snapshots) will be stored in a distributed file system.
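
    Expressed in code, assuming Flink 1.13+ APIs and an invented bucket path, that division of labor looks like this:

    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointStorageExample {
        public static void main(String[] args) {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Active, working state lives in an embedded RocksDB instance local
            // to each task manager...
            env.setStateBackend(new EmbeddedRocksDBStateBackend());

            // ...while the snapshots themselves are written to a durable,
            // distributed filesystem (bucket path is hypothetical).
            env.getCheckpointConfig()
               .setCheckpointStorage("s3://my-bucket/flink-checkpoints");
        }
    }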

    As for the situation you describe like this,

    1, BARRIER A, 2, BARRIER B, 3, BARRIER C, 4, BARRIER D, 5...
    a, BARRIER A, b, BARRIER B, c, BARRIER C, d, BARRIER D, e...
    

    this isn't going to happen. Nothing is going to arrange for these two sources to be synchronized in this manner -- they will proceed much more independently than this diagram suggests. Because Flink has only a small, fixed amount of network buffering between the pipeline stages, any backpressure that occurs in the execution graph will quickly propagate back to one or both of the sources. When that occurs, the source that is backpressured will not be able to push any events into the pipeline until the backpressure eases up -- but meanwhile, the other source may be continuing to make progress. The barriers will be inserted into the two streams independently by the two sources at roughly the same time, but if source 2 is experiencing frequent backpressure (for example), it might look something more like this:

    1, BARRIER, A, 2, B, 3, BARRIER, C, 4, D, BARRIER, 5 ...
    a, BARRIER, A, BARRIER, b, B, BARRIER, BARRIER, c ...
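
    For completeness, the interval at which the sources inject those barriers is set when checkpointing is enabled; and if barrier alignment stalls badly under backpressure, unaligned checkpoints (available since Flink 1.11) let barriers overtake the buffered in-flight records. A minimal sketch:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointConfigExample {
        public static void main(String[] args) {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Ask the sources to inject a barrier every 10 seconds; each source
            // stamps the barrier into its own stream, so the barriers in the two
            // streams are only loosely synchronized.
            env.enableCheckpointing(10_000L);

            // Optional: let barriers overtake buffered in-flight records rather
            // than waiting for alignment while one input is backpressured.
            env.getCheckpointConfig().enableUnalignedCheckpoints();
        }
    }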