apache-flink flink-streaming

How to split a window based on a second key in Apache Flink?


I am trying to build a stream-processing pipeline for a product scanner that generates events in the form of the following Tuple4: Timestamp (long, in milliseconds), ClientID (int), ProductID (int), Quantity (int).

In the end, a stream of Tuple3 should be produced: ClientID (int), ProductID (int), Quantity (int), where each record groups all products with the same ProductID purchased by the client with a given ClientID. Within a single "transaction" there can be a gap of at most 10 seconds between consecutive product scans.

This is a short snippet of code that shows my initial attempt:

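        // Events: (timestamp ms, ClientID, ProductID, Quantity); event-time timestamps
        // and watermarks must already be assigned for EventTimeSessionWindows to fire
        DataStream<Tuple4<Long, Integer, Integer, Integer>> inStream = ...;

        WindowedStream<Tuple4<Long, Integer, Integer, Integer>, Tuple2<Integer, Integer>, TimeWindow> windowedStream = inStream
            // Key by (ClientID, ProductID); the explicit key type is needed because
            // the Tuple2 generics returned by the lambda are erased
            .keyBy(tuple -> Tuple2.of(tuple.f1, tuple.f2), Types.TUPLE(Types.INT, Types.INT))
            .window(EventTimeSessionWindows.withGap(Time.seconds(10)));

        windowedStream.aggregate(...); // Drop timestamp, sum quantity, keep the rest the same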

However, this is where the problem lies. Normally a session window would be enough, but here the 10-second gap is applied between consecutive events with the same (ClientID, ProductID) key, whereas it should be measured between consecutive scans by the same client, regardless of the product.

If we imagine the following tuples coming in:

  1. (10_000, 1, 1, 1) <6 second gap>
  2. (16_000, 1, 2, 1) <6 second gap>
  3. (22_000, 1, 1, 1) <6 second gap>
  4. (28_000, 1, 2, 1)

All four tuples should fall into the same session, because consecutive scans are only 6 seconds apart; 1 should then be merged with 3, and 2 with 4, producing two output events: (1, 1, 2) and (1, 2, 2). However, they do not end up in the same session window: keyBy separates 1+3 and 2+4 into different keyed streams, and within each of those streams the scans are 12 seconds apart, which exceeds the 10-second gap, so they are not aggregated.
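The gap arithmetic above can be checked with a small plain-Java model. This is not Flink code, and the class SessionGapDemo and its sessions method are hypothetical names. It uses a simplified session rule, assumed here for illustration: events with the same key stay in one session while consecutive timestamps are at most 10 seconds apart. Watermarks, late data and Flink's actual window-merging internals are ignored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Simplified, plain-Java model of session windows (not Flink's implementation).
public class SessionGapDemo {
    static final long GAP_MS = 10_000;

    // Same layout as the question: {timestamp ms, ClientID, ProductID, Quantity}
    static final long[][] EVENTS = {
        {10_000, 1, 1, 1},
        {16_000, 1, 2, 1},
        {22_000, 1, 1, 1},
        {28_000, 1, 2, 1},
    };

    // Returns, per key, its sessions; each session is the list of event timestamps it contains.
    static Map<String, List<List<Long>>> sessions(long[][] events, Function<long[], String> keyOf) {
        long[][] sorted = events.clone();
        Arrays.sort(sorted, Comparator.comparingLong(e -> e[0]));
        Map<String, List<List<Long>>> result = new TreeMap<>();
        for (long[] e : sorted) {
            List<List<Long>> keySessions = result.computeIfAbsent(keyOf.apply(e), k -> new ArrayList<>());
            List<Long> last = keySessions.isEmpty() ? null : keySessions.get(keySessions.size() - 1);
            // Start a new session when the gap to the key's previous event exceeds GAP_MS
            if (last == null || e[0] - last.get(last.size() - 1) > GAP_MS) {
                last = new ArrayList<>();
                keySessions.add(last);
            }
            last.add(e[0]);
        }
        return result;
    }

    public static void main(String[] args) {
        // Keyed by (ClientID, ProductID): same-product scans are 12 s apart
        System.out.println(sessions(EVENTS, e -> e[1] + "/" + e[2]));
        // prints {1/1=[[10000], [22000]], 1/2=[[16000], [28000]]}

        // Keyed by ClientID only: consecutive scans are 6 s apart
        System.out.println(sessions(EVENTS, e -> String.valueOf(e[1])));
        // prints {1=[[10000, 16000, 22000, 28000]]}
    }
}
```

Keyed by (ClientID, ProductID), each key ends up with two single-scan sessions; keyed by ClientID alone, all four scans form one session.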

I am wondering whether this can be solved by applying a "second" key. First, the stream would be keyed by ClientID and a session window applied (irrespective of the product). Then the ClientID-keyed session windows would be subdivided by the second key (ProductID), effectively reaching the same (ClientID, ProductID) key as before but without the problem described above. The aggregate could then be applied as usual to obtain the expected output stream.

If that is not possible, is there any other way of solving this?


Solution

  • The simplest way to solve it would be to partition the stream based on the ClientID only, so that all scans done by a particular client are captured in the same session window, and then use process() with a ProcessWindowFunction, which gives you access to all elements in that window. There you can generate a separate event or output for every ProductID. Is there any reason why that might not work in your setup?
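A minimal sketch of this suggestion follows, assuming the Flink 1.x DataStream Java API (where EventTimeSessionWindows.withGap accepts org.apache.flink.streaming.api.windowing.time.Time) and that timestamps and watermarks have already been assigned to the input stream. SumPerProduct, sumByProduct and sumPerClientSession are hypothetical names. Keying by ClientID alone means the 10-second gap is measured across all of a client's scans; the ProcessWindowFunction then sees every scan in the session and emits one (ClientID, ProductID, Quantity) tuple per product.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Input: (timestamp ms, ClientID, ProductID, Quantity); output: (ClientID, ProductID, Quantity).
public class SumPerProduct extends ProcessWindowFunction<
        Tuple4<Long, Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>, Integer, TimeWindow> {

    // Keys by ClientID only, so the 10 s session gap spans all of a client's scans.
    public static DataStream<Tuple3<Integer, Integer, Integer>> sumPerClientSession(
            DataStream<Tuple4<Long, Integer, Integer, Integer>> inStream) {
        return inStream
                .keyBy(scan -> scan.f1, Types.INT)
                .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
                .process(new SumPerProduct());
    }

    // Called once per client session with every scan in it; emits one record per ProductID.
    @Override
    public void process(Integer clientId,
                        Context context,
                        Iterable<Tuple4<Long, Integer, Integer, Integer>> scans,
                        Collector<Tuple3<Integer, Integer, Integer>> out) {
        for (Tuple3<Integer, Integer, Integer> result : sumByProduct(clientId, scans)) {
            out.collect(result);
        }
    }

    // Groups scans by ProductID (f2) and sums Quantity (f3); TreeMap keeps ProductIDs ordered.
    static List<Tuple3<Integer, Integer, Integer>> sumByProduct(
            int clientId, Iterable<Tuple4<Long, Integer, Integer, Integer>> scans) {
        Map<Integer, Integer> quantityByProduct = new TreeMap<>();
        for (Tuple4<Long, Integer, Integer, Integer> scan : scans) {
            quantityByProduct.merge(scan.f2, scan.f3, Integer::sum);
        }
        List<Tuple3<Integer, Integer, Integer>> results = new ArrayList<>();
        for (Map.Entry<Integer, Integer> entry : quantityByProduct.entrySet()) {
            results.add(Tuple3.of(clientId, entry.getKey(), entry.getValue()));
        }
        return results;
    }
}
```

For the four scans in the question this would emit (1,1,2) and (1,2,2). Because a ProcessWindowFunction buffers the whole session until the window fires, very long sessions could instead use an AggregateFunction that keeps only a per-product map of running sums.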