Elements are being reprocessed in a Flink joined stream even when they were not modified.
Let's say we have 3 elements: 1, 2 and 3. Every time one of them is inserted, the window fires and the join output is re-emitted for all elements currently stored, not just the new one. So on the last insertion, nothing has changed for 1 or 2, yet they are reprocessed anyway, and there is no reason for that.
A global window is being used after a join, as shown below:

bookStream
    .join(publisherStream)
    .where(book -> book.publisherId)
    .equalTo(publisher -> publisher.id)
    .window(GlobalWindows.create())
    .trigger(new ForeverTrigger<>())
    .apply(new JoinFunction<Book, Publisher, Book_Publisher>() {
        @Override
        public Book_Publisher join(Book book, Publisher publisher) throws Exception {
            return new Book_Publisher(book, publisher);
        }
    });
ForeverTrigger implementation:
public class ForeverTrigger<T, E extends Window> extends Trigger<T, E> {

    @Override
    public TriggerResult onElement(T element, long timestamp, E window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, E window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, E window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(E window, TriggerContext ctx) throws Exception {}
}
For this use case, all elements need to be stored at all times: if a book is updated, the corresponding publisher must still be available to join against, and vice versa. So removing elements from bookStream or publisherStream is not an option.
One solution would be to use the Table API, as suggested here: Why does Flink emit duplicate records on a DataStream join + Global window?. This would work, and the result could then be converted back to a DataStream. However, I would like to avoid mixing Table API usage with DataStream API usage, especially because the main goal of the project is to generalize and automate the creation of Flink pipelines, which would mean having two APIs to generalize instead of one. So if there is a different efficient solution, that would be great.
Another solution would be evicting or filtering elements, as mentioned in the same post linked above, but this seems inefficient because the elements would still have to be processed in order to evict/filter them. It would also require keeping the previous state for each element and comparing incoming elements against it.
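To make that cost concrete, the bookkeeping such a filter would need looks roughly like this. This is a plain-Java sketch without Flink dependencies; the class and method names are illustrative (in Flink this would sit in a RichFilterFunction or similar, with the map held in keyed state):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Sketch of the filtering approach: suppress results whose value has not
// changed since the last emission for the same key. Note that every element
// still flows through the comparison, which is the inefficiency in question.
class DedupFilter<K, V> {
    // last value emitted for each key
    private final Map<K, V> lastSeen = new HashMap<>();

    // Returns true if the value is new or changed for this key, false otherwise.
    boolean accept(K key, V value) {
        V previous = lastSeen.put(key, value);
        return !Objects.equals(previous, value);
    }
}
```

Every downstream duplicate is dropped, but only after having been produced and compared, so the wasted join work upstream is not avoided.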
Ideally, Flink would know to process only the elements that contain changes. Is there an efficient solution to perform this join with datastreams and only process the modified elements?
Windowed joins weren't designed with this sort of scenario in mind. To handle this efficiently I think you'll need to either go down a level in the API stack and use KeyedCoProcessFunctions, or go up a level and use the Table API.
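As a sketch of what the KeyedCoProcessFunction approach buys you, here is the per-key state logic in plain Java, stripped of the Flink API. The class and method names are illustrative: in Flink, the two maps would live in keyed state (keyed by publisher id), and onBook/onPublisher would correspond to processElement1/processElement2 on the connected streams:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of change-driven join state: only the element that actually
// arrived (or its directly affected partners) produces output.
class ChangeDrivenJoin {
    // latest publisher name seen for each publisher id
    private final Map<Integer, String> publisherById = new HashMap<>();
    // latest book titles seen for each publisher id, keyed by book id
    private final Map<Integer, Map<Integer, String>> booksByPublisher = new HashMap<>();

    // A book arrived (or was updated): join it with the stored publisher only.
    // Books that did not change are never touched.
    List<String> onBook(int publisherId, int bookId, String title) {
        booksByPublisher.computeIfAbsent(publisherId, k -> new HashMap<>())
                        .put(bookId, title);
        String publisher = publisherById.get(publisherId);
        if (publisher == null) {
            return Collections.emptyList(); // buffer until the publisher arrives
        }
        return Collections.singletonList(title + " / " + publisher);
    }

    // A publisher arrived (or was updated): re-emit only its own books,
    // because their joined result genuinely changed.
    List<String> onPublisher(int publisherId, String name) {
        publisherById.put(publisherId, name);
        List<String> out = new ArrayList<>();
        for (String title : booksByPublisher
                .getOrDefault(publisherId, Collections.emptyMap()).values()) {
            out.add(title + " / " + name);
        }
        return out;
    }
}
```

Unlike the global-window join, inserting a new book emits exactly one joined record; previously joined books are re-emitted only when their publisher changes, which is the desired behavior.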