Search code examples
javaapache-kafkaapache-flinkflink-streaming

Union of more than two streams in apache flink


I have an architecture question regarding the union of more than two streams in Apache Flink.

We are having three and sometime more streams that are some kind of code book with whom we have to enrich main stream. Code book streams are compacted Kafka topics. Code books are something that doesn't change so often, eg currency. Main stream is a fast event stream. Our goal is to enrich main stream with code books.

There are three possible ways as I see it to do it:

  1. Make a union of all code books and then join it with main stream and store the enrichment data as managed, keyed state (so when compact events from kafka expire I have the codebooks saved in state). This is now only way that I tired to do it. Deserilized Kafka topic messages which are in JSON to POJOs eg. Currency, OrganizationUnit and so on. I made one big wrapper class CodebookData with all code books eg:
public class CodebookData {
 private Currency currency;
 private OrganizationUnit organizationUnit
...
}

Next I mapped incoming stream of every kafka topic to this wrapper class and then made a union:

DataStream<CodebookData> enrichedStream = mappedCurrency.union(mappedOrgUnit).union(mappedCustomer);

When I print CodebookData it is populated like this

CodebookData{
Currency{populated with data},
OrganizationUnit=null,
Customer=null
}
CodebookData{
Curenncy=null,
OrganizationUnit={populated with data},
Customer=null
}
...

Here I stopped because I have problem how to connect this Codebook stream with main stream and save codebook data in value state. I do not have unique foreign key in my Codebook data because every codebook has its own foregin key that connects with main stream, eg. Currency has currencyId, organizationUnit orgID and so on. Eg.I want to do something like this

SingleOutputStreamOperator<CanonicalMessage> enrichedMainStream = mainStream
            .connect(enrichedStream)
            .keyBy(?????)
            .process(new MyKeyedCoProcessFunction());

and in MyCoProcessFunction I would create ValueState of type CodebookData.

Is this totally wrong or can I do something with this and if it is douable what I am doing wrong?

  1. Second approach is by cascading a series of two-input CoProcessFunction operators with every kafka event source but I read somewhere that this is not optimal approach.

  2. Third approach is broadcast state that is not so much familiar to me. For now I see the problem if I am using RocksDb for checkpointing and savepointing I am not sure that I can then use broadcast state.

Should I use some other approach from approach no.1 whit whom I am currently struggling?


Solution

  • In many cases where you need to do several independent enrichment joins like this, a better pattern to follow is to use a fan-in / fan-out approach, and perform all of the joins in parallel.

    Something like this, where after making sure each event on the main stream has a unique ID, you create 3 or more copies of each event:

    enter image description here

    Then you can key each copy by whatever is appropriate -- the currency, the organization unit, and so on (or customer, IP address, and merchant in the example I took this figure from) -- then connect it to the appropriate cookbook stream, and compute each of the 2-way joins independently.

    Then union together these parallel join result streams, keyBy the random nonce you added to each of the original events, and glue the results together.

    Now in the case of three streams, this may be overly complex. In that case I might just do a series of three 2-way joins, one after another, using keyBy and connect each time. But at some point, as they get longer, pipelines built that way tend to run into performance / checkpointing problems.

    There's an example implementing this fan-in/fan-out pattern in https://gist.github.com/alpinegizmo/5d5f24397a6db7d8fabc1b12a15eeca6.