apache-kafka-connect, flink-sql

Using Flink SQL, how do I create a table that reads from multiple topics using a wildcard/regex?


I have two tables with exactly the same structure, created in a Flink SQL application. Let's say:

CREATE TABLE t1_g1
(
    `id`    INT,
    `name`    STRING
)
    WITH ( 'connector' = 'kafka',
        'topic' = 't1.g1',
        'properties.auto.offset.reset' = 'earliest',
        'scan.startup.mode' = 'earliest-offset',
....       );

CREATE TABLE t1_g2
(
    `id`    INT,
    `name`    STRING
)
    WITH ( 'connector' = 'kafka',
        'topic' = 't1.g2',
        'properties.auto.offset.reset' = 'earliest',
        'scan.startup.mode' = 'earliest-offset',
....       );

Question: Is it possible (and supported) in Flink SQL to create another table that reads from both the t1.g1 and t1.g2 topics using a wildcard, like this:

CREATE TABLE t_all
(
    `id`    INT,
    `name`    STRING
)
    WITH ( 'connector' = 'kafka',
        'topic' = 't1.*',
        'properties.auto.offset.reset' = 'earliest',
        'scan.startup.mode' = 'earliest-offset',
....       );

Solution

  • Yes, this is possible. You could, for example, set

    'topic' = 't1.g1;t1.g2'
    

    or

    'topic-pattern' = 't1.*'
    

    Note that matching topics are subscribed when the job starts running. If you want the consumer to also discover matching topics created after that, you'll need to set (or adjust) scan.topic-partition-discovery.interval.

    The details are described in the Flink Kafka SQL connector documentation.
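A fuller version of the `t_all` table from the question, switched to 'topic-pattern', might look like the sketch below. It assumes a broker at localhost:9092, JSON-encoded values and a consumer group named t_all_group. These three values are placeholders standing in for the options elided as `....` in the question, and the one-minute discovery interval is likewise just an example value. Because the pattern is a Java regular expression, an unescaped `.` matches any character, so 't1.*' would also match a topic named, say, t10. The escaped form 't1\..*' used here only matches topics whose names start with `t1.`.

```sql
-- Sketch only: bootstrap servers, group id and format are placeholder
-- assumptions standing in for the options elided in the question.
CREATE TABLE t_all
(
    `id`    INT,
    `name`  STRING
)
WITH (
    'connector' = 'kafka',
    -- Java regex: subscribes to every topic whose name starts with "t1."
    'topic-pattern' = 't1\..*',
    'properties.bootstrap.servers' = 'localhost:9092',  -- placeholder
    'properties.group.id' = 't_all_group',               -- placeholder
    'format' = 'json',                                    -- placeholder
    'scan.startup.mode' = 'earliest-offset',
    -- periodically look for newly created matching topics/partitions (example value)
    'scan.topic-partition-discovery.interval' = '1 min'
);
```

Only one of 'topic' and 'topic-pattern' may be set on a source table, so the pattern replaces the 'topic' line rather than sitting alongside it.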
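With the semicolon-separated 'topic' list instead, the consumer reads exactly the listed topics and nothing else. When rows from several topics land in one table, it can help to know which topic each row came from; the Kafka connector exposes this as the `topic` metadata key, which can be declared as a VIRTUAL metadata column. The sketch below assumes a placeholder broker at localhost:9092, a placeholder group id and JSON-encoded values, and `source_topic` is a hypothetical column name chosen for illustration.

```sql
-- Sketch only: broker address, group id and format are placeholder assumptions.
CREATE TABLE t_all
(
    `id`           INT,
    `name`         STRING,
    -- Kafka topic each record was read from (connector metadata, not payload)
    `source_topic` STRING METADATA FROM 'topic' VIRTUAL
)
WITH (
    'connector' = 'kafka',
    'topic' = 't1.g1;t1.g2',
    'properties.bootstrap.servers' = 'localhost:9092',  -- placeholder
    'properties.group.id' = 't_all_group',               -- placeholder
    'format' = 'json',                                    -- placeholder
    'scan.startup.mode' = 'earliest-offset'
);

-- Example query: continuously updated row count per source topic.
SELECT `source_topic`, COUNT(*) AS row_cnt
FROM t_all
GROUP BY `source_topic`;
```

Since the list is fixed, a new topic such as t1.g3 would not be read until it is added to the 'topic' value; picking up new topics automatically requires 'topic-pattern'.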