java, apache-flink, flink-streaming

Unchanged elements reprocessed in a Flink global window with a join transformation


Elements are being reprocessed in the Flink transformed (joined) stream even though they were not modified.

Let's say that we have 3 elements: 1, 2 and 3. When they are inserted, this happens:

  • When the first element, 1, is inserted, the output is: 1
  • When the second element, 2, is inserted, the output is: 1 -> 2 (1 is reprocessed and emitted again)
  • When the third element, 3, is inserted, the output is: 1 -> 2 -> 3 (1 and 2 are reprocessed)

In this last insertion, nothing has changed for 1 or 2, so there is no reason for them to be reprocessed.

Reprocessing rules:

  • Only books of the same publisher are reprocessed: when books of publisher 2 are inserted, only the books of publisher 2 are reprocessed. The goal is to reprocess none of them, because existing books are not affected by a new book being inserted.
  • When a publisher is modified, only the books of that publisher are reprocessed. (which is ok)

A global window is being used after a join, as shown below:

            bookStream
                .join(publisherStream)
                .where(book -> book.publisherId)
                .equalTo(publisher -> publisher.id)
                .window(GlobalWindows.create())
                .trigger(new ForeverTrigger<>())
                .apply(new JoinFunction<Book, Publisher, Book_Publisher>() {
                    @Override
                    public Book_Publisher join(Book book, Publisher publisher) throws Exception {
                        return new Book_Publisher(book, publisher);
                    }
                })

ForeverTrigger implementation:

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class ForeverTrigger<T, E extends Window> extends Trigger<T, E> {

    @Override
    public TriggerResult onElement(T element, long timestamp, E window, TriggerContext ctx) throws Exception {
        // FIRE evaluates the window function over the current window contents without
        // purging them, so every stored element is re-joined on each new element.
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, E window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, E window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(E window, TriggerContext ctx) throws Exception {}
}

For this use case, it is necessary to store all elements at all times, because if a book is updated, the corresponding publisher has to be available to join with, and vice versa. So removing elements from bookStream or publisherStream is not an option.

One solution would be to use the Table API, as suggested here: Why does Flink emit duplicate records on a DataStream join + Global window?. This would work and could then be converted back to a DataStream. However, I would like to avoid mixing the Table API with the DataStream API, especially because the main project goal is generalizing and automating the creation of Flink pipelines, which means there would be two APIs to generalize instead of one. So if there is a different efficient solution, that would be great.
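
For reference, a minimal sketch of what that Table API route might look like, assuming Book and Publisher are plain POJOs and bookStream and publisherStream are the DataStreams from the question; the environment setup, the renameColumns call, and any field names other than publisherId and id are illustrative assumptions, not part of the original question:

import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// Inside the pipeline-building code, given the StreamExecutionEnvironment env:
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Table books = tEnv.fromDataStream(bookStream);
// Rename the publisher key to avoid a clash in case Book also has an "id" column.
Table publishers = tEnv.fromDataStream(publisherStream)
        .renameColumns($("id").as("pubId"));

// A regular (non-windowed) join is processed incrementally: a newly arrived row
// only joins against the other side's state, so earlier results are not recomputed.
Table joined = books
        .join(publishers)
        .where($("publisherId").isEqual($("pubId")));

// The result of a regular join is an updating table, so it converts back to a
// changelog stream rather than an insert-only stream.
DataStream<Row> result = tEnv.toChangelogStream(joined);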

Another solution would be evicting or filtering elements, as mentioned in the same post linked above, but this seems inefficient since it would still require processing the elements in order to evict/filter them. This would require keeping a record of previous states and comparing incoming elements against it (see the sketch below).
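
For completeness, the filtering variant could look roughly like the following: key the joined output by the book and drop any result identical to the last one emitted for that key. The field access bp.book.id and the reliance on Book_Publisher implementing equals() are assumptions for illustration:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Drops joined results that are identical to the previous result emitted for the
// same book, i.e. the "keep previous state and compare" approach described above.
public class DropUnchanged extends RichFlatMapFunction<Book_Publisher, Book_Publisher> {

    private transient ValueState<Book_Publisher> lastEmitted;

    @Override
    public void open(Configuration parameters) {
        lastEmitted = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-emitted", Book_Publisher.class));
    }

    @Override
    public void flatMap(Book_Publisher value, Collector<Book_Publisher> out) throws Exception {
        // Relies on Book_Publisher.equals() comparing the joined contents.
        if (!value.equals(lastEmitted.value())) {
            lastEmitted.update(value);
            out.collect(value);
        }
    }
}

It would be applied as joinedStream.keyBy(bp -> bp.book.id).flatMap(new DropUnchanged()), but the windowed join upstream would still reprocess everything, which is exactly the inefficiency described above.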

Ideally, Flink would know to process only the elements that contain changes. Is there an efficient solution to perform this join with datastreams and only process the modified elements?


Solution

  • Windowed joins weren't designed with this sort of scenario in mind. To handle this efficiently I think you'll need to either go down a level in the API stack and use KeyedCoProcessFunctions, or go up a level and use the Table API. A rough sketch of the KeyedCoProcessFunction route follows.
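
A minimal sketch of the KeyedCoProcessFunction direction, assuming Book exposes publisherId and an id field and Publisher exposes id (only publisherId and id appear in the question; the class name, state names, and the Integer key type are illustrative assumptions). Both streams are keyed by the publisher id; a new or updated book is joined only against the stored publisher, and a modified publisher re-emits only its own books:

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class BookPublisherJoin
        extends KeyedCoProcessFunction<Integer, Book, Publisher, Book_Publisher> {

    // Latest version of the publisher for the current key (the publisher id).
    private transient ValueState<Publisher> publisherState;
    // Books of this publisher, keyed by book id (assumed field) so an update replaces the old entry.
    private transient MapState<Integer, Book> booksState;

    @Override
    public void open(Configuration parameters) {
        publisherState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("publisher", Publisher.class));
        booksState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("books", Integer.class, Book.class));
    }

    @Override
    public void processElement1(Book book, Context ctx, Collector<Book_Publisher> out) throws Exception {
        booksState.put(book.id, book);
        Publisher publisher = publisherState.value();
        if (publisher != null) {
            // Only the book that actually arrived is joined and emitted.
            out.collect(new Book_Publisher(book, publisher));
        }
    }

    @Override
    public void processElement2(Publisher publisher, Context ctx, Collector<Book_Publisher> out) throws Exception {
        publisherState.update(publisher);
        // A new or modified publisher affects all of its stored books, so re-emit only those.
        for (Book book : booksState.values()) {
            out.collect(new Book_Publisher(book, publisher));
        }
    }
}

It would be wired up as:

DataStream<Book_Publisher> joined = bookStream
        .connect(publisherStream)
        .keyBy(book -> book.publisherId, publisher -> publisher.id)
        .process(new BookPublisherJoin());

Unlike the windowed join, nothing is re-evaluated when an unrelated element arrives: state is read only for the key of the incoming element, and only the elements genuinely affected by that arrival are emitted.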