Tags: postgresql, apache-flink, flink-streaming, flink-sql

Flink - Run a continuous query on JDBC database using JdbcDynamicTableSource


I am using Flink to read from a PostgreSQL database that is constantly being updated with new data. Currently, I am able to make one-time queries against this database using Flink's JdbcCatalog.

I would like to run a continuous query over this database, but because the SQL source is a bounded input, my query runs once and then stops. From what I gather, I could either:

  1. Run these queries repeatedly in a brute-force way, perhaps using Iterations.
  2. Use JdbcDynamicTableSource, since continuous queries are meant to be done over dynamic tables.

The second solution would be ideal. However, I have no idea how to run a continuous query over a JdbcDynamicTableSource. I am able to instantiate a JdbcDynamicTableSource with the required connection options, but how should I query it so that it produces a dynamic table? Essentially, how do I use JdbcDynamicTableSource?

Any advice/code samples would be very much appreciated!


Solution

  • I see 4 ways your problem could be solved:

      1. Use Change Data Capture (CDC) with something like Debezium. CDC will watch your Postgres WAL and produce a stream of changes. Some Flink connectors are already available to interpret it and build a Table from it (a sketch follows after this list). This should be your preferred way, but it requires some admin rights on your Postgres instance, I believe.
      2. Use Postgres's LISTEN/NOTIFY, pipe it into a message queue, and interpret it in Flink with some deduplication. This technique seems complicated and brittle, though.
      3. Use Kafka Connect's JDBC connector, configured to poll your table with incrementing.column.name set to an auto-incrementing primary key, or to a last-change timestamp that you update with a trigger. You need a Kafka cluster, though. It is not real-time, but you can reduce the poll interval to every second (be sure to have an index on the polled column).
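
    As a rough illustration of 1., here is a minimal sketch (not a definitive implementation) of consuming a Debezium change stream from Kafka as a Flink dynamic table, using Flink's debezium-json format. The topic name, schema, and connection details below are placeholder assumptions:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DebeziumKafkaSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Register the Debezium topic as a dynamic table. The debezium-json
        // format interprets Debezium's change envelopes as changelog rows.
        tEnv.executeSql(
                "CREATE TABLE my_table (" +
                "  id INT," +
                "  payload STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                // Debezium names topics <server>.<schema>.<table> by default
                "  'topic' = 'dbserver1.public.my_table'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'scan.startup.mode' = 'earliest-offset'," +
                "  'format' = 'debezium-json'" +
                ")");

        // Any query over this table is now a continuous query: its result
        // updates as inserts/updates/deletes arrive from Postgres.
        tEnv.executeSql("SELECT * FROM my_table").print();
    }
}
```

    Unlike the one-shot JdbcCatalog query, this query never terminates: its result keeps updating as change events arrive.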

    The iceberg here is what happens when there is a failure. I think 1) and 3) should be OK. There are also some performance concerns: 1) will slow down your writes to Postgres (from the replication I/O overhead), and 3) will probably slow down your reads from Postgres (from the constant polling).

    Also, all of these solutions involve Kafka or a message queue. You could also try 4):

    • Implement 3), but poll your database yourself in a Flink SourceFunction. Be sure to use a stateful source function and keep your query offset in checkpointed state, so that after a failure it can restart at the right offset (a sketch follows below). Some ideas to deal with duplicate reads: set the source's parallelism to 1, or have each subtask poll keys modulo the parallelism.
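
    For 4., here is a minimal sketch of such a polling source, assuming an auto-incrementing id primary key; the table, column names, and connection string are placeholders. The last emitted offset is kept as operator state so that a restart resumes from the right place:

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class PollingJdbcSource extends RichSourceFunction<String>
        implements CheckpointedFunction {

    private volatile boolean running = true;
    private long lastOffset = 0L;                  // highest id emitted so far
    private transient ListState<Long> offsetState; // checkpointed copy of the offset

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:postgresql://localhost:5432/mydb", "user", "password");
             PreparedStatement stmt = conn.prepareStatement(
                     "SELECT id, payload FROM my_table WHERE id > ? ORDER BY id")) {
            while (running) {
                stmt.setLong(1, lastOffset);
                try (ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        // Emit and advance the offset under the checkpoint lock
                        // so the two stay consistent with snapshots.
                        synchronized (ctx.getCheckpointLock()) {
                            ctx.collect(rs.getString("payload"));
                            lastOffset = rs.getLong("id");
                        }
                    }
                }
                Thread.sleep(1000L); // poll interval
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        offsetState.clear();
        offsetState.add(lastOffset);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        offsetState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("offset", Long.class));
        for (Long restored : offsetState.get()) {
            lastOffset = restored; // resume from the checkpointed offset
        }
    }
}
```

    With parallelism 1 this stays simple; with more subtasks you would shard the polled key space as suggested above.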

    EDIT: 5. Like the Debezium CDC in 1., but packaged up as a source inside Flink. This might be the best option.
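
    For 5., a minimal sketch assuming the postgres-cdc connector from the flink-cdc-connectors project, which embeds Debezium inside a Flink source (it requires wal_level=logical on the Postgres side, so the same admin-rights caveat as in 1. applies). Table names and credentials are placeholders:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PostgresCdcSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Requires the flink-connector-postgres-cdc dependency on the
        // classpath, and wal_level = logical in postgresql.conf.
        tEnv.executeSql(
                "CREATE TABLE my_table_cdc (" +
                "  id INT," +
                "  payload STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'postgres-cdc'," +
                "  'hostname' = 'localhost'," +
                "  'port' = '5432'," +
                "  'username' = 'user'," +
                "  'password' = 'password'," +
                "  'database-name' = 'mydb'," +
                "  'schema-name' = 'public'," +
                "  'table-name' = 'my_table'" +
                ")");

        // Continuous queries over this table see the initial snapshot followed
        // by every subsequent insert, update, and delete.
        tEnv.executeSql("SELECT * FROM my_table_cdc").print();
    }
}
```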