Tags: apache-flink, flink-streaming, flink-sql, flink-cep

Flink multi-source with Kafka, Kinesis and TableEnvironment


I'm new to Flink and hope someone can help. I have tried to follow the Flink tutorials.

We have a requirement where we consume from:

  1. A Kafka topic.

When an event arrives on the Kafka topic, we need its JSON fields (mobile_acc_id, member_id, mobile_number, valid_from, valid_to) stored in an external Postgres database (a sample event is shown after this list).

  2. A Kinesis stream.

When an event arrives on the Kinesis stream, we need to look up the event's mobile_number in the Postgres DB (populated in step 1), extract the member_id from the DB, enrich the incoming Kinesis event with it, and sink the result to another output stream.
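
For illustration, an incoming Kafka event might look like this (the field values here are made up):

{
    "mobile_acc_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
    "member_id": 12345,
    "mobile_number": "+14155550123",
    "valid_from": "2021-01-01 00:00:00",
    "valid_to": "2022-01-01 00:00:00"
}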

So I set up a Stream and a Table environment like this:

public static StreamExecutionEnvironment initEnv() {
    var env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setAutoWatermarkInterval(0L); // interval 0 disables periodic watermark generation
    return env;
}

public static TableEnvironment initTableEnv() {
    var settings = EnvironmentSettings.newInstance().inStreamingMode().build();
    return TableEnvironment.create(settings);
}

Calling the process(..) method with initEnv() will use Kinesis as the source:

process(config.getTransformerConfig(), input, sink, deadLetterSink, initEnv());

In process(..) I am also initialising the table environment using initTableEnv(), hoping that Flink will consume from both sources when I call env.execute(..):

public static void process(TransformerConfig cfg, SourceFunction<String> source, SinkFunction<UsageSummaryWithHeader> sink,
                           SinkFunction<DeadLetterEvent> deadLetterSink, StreamExecutionEnvironment env) throws Exception {
    var events =
            StreamUtils.source(source, env, "kinesis-events", cfg.getInputParallelism());

    collectInSink(transform(cfg, events, deadLetterSink), sink, "kinesis-summary-events", cfg.getOutputParallelism());

    processStreamIntoTable(initTableEnv());

    env.execute("my-flink-event-enricher-svc");
}

private static void processStreamIntoTable(TableEnvironment tableEnv) throws Exception {
    tableEnv.executeSql("CREATE TABLE mobile_accounts (\n" +
            "    mobile_acc_id VARCHAR(36)              NOT NULL,\n" +
            "    member_id     BIGINT                   NOT NULL,\n" +
            "    mobile_number        VARCHAR(14)              NOT NULL,\n" +
            "    valid_from    TIMESTAMP NOT NULL,\n" +
            "    valid_to      TIMESTAMP NOT NULL \n" +
            ") WITH (\n" +
            "    'connector' = 'kafka',\n" +
            "    'topic'     = 'mobile_accounts',\n" +
            "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
            "    'format'    = 'json'\n" +
            ")");

    tableEnv.executeSql("CREATE TABLE mobile_account\n" +
            "(\n" +
            "    mobile_acc_id VARCHAR(36)              NOT NULL,\n" +
            "    member_id     BIGINT                   NOT NULL,\n" +
            "    mobile_number        VARCHAR(14)              NOT NULL,\n" +
            "    valid_from    TIMESTAMP NOT NULL,\n" +
            "    valid_to      TIMESTAMP NOT NULL \n" +
            ") WITH (\n" +
            "   'connector'  = 'jdbc',\n" +
            "   'url'        = 'jdbc:postgresql://flinkpg:5432/flink-demo',\n" +
            "   'table-name' = 'mobile_account',\n" +
            "   'driver'     = 'org.postgresql.Driver',\n" +
            "   'username'   = 'flink-demo',\n" +
            "   'password'   = 'flink-demo'\n" +
            ")");

    Table mobileAccounts = tableEnv.from("mobile_accounts");

    report(mobileAccounts).executeInsert("mobile_account");
}

public static Table report(Table mobileAccounts) {
    return mobileAccounts.select(
            $("mobile_acc_id"),
            $("member_id"),
            $("mobile_number"),
            $("valid_from"),
            $("valid_to"));
}

What I have noticed on the Flink console is that the job is only consuming from one source!

I liked the TableEnvironment approach, as very little code is needed to get the items inserted into the DB.

How can we consume from both sources, Kinesis and the TableEnvironment's Kafka table, in a single Flink job?

Am I using the right approach?

Is there an alternative to implement my requirements?


Solution

  • Assuming you are able to create the tables correctly, you can simply JOIN the two streams, named kafka_stream and kinesis_stream, as:

    SELECT l.*, r.member_id FROM kinesis_stream AS l
    INNER JOIN kafka_stream AS r
    ON l.mobile_number = r.mobile_number;
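
    A regular stream-stream join like this keeps both sides in Flink state. Since the Postgres table already mirrors the Kafka data, an alternative that matches the lookup requirement more directly is a JDBC lookup join against mobile_account. A sketch, assuming kinesis_stream declares a processing-time attribute (e.g. proc_time AS PROCTIME()) in its DDL:

    SELECT l.*, r.member_id FROM kinesis_stream AS l
    JOIN mobile_account FOR SYSTEM_TIME AS OF l.proc_time AS r
    ON l.mobile_number = r.mobile_number;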
    

    If the PostgreSQL sink is essential, you can handle it in a separate query:

    INSERT INTO postgre_sink
    SELECT * FROM kafka_stream;
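
    To ensure both INSERTs are submitted together as a single job (rather than one job per executed statement), Flink 1.13+ lets you wrap them in a statement set. A sketch reusing the queries above, where enriched_sink is a placeholder name for your output table:

    EXECUTE STATEMENT SET
    BEGIN
        INSERT INTO postgre_sink SELECT * FROM kafka_stream;
        INSERT INTO enriched_sink
        SELECT l.*, r.member_id FROM kinesis_stream AS l
        INNER JOIN kafka_stream AS r
        ON l.mobile_number = r.mobile_number;
    END;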
    

    Together these will solve your problem with the Table API (or Flink SQL).
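
    As for why the original job only consumed from one source: TableEnvironment.create(settings) builds a table environment that is not connected to your StreamExecutionEnvironment, so the Table pipeline and the DataStream pipeline are planned separately, and env.execute(..) only runs the DataStream part. Below is a minimal sketch of wiring everything through a single environment; the CREATE TABLE bodies are elided (the first two are the DDL statements from the question, while kinesis_stream and enriched_events are assumed names for the Kinesis source and the output sink):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.StatementSet;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class EnricherJob {
        public static void main(String[] args) {
            var env = StreamExecutionEnvironment.getExecutionEnvironment();
            // StreamTableEnvironment wraps env, so Table API pipelines and any
            // DataStream pipelines are planned against the same environment.
            var tableEnv = StreamTableEnvironment.create(env);

            tableEnv.executeSql("CREATE TABLE mobile_accounts ( ... )"); // Kafka source, as in the question
            tableEnv.executeSql("CREATE TABLE mobile_account ( ... )");  // JDBC sink, as in the question
            tableEnv.executeSql("CREATE TABLE kinesis_stream ( ... )");  // Kinesis source (assumed DDL)
            tableEnv.executeSql("CREATE TABLE enriched_events ( ... )"); // output sink (assumed DDL)

            // A StatementSet bundles both INSERTs into one submitted job, so
            // both the Kafka and the Kinesis source run in the same job.
            StatementSet statements = tableEnv.createStatementSet();
            statements.addInsertSql(
                    "INSERT INTO mobile_account SELECT * FROM mobile_accounts");
            statements.addInsertSql(
                    "INSERT INTO enriched_events "
                            + "SELECT l.*, r.member_id FROM kinesis_stream AS l "
                            + "INNER JOIN mobile_accounts AS r "
                            + "ON l.mobile_number = r.mobile_number");
            statements.execute(); // submits the whole statement set
        }
    }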