
Apache NiFi: How to fetch data from a Cassandra table whenever a table record is modified


I am using Cassandra as the database, and we use the "QueryCassandra" processor, which runs a SELECT query, to fetch records from a Cassandra table and send them to an output port. My use case is described below.

1) The first time, all records need to be fetched from Cassandra and transferred to the output port; that is already working. (Currently, all data is re-fetched from the table at the interval set in the Run Schedule.)

2) After that, whenever the Cassandra table is modified (a row is inserted, updated, or deleted), only the changed records should be sent to the output port. Is there a way to achieve this instead of fetching everything at every interval?

Sample NiFi template


Solution

  • This isn't currently possible with NiFi (1.11.4 at the time of this writing). It would need either a Cassandra version of QueryDatabaseTable (where you provide a column whose value only ever increases, such as a timestamp) or a CaptureChangeCassandra processor that uses a CommitLogReader to read Cassandra's commit log instead of querying the table itself.

    Please feel free to file a New Feature Jira case to add CDC (change data capture) capabilities for Cassandra.
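To illustrate the first option, here is a minimal, in-memory sketch of the "maximum-value column" idea that QueryDatabaseTable uses for relational databases. It assumes the application writes a hypothetical `updatedAt` timestamp on every insert and update; `Row` and `MaxValueFetcher` are illustrative names only, not NiFi or Cassandra driver APIs, and the list stands in for the real table.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class IncrementalFetchSketch {

    // Hypothetical row shape: an id, a payload, and a column that only ever increases
    // (assumed to be a last-modified timestamp set by the application on insert/update).
    record Row(String id, String payload, Instant updatedAt) {}

    // Hypothetical fetcher that keeps the largest updatedAt seen so far as its state,
    // similar in spirit to QueryDatabaseTable's "Maximum-value Columns" state.
    static final class MaxValueFetcher {
        private Instant lastMax = null; // null means nothing has been fetched yet

        // Returns only rows newer than the stored maximum, then advances the maximum.
        // Against a real Cassandra table this would be a range query on updated_at,
        // which generally requires that column to be a clustering column or ALLOW FILTERING.
        List<Row> fetchNew(List<Row> table) {
            List<Row> out = new ArrayList<>();
            Instant newMax = lastMax;
            for (Row r : table) {
                if (lastMax == null || r.updatedAt().isAfter(lastMax)) {
                    out.add(r);
                    if (newMax == null || r.updatedAt().isAfter(newMax)) {
                        newMax = r.updatedAt();
                    }
                }
            }
            lastMax = newMax;
            return out;
        }
    }

    public static void main(String[] args) {
        List<Row> table = new ArrayList<>();
        table.add(new Row("a", "v1", Instant.parse("2020-05-01T10:00:00Z")));
        table.add(new Row("b", "v1", Instant.parse("2020-05-01T10:05:00Z")));

        MaxValueFetcher fetcher = new MaxValueFetcher();
        System.out.println(fetcher.fetchNew(table).size()); // first run is a full load: 2

        // Update row "a" (new timestamp) and insert row "c".
        table.set(0, new Row("a", "v2", Instant.parse("2020-05-01T11:00:00Z")));
        table.add(new Row("c", "v1", Instant.parse("2020-05-01T11:01:00Z")));
        System.out.println(fetcher.fetchNew(table).size()); // only "a" and "c": 2

        // Delete row "b": nothing is returned, because a max-value poll cannot see deletes.
        table.remove(1);
        System.out.println(fetcher.fetchNew(table).size()); // 0
    }
}
```

The last step shows the limitation of this approach: inserts and updates are picked up, but deleted rows simply disappear from the query results. Capturing deletes as well is what the commit-log (CDC) option would be needed for.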