I am using Flink to read from a PostgreSQL database, which is constantly being updated with new data. Currently, I am able to run one-time queries against this database using Flink's JdbcCatalog.
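For reference, here is roughly what my one-shot setup looks like (a minimal sketch; the catalog name, credentials, and table name are placeholders, and the `JdbcCatalog` constructor shown is the older five-argument one, before a class loader parameter was added):

```java
import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class OneShotQuery {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Placeholder connection details for a local PostgreSQL instance.
        JdbcCatalog catalog = new JdbcCatalog(
                "pg", "mydb", "flink_user", "secret",
                "jdbc:postgresql://localhost:5432");
        tEnv.registerCatalog("pg", catalog);
        tEnv.useCatalog("pg");

        // Runs once over the current table contents and then finishes,
        // because the JDBC source is a bounded scan.
        tEnv.executeSql("SELECT * FROM my_table").print();
    }
}
```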
I would like to run a continuous query over this database, but because the SQL source is not an unbounded input, my query runs once and stops. From what I gather, I could either:

1. turn the database into a streaming source, e.g. by capturing its changelog and feeding it into a message queue, or
2. use a JdbcDynamicTableSource, which should be able to produce a dynamic table that changes over time.
The second solution would be ideal. However, I have no idea how to run a continuous query over a JdbcDynamicTableSource. I am able to instantiate a JdbcDynamicTableSource with the required connection options, but how should I run a continuous query over it to produce a dynamic table? Essentially, how do I use JdbcDynamicTableSource?
Any advice/code samples would be very much appreciated!
I see 4 ways your problem could get solved:

1. Debezium CDC: stream the changes from the Postgres write-ahead log into Kafka, and consume them in Flink via the Kafka connector (sketched below).
2. …
3. Kafka Connect's JDBC source connector, with `incrementing.column.name` set to an incremented Primary Key, or to a last-change timestamp that you update with a trigger. You need Kafka Connect though. Not real-time, but you can reduce the poll interval to every second (be sure to have an index on the polled column).

The iceberg here is what happens when there is a failure; I think 1) and 3) should be OK. There are also some performance concerns: 1) will slow down your writes to Postgres (from the replication I/O overhead), while 3) will probably slow down your reads from Postgres (from the constant polling).
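To sketch the Flink side of 1): once Debezium is publishing change events to a Kafka topic, Flink can interpret that topic as a changelog table with its `debezium-json` format. Everything below (topic, bootstrap servers, schema) is a placeholder, not your actual setup:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DebeziumChangelogQuery {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Map the Debezium topic to a table; inserts, updates and deletes
        // in Postgres become changelog rows of this table.
        tEnv.executeSql(
                "CREATE TABLE my_table (" +
                "  id INT," +
                "  name STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'dbserver1.public.my_table'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'properties.group.id' = 'flink-consumer'," +
                "  'scan.startup.mode' = 'earliest-offset'," +
                "  'format' = 'debezium-json')");

        // This query runs continuously, updating as the database changes.
        tEnv.executeSql(
                "SELECT name, COUNT(*) FROM my_table GROUP BY name").print();
    }
}
```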
Also, all of these solutions involve Kafka or a message queue. You could also try 4): …
EDIT: 5. Like the Debezium CDC in 1., but packaged up as a source in Flink. This might be the best option.
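If I'm right that this refers to the flink-cdc-connectors project, it provides a `postgres-cdc` table connector that runs the Debezium engine inside the Flink job, so no Kafka cluster is needed. A minimal sketch, assuming the flink-connector-postgres-cdc dependency is on the classpath and logical replication (`wal_level = logical`) is enabled on the database; all names and credentials are placeholders:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PostgresCdcQuery {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // CDC source: takes an initial snapshot of the table, then keeps
        // reading changes from the Postgres replication stream.
        tEnv.executeSql(
                "CREATE TABLE my_table (" +
                "  id INT," +
                "  name STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'postgres-cdc'," +
                "  'hostname' = 'localhost'," +
                "  'port' = '5432'," +
                "  'username' = 'flink_user'," +
                "  'password' = 'secret'," +
                "  'database-name' = 'mydb'," +
                "  'schema-name' = 'public'," +
                "  'table-name' = 'my_table')");

        // Continuous query over the resulting dynamic table.
        tEnv.executeSql("SELECT * FROM my_table").print();
    }
}
```

This gives you exactly what the question asks for: a dynamic table backed directly by the Postgres changelog, queryable with ordinary continuous SQL and no extra infrastructure beyond the connector jar.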