Tags: apache-flink, flink-streaming, flink-sql

How to organize the flow of data in Flink when tables can't be partitioned using the same identifier


I'm convinced that Flink is the perfect solution to my event processing problem. I have even managed to produce a working prototype, but I'm not convinced it is even close to optimal.

Here is my scenario:

  • I have two Kinesis streams
    • One stream contains Event1 and is stored as JSON
    • The other stream contains Event2, Event3, and Event4, stored as Gzip'd Base64 (which is ultimately also JSON). I have to read this stream using the RAW format and then extract the event data with a custom UDF, process_events234, created by implementing TableFunction[Row] in a Scala class (a simplified sketch follows this list).
  • I want to detect when 4 corresponding events have arrived, but there is no single key shared by all 4 event types that I could use to join them. See below:
Data Type   Has key1   Has key2
Event1      Yes        No
Event2      Yes        Yes
Event3      No         Yes
Event4      No         Yes
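
A simplified Scala sketch of the UDF is below; the output field names (eventType, key1, key2) and the regex-based JSON extraction are illustrative stand-ins for the real implementation:

import java.io.ByteArrayInputStream
import java.util.Base64
import java.util.zip.GZIPInputStream

import scala.io.Source

import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

// Decode the Base64 payload, gunzip it, extract the fields the views
// filter and join on, and emit one row per record.
@FunctionHint(output = new DataTypeHint("ROW<eventType STRING, key1 STRING, key2 STRING>"))
class ProcessEvents234 extends TableFunction[Row] {

  def eval(data: Array[Byte]): Unit = {
    val gzipped = Base64.getDecoder.decode(data)
    val json = Source.fromInputStream(
      new GZIPInputStream(new ByteArrayInputStream(gzipped)), "UTF-8").mkString
    collect(Row.of(field(json, "eventType"), field(json, "key1"), field(json, "key2")))
  }

  // Naive stand-in for a real JSON parser; returns null when the field is missing.
  private def field(json: String, name: String): String =
    ("\"" + name + "\"\\s*:\\s*\"([^\"]*)\"").r
      .findFirstMatchIn(json).map(_.group(1)).orNull
}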

My prototype notebook has the following:

Define a table for event_1s

CREATE TABLE event_1 (
  key1 VARCHAR,  -- type assumed; the other event_1 fields are elided
  ...
)
WITH (
    'connector' = 'kinesis',
    'stream' = 'stream_of_event_1s',
    ...
    'format' = 'json'
)

Define a table for event_2,3,4s

CREATE TABLE events_234 (
  Data BYTES
)
WITH (
    'connector' = 'kinesis',
    'stream' = 'stream_of_event_2_3_4s',
    ...
    'format' = 'raw'
)

Create a view for each of events 2, 3, and 4

CREATE VIEW event_N -- where N is 2, 3, 4
AS
SELECT
      p.*
FROM
      events_234 e
      JOIN LATERAL TABLE(process_events234(e.Data)) AS p ON TRUE
WHERE
      p.eventType = 'eventN' -- where N is 2, 3, 4

Join the data together to get my results

/* INSERT INTO my_downstream_sink */
SELECT
    e1.*, e2.*, e3.*, e4.*
FROM
    event_1 e1
    INNER JOIN event_2 e2 ON e1.key1 = e2.key1
    INNER JOIN event_3 e3 ON e2.key2 = e3.key2
    INNER JOIN event_4 e4 ON e2.key2 = e4.key2

My current prototype works for a few hundred records over a 10-minute period, but I doubt its ability to scale. What concerns me is that I am not able to partition/keyBy the data so that related events would land on the same worker. I'm new to Flink, but this seems particularly important.

What occurs to me is to expand the number of steps and Kinesis streams so that:

  • I join Event1 and Event2, then write the result to a new stream, Event1+Event2, partitioned by key2
  • Then I join Event1+Event2 with Event3 and Event4

However, I'm just guessing and would appreciate some expert advice and opinions. Thanks!


Solution

  • I wouldn't worry; Flink's SQL planner/optimizer should handle this just fine.

    You may find it useful to use EXPLAIN and/or look at the resulting job graph in the Flink web dashboard to get a clearer idea of how the query is being executed. I believe you'll find that it's doing exactly what you propose (creating an Event1+Event2 stream, keying it by key2, and then joining with the other streams) without the expense of writing the Event1+Event2 stream out to Kinesis and reading it in again.
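
    For example, a minimal Scala sketch of the EXPLAIN route (it assumes the DDL and views above are executed against the same table environment first):

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // Register the tables and views shown in the question first, e.g.:
    // tEnv.executeSql("CREATE TABLE event_1 (...) WITH (...)")

    // explainSql prints the AST, the optimized logical plan, and the
    // physical execution plan; the physical plan shows the hash
    // partitioning Flink inserts on key1 and key2 ahead of each join.
    println(tEnv.explainSql(
      """SELECT e1.*, e2.*, e3.*, e4.*
        |FROM event_1 e1
        |  INNER JOIN event_2 e2 ON e1.key1 = e2.key1
        |  INNER JOIN event_3 e3 ON e2.key2 = e3.key2
        |  INNER JOIN event_4 e4 ON e2.key2 = e4.key2""".stripMargin))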