
NiFi - Truncate and load to a MySQL DB table


I am reading CSV files from an SFTP site and loading them into a MySQL database using NiFi.

The workflow below seems to be working fine. I just need some help figuring out how to truncate the table before I start the data load.

NiFi flow:

ListSFTP -> FetchSFTP -> InferAvroSchema -> ConvertCSVToAvro -> ConvertAvroToJSON -> SplitJson -> ConvertJSONToSQL -> PutSQL

The load itself works, but every time I run this flow I need the table to be truncated first, before the load starts.

Can someone please help me with some info on how I can achieve this? Or, if there is a better flow than the one I have written, please advise.

Thanks, Aadil


Solution

  • Before I get to the "truncate before insert" part, I recommend replacing everything from ConvertCSVToAvro through PutSQL with a single PutDatabaseRecord processor (configured with a record reader such as CSVReader). It basically does "ConvertXToSQL -> PutSQL" in one step, so you no longer need all those conversions just to execute SQL statements.

    The "truncate before insert" part is a little trickier. As of NiFi 1.5.0 (not yet released at the time of this writing), NIFI-4522 will let PutSQL execute a SQL command while retaining the flow file contents. This means that before PutDatabaseRecord you could have a PutSQL processor with its "SQL Statement" property set to "TRUNCATE myTable". PutSQL will issue the TRUNCATE and then pass the original flow file on.

    Offhand, the only workaround I can think of at the moment is an ExecuteScript processor. If you use PutDatabaseRecord, you'd likely have to look up the DBCPConnectionPool by name (see my blog post for an example), execute the TRUNCATE statement yourself, and then pass along the incoming flow file. If you keep your flow above with PutSQL, the script could get the incoming flow file, write out a "TRUNCATE myTable" flow file ONLY if fragment.index is zero, and then transfer the incoming flow file. That second approach also requires a Prioritizer on the connection, so that the flow files reach PutSQL in the intended order.
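
The fragment.index check in the PutSQL variant can be sketched roughly as follows. This is plain Python that only models the decision logic. It is not an ExecuteScript script and does not call the NiFi API. The function name `outgoing_statements` and the sample INSERT statements are made up for illustration. The sketch assumes that attributes arrive as strings and that the first split carries fragment.index "0".

```python
# Plain-Python sketch of the decision logic only; it does not use the NiFi API.
TRUNCATE_SQL = "TRUNCATE myTable"  # statement text from the answer above


def outgoing_statements(attributes, incoming_sql):
    """Return the SQL payloads to send downstream for one incoming flow file.

    attributes   -- dict of flow file attributes (assumed to be strings)
    incoming_sql -- content of the incoming flow file, e.g. an INSERT statement
    """
    statements = []
    # Only the first split of a batch (fragment.index == "0") triggers the
    # TRUNCATE, so the table is emptied once per load rather than per row.
    if attributes.get("fragment.index") == "0":
        statements.append(TRUNCATE_SQL)
    # The incoming flow file is always passed along unchanged.
    statements.append(incoming_sql)
    return statements


if __name__ == "__main__":
    # Hypothetical sample splits, as they might look after ConvertJSONToSQL.
    splits = [
        ({"fragment.index": "0"}, "INSERT INTO myTable (id) VALUES (1)"),
        ({"fragment.index": "1"}, "INSERT INTO myTable (id) VALUES (2)"),
    ]
    for attrs, sql in splits:
        print(outgoing_statements(attrs, sql))
```

In a real flow, each returned statement would be a separate flow file. The TRUNCATE only helps if it actually reaches PutSQL before the inserts, which is why the connection ordering matters.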