I have a resource with frequently added/updated rows called Orders, and a resource called UserProfile with less frequent (but often important) updates. I want to perform a continuous join query on these two resources (currently stored logically as dynamic tables in Flink) such that when a UserProfile update occurs, the join table emits a DataStream with one updated EnrichedOrder per Order associated with that UserProfile. The objective is to index this enriched denormalized data for efficient querying in a downstream data sink.
My question is: what is the best way to architect this join? Specifically, how do I ensure that all Order records on the "many" side of the join are re-enriched when a UserProfile is updated, while keeping the join performant?
Since an arbitrary inner join with no time window is not possible in Flink (presumably for performance reasons), I imagine setting an excessively large time window is discouraged as well.
What are the recommended approach(es) for this use case?
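For concreteness, this is roughly the continuous, non-windowed join I would like to express (table, column, and variable names such as `tableEnv` are just for illustration):

```java
// Illustrative only: the unbounded continuous join I would like to run, where every
// UserProfile update re-emits the enriched rows for that user's orders.
// "Orders" and "UserProfile" are assumed to be registered as dynamic tables.
Table enrichedOrders = tableEnv.sql(
    "SELECT o.orderId, o.amount, u.userId, u.name " +
    "FROM Orders o JOIN UserProfile u ON o.userId = u.userId");
```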
In Flink 1.4.0, SQL does not support inner joins on streams without time boundaries, and the DataStream API does not provide a convenient built-in way to express such a join either. However, it is possible to hand-craft one with a CoProcessFunction.
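A minimal sketch of such a hand-crafted join, keyed by user id: it buffers all orders per user plus the latest profile in keyed state, and re-emits every buffered order whenever the profile changes. `Order`, `UserProfile`, and `EnrichedOrder` are assumed to be your own POJOs (with a `userId` field, a `getOrderId()` accessor, and an `(Order, UserProfile)` constructor); they are not part of Flink.

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class EnrichmentJoin extends CoProcessFunction<Order, UserProfile, EnrichedOrder> {

    // All orders seen so far for the current user, keyed by order id.
    private transient MapState<String, Order> orders;
    // Latest profile seen for the current user.
    private transient ValueState<UserProfile> profile;

    @Override
    public void open(Configuration parameters) {
        orders = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("orders", String.class, Order.class));
        profile = getRuntimeContext().getState(
                new ValueStateDescriptor<>("profile", UserProfile.class));
    }

    @Override
    public void processElement1(Order order, Context ctx, Collector<EnrichedOrder> out) throws Exception {
        // Buffer the order; emit it enriched right away if the profile is already known.
        orders.put(order.getOrderId(), order);
        UserProfile p = profile.value();
        if (p != null) {
            out.collect(new EnrichedOrder(order, p));
        }
    }

    @Override
    public void processElement2(UserProfile p, Context ctx, Collector<EnrichedOrder> out) throws Exception {
        // New or updated profile: remember it and re-emit every buffered order with it.
        profile.update(p);
        for (Order order : orders.values()) {
            out.collect(new EnrichedOrder(order, p));
        }
    }
}
```

Wired up, it could look roughly like this:

```java
// Key both inputs by the user id so the keyed state above is scoped per user.
DataStream<EnrichedOrder> enriched = orderStream
        .connect(profileStream)
        .keyBy("userId", "userId")   // POJO field name, illustrative
        .process(new EnrichmentJoin());
```

Note that this sketch never cleans up the buffered orders, so state grows with the number of orders per user. In practice you would bound it, for example by registering timers in the CoProcessFunction to expire old entries.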
Flink 1.5.0 will support unbounded stream joins in SQL. By default, both input tables are held completely in state. However, you can configure a state retention time after which the state of inactive keys (keys that have not been updated within the configured interval) is cleared.
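A rough sketch of how that could look with Flink 1.5, assuming the two streams have already been registered as tables named `Orders` and `UserProfiles` (column names and retention times are illustrative):

```java
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class EnrichedOrdersJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Assumed: the two input streams are registered as dynamic tables, e.g.
        // tEnv.registerDataStream("Orders", orderStream, "orderId, userId, amount");
        // tEnv.registerDataStream("UserProfiles", profileStream, "userId, name, tier");

        // Clear join state for keys that have been idle for 12-24 hours
        // so that state does not grow without bound.
        StreamQueryConfig qConfig = tEnv.queryConfig();
        qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

        // Unbounded (non-windowed) inner join between the two dynamic tables.
        Table enriched = tEnv.sqlQuery(
            "SELECT o.orderId, o.amount, u.userId, u.name, u.tier " +
            "FROM Orders o JOIN UserProfiles u ON o.userId = u.userId");

        // The result is an updating table, so convert it to a retract stream:
        // the Boolean flag marks each Row as an insert (true) or a retraction (false).
        DataStream<Tuple2<Boolean, Row>> enrichedStream =
            tEnv.toRetractStream(enriched, Row.class, qConfig);

        enrichedStream.print();
        env.execute("enriched orders");
    }
}
```

The retract stream is what you would feed into your downstream index: inserts add or update an EnrichedOrder document, retractions remove the superseded version.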
Since the SQL support is implemented on top of the DataStream API, you could check the implementation of the unbounded SQL join.