Example scenario:
CSV file content:
john|doe|1
stacy|doe|2
Database fields:
fname | lname | list_index | raw_text
My objective is to ingest the CSV file content and save it to the database using NiFi processors. Below is sample output as it should appear in the database, including the value inserted into the raw_text column.
fname | lname | list_index | raw_text
john | doe | 1 | "john|doe|1"
stacy | doe | 2 | "stacy|doe|2"
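To make the target mapping concrete, here is a minimal plain-Python sketch (run outside NiFi) of how one input line maps to the four columns. The helper name to_record is hypothetical, and the sketch assumes "|"-separated lines with no header row, like the raw_text values shown above.

```python
# Hypothetical helper: maps one raw CSV line to the four target columns.
FIELDS = ("fname", "lname", "list_index")


def to_record(line: str, delimiter: str = "|") -> dict:
    raw = line.strip()  # drop the trailing newline / surrounding spaces
    values = raw.split(delimiter)
    if len(values) != len(FIELDS):
        raise ValueError(f"expected {len(FIELDS)} fields, got {len(values)}: {raw!r}")
    record = dict(zip(FIELDS, values))
    record["list_index"] = int(record["list_index"])
    record["raw_text"] = raw  # the untouched line goes into raw_text
    return record


if __name__ == "__main__":
    for line in ["john|doe|1\n", "stacy|doe|2\n"]:
        print(to_record(line))
```

The key point is that raw_text is simply the whole stripped line, stored alongside the split fields.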
If you need to do a lot of data customization, you can use the ExecuteScript processor to manipulate the data. The pipeline should be something like:
ListFile -> FetchFile -> ExecuteScript -> PutDatabaseRecord
Configure ExecuteScript as follows:
Script Engine: python
Script Body:
from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processors.script import ExecuteScript
from org.python.core.util.FileUtil import wrap
import sys
import os

# Appends the whole raw line as an extra quoted column (raw_text)
# and prepends a header row so a record reader can map the columns.
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        with wrap(inputStream) as f:
            lines = f.readlines()
        updated_lines = ['fname|lname|list_index|raw_text' + '\n']
        for line in lines:
            raw = line.strip()
            if not raw:
                continue  # skip blank lines, e.g. at the end of the file
            # Quote raw_text so the pipes inside it are not treated as separators
            updated_lines.append(raw + '|"' + raw + '"' + '\n')
        with wrap(outputStream, 'w') as filehandle:
            filehandle.writelines(updated_lines)

flow_file = session.get()
if flow_file:
    try:
        # session.write returns the updated flow file; keep the new reference
        flow_file = session.write(flow_file, PyStreamCallback())
        session.transfer(flow_file, ExecuteScript.REL_SUCCESS)
    except Exception as e:
        # Record exception type, script file name and line number as an attribute
        exc_type, exc_obj, exc_tb = sys.exc_info()
        file_name = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
        excp = str(exc_type) + ' ' + str(file_name) + ' ' + str(exc_tb.tb_lineno)
        attrMap = {'exception': excp}
        flow_file = session.putAllAttributes(flow_file, attrMap)
        session.transfer(flow_file, ExecuteScript.REL_FAILURE)
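Because the Jython script only runs inside NiFi, it can help to check the transformation logic separately. The sketch below is an assumption-laden stand-in, not NiFi code: it roughly mirrors the logic of PyStreamCallback.process in standard Python, with io.BytesIO objects playing the role of NiFi's input and output streams and UTF-8 assumed as the file encoding.

```python
import io

HEADER = "fname|lname|list_index|raw_text"


def process(input_stream, output_stream, encoding="utf-8"):
    # Read the whole "flow file" content, as the Jython callback does.
    text = input_stream.read().decode(encoding)
    updated_lines = [HEADER + "\n"]
    for line in text.splitlines():
        raw = line.strip()
        if not raw:
            continue  # ignore blank lines
        # Append the raw line as a quoted fourth column (raw_text).
        updated_lines.append(raw + '|"' + raw + '"\n')
    output_stream.write("".join(updated_lines).encode(encoding))


if __name__ == "__main__":
    src = io.BytesIO(b"john|doe|1\nstacy|doe|2\n")
    dst = io.BytesIO()
    process(src, dst)
    print(dst.getvalue().decode("utf-8"))
```

Keeping the per-line logic this small makes it easy to compare the offline output with what ExecuteScript emits in the flow.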
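Finally, configure PutDatabaseRecord with a Record Reader that can parse the script's output, for example a CSVReader with "|" as the value separator and the first line treated as the header, so that the fields map to the fname, lname, list_index and raw_text columns.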
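As a rough illustration of what the PutDatabaseRecord step does with that output, the sketch below parses the pipe-delimited content with Python's csv module and inserts it into an in-memory SQLite table. This is only a stand-in: SQLite replaces the real database, and the table name person is hypothetical. It assumes the reader uses "|" as the separator, treats the first line as the header, and uses '"' as the quote character, which is why the quoted raw_text value survives as a single field.

```python
import csv
import io
import sqlite3

# Example ExecuteScript output: header first, raw_text wrapped in double quotes.
FLOWFILE = (
    "fname|lname|list_index|raw_text\n"
    'john|doe|1|"john|doe|1"\n'
    'stacy|doe|2|"stacy|doe|2"\n'
)


def load(conn, content):
    # The quote character keeps "john|doe|1" as one field instead of three.
    reader = csv.DictReader(io.StringIO(content), delimiter="|", quotechar='"')
    rows = [
        (r["fname"], r["lname"], int(r["list_index"]), r["raw_text"])
        for r in reader
    ]
    # 'person' is a hypothetical table name; columns match the question.
    conn.executemany(
        "INSERT INTO person (fname, lname, list_index, raw_text) VALUES (?, ?, ?, ?)",
        rows,
    )
    conn.commit()
    return len(rows)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE person (fname TEXT, lname TEXT, list_index INTEGER, raw_text TEXT)"
    )
    load(conn, FLOWFILE)
    for row in conn.execute("SELECT * FROM person ORDER BY list_index"):
        print(row)
```

Note that a quote-aware reader strips the surrounding double quotes, so the stored raw_text is the bare line (john|doe|1).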