
Database Extract with NiFi


I want to extract all the records from my table 'nifitest' in my SQL database called 'customers'. I am attaching the template for reference. I am facing the following problem: once I run the 'QueryDatabaseTable' processor I get the flowfiles in the queue, but after I empty the queue and re-run the 'QueryDatabaseTable' processor, I cannot fetch the records into the queue again. How can I fix this problem?

The whole idea of the flow is to extract records from a MySQL database and store the file locally on my Desktop.

I have used the template from the following website to achieve this and changed the table name and the Database Connection Pooling service as per my requirements:

https://www.batchiq.com/database-extract-with-nifi.html

Also, after MergeContent I am getting the output as a plain 'file' type. Which processor should I use to store the merged output as a CSV file (all records stored as Test.csv instead of a plain file)? I was using the 'UpdateAttribute' processor and adding a 'filename' property with the value 'Test.csv', but in the output file I only see one record, not all the records. How can I fix this?

Thanks!


Solution

  • QueryDatabaseTable stores state when it runs and, on subsequent runs, pulls only the incremental records that have been added to the table since the last run.

    To get all the records from the table again, you need to clear the processor's state; only then will you be able to fetch all the records from the table.

    How to clear the state of the QueryDatabaseTable processor:

     1. Stop the QueryDatabaseTable processor //make sure no threads are running (check the top-right corner of the processor)

     2. Right-click on the processor

     3. Go to View state

     4. Click Clear state //this clears all the state stored by the processor
    

    Refer to this link for more details on clearing the state of the QueryDatabaseTable processor. If you prefer to clear the state programmatically, a sketch follows.
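
    A minimal sketch of clearing the state through the NiFi REST API instead of the UI, assuming a NiFi 1.x instance at http://localhost:8080 with no authentication; the processor UUID below is a hypothetical placeholder, and the processor must be stopped first:

      import requests

      NIFI_API = "http://localhost:8080/nifi-api"                 # adjust to your instance
      PROCESSOR_ID = "replace-with-your-querydatabasetable-uuid"  # hypothetical placeholder

      # The processor must be stopped (no active threads) before clearing its state.
      resp = requests.post(f"{NIFI_API}/processors/{PROCESSOR_ID}/state/clear-requests")
      resp.raise_for_status()
      print("State cleared; the next run will pull every row again.")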

    -> Another issue is with the MergeContent processor: Minimum Number of Entries is set to 100, so the processor will wait until there are 100 flowfiles in the queue in front of the MergeContent processor before merging.

    Set the Max Bin Age property to a value such as 1 min so that the processor forcefully merges whatever flowfiles are queued after that time and transfers the merged flowfile to the merged relationship.

    Refer to this link for more details regarding MergeContent processor usage and configuration. A rough sketch of the bin-flush logic is shown below.
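
    This is not NiFi's actual code, only a conceptual sketch (with assumed property values) of when MergeContent flushes a bin: either the minimum entry count is reached or the bin ages out, whichever happens first.

      import time

      def bin_is_ready(entry_count, bin_created_at,
                       min_entries=100, max_bin_age_seconds=60):
          """Return True when the bin should be merged and routed to 'merged'."""
          aged_out = (time.time() - bin_created_at) >= max_bin_age_seconds
          return entry_count >= min_entries or aged_out

      # With Minimum Number of Entries = 100 and Max Bin Age = 1 min,
      # 10 queued flowfiles wait until the bin is 60 seconds old.
      print(bin_is_ready(entry_count=10, bin_created_at=time.time() - 61))  # True
      print(bin_is_ready(entry_count=10, bin_created_at=time.time()))       # False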

    UPDATE:

    1.

    If you want to convert the final file format to CSV and then save it to a local path:

    You can use the ConvertRecord processor right after QueryDatabaseTable; there is no need to convert Avro --> JSON --> CSV.

    Configure the ConvertRecord processor with AvroReader and CSVRecordSetWriter controller services; the processor then reads the Avro data and converts it to CSV format.

    To change the filename of the flowfile, use the UpdateAttribute processor and add a 'filename' property set to the desired name (e.g. Test.csv).

    Flow:

      - QueryDatabaseTable //get data from the source in Avro format
      - ConvertRecord //convert Avro format to CSV format
      - UpdateAttribute //change the filename
      - PutFile //store the CSV file locally
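
    For reference, a small standalone sketch of what the ConvertRecord step does (Avro in, CSV out), useful for checking the data outside NiFi; it assumes the fastavro package and hypothetical file names:

      import csv
      from fastavro import reader

      with open("nifitest.avro", "rb") as avro_in, \
           open("Test.csv", "w", newline="") as csv_out:
          writer = None
          for record in reader(avro_in):        # each Avro record is read as a dict
              if writer is None:                # build the CSV header from the first record
                  writer = csv.DictWriter(csv_out, fieldnames=list(record.keys()))
                  writer.writeheader()
              writer.writerow(record)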
    

    2.

    If you are concerned that the data is stored as one long line in the output flowfile, then configure the MergeContent processor as:

    Delimiter Strategy    Text

    Demarcator            shift+enter (i.e. press shift+enter to insert a newline)
    

    Refer to this link for more details regarding the MergeContent delimiter strategy configuration.

    With this configuration we are not changing the format of the data (it will still be JSON); we are only adding a newline after each JSON record, as the small illustration below shows.
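
    A tiny illustration, with hypothetical flowfile contents, of what the newline demarcator does:

      # Hypothetical flowfile contents queued in front of MergeContent
      flowfiles = [b'{"id": 1, "name": "a"}', b'{"id": 2, "name": "b"}']

      # With Delimiter Strategy = Text and Demarcator = newline, MergeContent
      # concatenates the contents and puts "\n" between them, so each
      # JSON record ends up on its own line.
      merged = b"\n".join(flowfiles)
      print(merged.decode())
      # {"id": 1, "name": "a"}
      # {"id": 2, "name": "b"}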