apache-flink, flink-streaming, stream-processing, flink-sql

Apache Flink: Best way to architect many-to-one join on dynamic table?


I have a resource called Orders with frequently added/updated rows, and a resource called UserProfile with less frequent (but often important) updates. I want to run a continuous join query over these two resources (currently modeled as dynamic tables in Flink) such that, whenever a UserProfile is updated, the join emits a DataStream containing one updated EnrichedOrder per Order associated with that UserProfile. The objective is to index this enriched, denormalized data for efficient querying in a downstream data sink.

My question is, what is the best way to architect this join? Specifically, how do I ensure that all Order records on the "many" side of the join are enriched when a UserProfile is updated, while still keeping things performant?

Since an arbitrary inner join with no time window is not possible in Flink (presumably for performance reasons), I imagine setting an excessively large time window is discouraged as well.

What are the recommended approaches for this use case?


Solution

  • In Flink 1.4.0, SQL does not support inner joins on streams without time boundaries, and the DataStream API does not provide a concise syntax for this kind of join. However, it is possible to hand-craft such a join using a CoProcessFunction (see the sketch after this answer).

    Flink 1.5.0 will support unbounded stream joins in SQL. By default, both input tables are held completely in state. However, it is possible to configure a state retention time after which the state for inactive keys (i.e., keys not updated within the configured interval) is cleared; a configuration sketch follows below.

    Since the SQL support is implemented on top of the DataStream API, you could check the implementation of the unbounded SQL join.
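
To make the CoProcessFunction option concrete, below is a minimal sketch of a hand-crafted enrichment join. The Order, UserProfile, and EnrichedOrder POJOs and their fields (orderId, userId, amount, tier) are hypothetical stand-ins for the real application types, and both input streams are assumed to be keyed by userId before being connected. Every Order is buffered in MapState so that a later UserProfile update can re-emit one EnrichedOrder per stored Order.

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical POJOs standing in for the real application types
class Order { public String orderId; public String userId; public double amount; }
class UserProfile { public String userId; public String tier; }
class EnrichedOrder {
    public Order order;
    public UserProfile profile;
    public EnrichedOrder(Order order, UserProfile profile) { this.order = order; this.profile = profile; }
}

/** Hand-crafted unbounded join; both inputs must be keyed by userId. */
public class EnrichmentJoin extends CoProcessFunction<Order, UserProfile, EnrichedOrder> {

    // all Orders seen so far for the current userId
    private transient MapState<String, Order> orders;
    // latest UserProfile for the current userId
    private transient ValueState<UserProfile> profile;

    @Override
    public void open(Configuration parameters) {
        orders = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("orders", String.class, Order.class));
        profile = getRuntimeContext().getState(
                new ValueStateDescriptor<>("profile", UserProfile.class));
    }

    @Override
    public void processElement1(Order order, Context ctx, Collector<EnrichedOrder> out) throws Exception {
        // buffer the order so that later profile updates can re-emit it
        orders.put(order.orderId, order);
        UserProfile p = profile.value();
        if (p != null) {
            out.collect(new EnrichedOrder(order, p)); // enrich immediately if a profile is already known
        }
    }

    @Override
    public void processElement2(UserProfile update, Context ctx, Collector<EnrichedOrder> out) throws Exception {
        profile.update(update);
        // emit one updated EnrichedOrder per buffered Order of this user
        for (Order order : orders.values()) {
            out.collect(new EnrichedOrder(order, update));
        }
    }
}
```

Wiring it up would look roughly like orders.keyBy(o -> o.userId).connect(profiles.keyBy(p -> p.userId)).process(new EnrichmentJoin()). Note that the MapState grows without bound unless you also register timers (or similar cleanup) to evict old orders, which is the same trade-off the state retention setting below is meant to address.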
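
For the Flink 1.5.0 SQL route, the following sketch shows roughly how the unbounded join and the idle-state retention time could be wired up with the Table API of that era, reusing the hypothetical Order and UserProfile POJOs from the sketch above. The table and field names are assumptions, and the query-configuration API (StreamQueryConfig.withIdleStateRetentionTime) has changed across Flink releases, so treat this as an illustration rather than version-exact code.

```java
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SqlEnrichmentJob {

    /** Unbounded SQL join of Orders and UserProfile with bounded idle-state retention. */
    public static DataStream<Tuple2<Boolean, Row>> enrich(
            StreamExecutionEnvironment env,
            DataStream<Order> orders,
            DataStream<UserProfile> profiles) {

        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // register the streams as dynamic tables (field names are hypothetical)
        tEnv.registerDataStream("Orders", orders, "orderId, userId, amount");
        tEnv.registerDataStream("UserProfile", profiles, "userId, tier");

        // clear join state for keys that have been inactive for roughly a day
        StreamQueryConfig qConfig = tEnv.queryConfig();
        qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

        // unbounded (non-windowed) inner join on the two dynamic tables
        Table enriched = tEnv.sqlQuery(
            "SELECT o.orderId, o.amount, u.tier " +
            "FROM Orders o JOIN UserProfile u ON o.userId = u.userId");

        // the result is a retract stream: a UserProfile update retracts the previously
        // emitted rows for that user and emits freshly enriched ones
        return tEnv.toRetractStream(enriched, Row.class, qConfig);
    }
}
```

The retract stream pairs a Boolean flag with each Row (false = retraction, true = addition), which maps naturally onto delete/upsert operations against the downstream index.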