mysql fetch apache-nifi processor

Duplicate records ingested during incremental fetch using NiFi


I am designing a workflow to do an incremental fetch using NiFi. The source and target databases are both MySQL. The processors are QueryDatabaseTable, SplitAvro, ConvertAvroToJSON, ConvertJSONToSQL, and PutSQL, as in the image below:

enter image description here

The configuration of the QueryDatabaseTable processor is as follows:

enter image description here

The source database table has only 200 records, but when I start the process, the flow ingests the same records repeatedly.

I have set the Maximum-value Columns property to createTime, which is a timestamp column, and its value is

2017-12-07 18:48:23

for all 200 records.

I also tried using ID as the Maximum-value Columns property instead, but that resulted in the same issue. What could be causing this repeated ingestion?


Solution

  • Based on the suggestion by @mattyb in "covertJSONtoSQL returning empty values in NiFi", I replaced SplitAvro, ConvertAvroToJSON, ConvertJSONToSQL, and PutSQL with a single PutDatabaseRecord processor, and incremental ingestion then worked.

    The flow looks like this, with the Maximum-value Columns property set to ID,time_stamp:

    enter image description here
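
    For readers unfamiliar with how a maximum-value column drives an incremental fetch, here is a minimal sketch of the general idea in Python. It is not NiFi's implementation: it uses an in-memory SQLite table as a stand-in for the MySQL source, and the table name `source`, the `state` dictionary, and the `incremental_fetch` helper are all hypothetical names chosen for illustration. The point is only that each run remembers the largest ID it has seen and asks for rows above that value, so a second run over unchanged data should return nothing.

    ```python
    import sqlite3


    def incremental_fetch(conn, state):
        """Return rows whose ID is greater than the last ID seen, then update state.

        `state` is a plain dict standing in for whatever persistent state a real
        tool would keep between runs (hypothetical, for illustration only).
        """
        last_id = state.get("ID")
        if last_id is None:
            # First run: nothing seen yet, so fetch everything.
            cur = conn.execute("SELECT ID, createTime FROM source ORDER BY ID")
        else:
            # Later runs: only rows beyond the remembered maximum.
            cur = conn.execute(
                "SELECT ID, createTime FROM source WHERE ID > ? ORDER BY ID",
                (last_id,),
            )
        rows = cur.fetchall()
        if rows:
            # Rows are ordered by ID, so the last row carries the new maximum.
            state["ID"] = rows[-1][0]
        return rows


    # Stand-in source table: 200 rows that all share the same createTime,
    # mirroring the situation described in the question.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE source (ID INTEGER PRIMARY KEY, createTime TEXT)")
    conn.executemany(
        "INSERT INTO source VALUES (?, ?)",
        [(i, "2017-12-07 18:48:23") for i in range(1, 201)],
    )

    state = {}
    first = incremental_fetch(conn, state)   # all existing rows
    second = incremental_fetch(conn, state)  # nothing new since the first run

    conn.execute("INSERT INTO source VALUES (201, '2017-12-08 09:00:00')")
    third = incremental_fetch(conn, state)   # only the newly inserted row

    print(len(first), len(second), len(third))  # 200 0 1
    ```

    If a flow keeps re-ingesting the same rows, comparing its behaviour against this pattern (is the remembered maximum actually being stored and used on the next run?) may help narrow down where the duplication comes from.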