Tags: apache-flink, flink-sql

Share dynamic tables between Flink programs


I have a Flink job that creates a dynamic table from a database changelog stream. The table definition looks as follows:

tableEnv.sqlUpdate("""
      CREATE TABLE some_table_name (
          id INTEGER,
          name STRING,
          created_at BIGINT,
          updated_at BIGINT
      )
      WITH (
          'connector' = 'kafka',
          'topic' = 'topic',
          'properties.bootstrap.servers' = 'localhost:9092',
          'properties.zookeeper.connect' = 'localhost:2181',
          'properties.group.id' = 'group_1',
          'format' = 'debezium-json',
          'debezium-json.schema-include' = 'true'
      )
    """)

When I try to reference that table from another Flink application running on the same cluster, I get: SqlValidatorException: Object 'some_table_name' not found. Is it possible to register the table somehow so that other programs can use it, for example in a statement like this:

  tableEnv.sqlQuery("""
    SELECT count(*) FROM some_table_name
  """).execute().print()

Solution

  • Note that a table in Flink doesn't hold any data; it is only metadata describing how to read from (and write to) an external system. Another Flink application can independently create its own table backed by the same Kafka topic, for example. So not being able to share table definitions between applications isn't as tragic as you might expect.

    But you can share table definitions by storing them in an external catalog, e.g. an Apache Hive catalog. Any application that registers the same catalog then sees the same tables. See the Catalogs documentation for more info.
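
    As a rough sketch of the catalog approach: each application registers a HiveCatalog pointing at the same Hive Metastore, so a table created by one application is visible to the others. The catalog name "myhive" and the conf directory below are placeholders; this assumes the flink-connector-hive dependency is on the classpath and a Hive Metastore is running.

    ```scala
    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
    import org.apache.flink.table.catalog.hive.HiveCatalog

    val tableEnv = TableEnvironment.create(
      EnvironmentSettings.newInstance().inStreamingMode().build())

    // Register a catalog backed by the shared Hive Metastore.
    val catalog = new HiveCatalog(
      "myhive",        // catalog name (placeholder)
      "default",       // default database
      "/opt/hive-conf" // directory containing hive-site.xml (placeholder)
    )
    tableEnv.registerCatalog("myhive", catalog)
    tableEnv.useCatalog("myhive")

    // Tables created now are persisted in the Metastore. A second
    // application that registers the same catalog can simply run:
    //   tableEnv.sqlQuery("SELECT count(*) FROM some_table_name")
    ```

    With this in place, the CREATE TABLE statement from the question only needs to be executed once; every other program that registers the catalog can reference some_table_name directly.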