apache-nifi

How can I get the csv file line content and save it to the database using NiFi processors?


See example scenario:

csv file content

john|doe|1
stacy|doe|2

database fields

fname | lname | list_index | raw_text

My objective is to ingest the CSV file content and save it to the database using NiFi processors. The sample output below shows the rows as they should be inserted, including the original line stored in the raw_text column.

 fname | lname | list_index | raw_text
 john  | doe   | 1          | "john|doe|1"
 stacy | doe   | 2          | "stacy|doe|2"

Solution

  • If you need a lot of custom data manipulation, you can do it in an ExecuteScript processor. The pipeline would look something like this:

    ListFile -> FetchFile -> ExecuteScript -> PutDatabaseRecord

    Configure ExecuteScript as follows (the python engine is Jython, which is why the script can import Java classes):

    Script Engine: python

    Script Body:

    import os
    import sys

    from org.apache.nifi.processor.io import StreamCallback
    from org.apache.nifi.processors.script import ExecuteScript
    from org.python.core.util.FileUtil import wrap


    class PyStreamCallback(StreamCallback):
        """Appends each input line to itself as a quoted raw_text column."""

        def process(self, inputStream, outputStream):
            # wrap() exposes the Java streams as Python file objects.
            with wrap(inputStream) as f:
                # The header names must match the database column names.
                updated_lines = ['fname|lname|list_index|raw_text\n']
                for line in f.readlines():
                    raw = line.strip()
                    if not raw:
                        continue  # skip blank lines
                    # Quote the raw text so its pipes are not read as delimiters.
                    updated_lines.append(raw + '|"' + raw + '"\n')

                with wrap(outputStream, 'w') as filehandle:
                    filehandle.writelines(updated_lines)


    flow_file = session.get()

    if flow_file:
        try:
            # session.write returns the updated flow file, so keep the reference.
            flow_file = session.write(flow_file, PyStreamCallback())
            session.transfer(flow_file, ExecuteScript.REL_SUCCESS)

        except Exception:
            # Record where the failure occurred, then route to failure.
            exc_type, exc_obj, exc_tb = sys.exc_info()
            fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
            excp = '%s %s:%s' % (exc_type, fname, exc_tb.tb_lineno)
            flow_file = session.putAllAttributes(flow_file, {'exception': excp})
            session.transfer(flow_file, ExecuteScript.REL_FAILURE)
    
    

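    Configure PutDatabaseRecord with a record reader that matches this output: for example, a CSVReader with | as the value separator, " as the quote character, and the first line treated as the header. Also set the statement type to INSERT, the target table name, and the database connection pooling service.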
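
The per-line transformation can be checked outside NiFi before wiring up the flow. The following is a minimal sketch in plain Python 3 (not Jython), assuming the same header and quoting as the script above; `add_raw_text` and `parse_rows` are hypothetical helper names used only for illustration. It also parses the result with the standard `csv` module to show that a reader using `|` as the delimiter and `"` as the quote character sees four columns per row.

```python
import csv
import io

# Assumed header: the names must match the target table's columns.
HEADER = "fname|lname|list_index|raw_text"


def add_raw_text(text):
    """Return pipe-delimited text with a header and a quoted raw_text column."""
    out = [HEADER]
    for line in text.splitlines():
        raw = line.strip()
        if not raw:
            continue  # ignore blank lines
        # Quote the copy of the line so its pipes are not treated as delimiters.
        out.append('%s|"%s"' % (raw, raw))
    return "\n".join(out) + "\n"


def parse_rows(text):
    """Parse the transformed text the way a pipe-delimited CSV reader would."""
    reader = csv.DictReader(io.StringIO(text), delimiter="|", quotechar='"')
    return [dict(row) for row in reader]


sample = "john|doe|1\nstacy|doe|2\n"
transformed = add_raw_text(sample)
rows = parse_rows(transformed)

print(transformed)
for row in rows:
    print(row)
```

If the raw lines can themselves contain a `"` character, the quoting would need to escape it as well; the sketch does not handle that case.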