I'm currently running into a problem with the Apache NiFi ExecuteStreamCommand processor using Python. I have a script that reads a CSV, converts it into pandas DataFrames, and then into JSON. The script splits the CSV file into several DataFrames because the column names are inconsistent. My current script looks as follows:
import pandas as pd
import sys
input = sys.stdin.readlines()
#searching for subfiles and saving them to a list with files ...
appendDataFrames = []
for dataFrame in range(len(files)):
    df = pd.DataFrame(files[dataFrame])
    #several improvements of DataFrame ...
    appendDataFrames.append(df)
output = pd.concat(appendDataFrames)
JSONOutPut = output.to_json(orient='records', date_format='iso', date_unit='s')
sys.stdout.write(JSONOutPut)
In the queue to my next processor I can now see one FlowFile as JSON (as expected).
My question: is it possible to write each JSON to a separate FlowFile, so that my next processor can work on them individually? I need this because the next processor is an InferAvroSchema, and since all the JSONs have different schemas, a single combined FlowFile is not an option. Am I mistaken? Or is there a way to solve this?
The code below won't work, since everything still ends up in the same FlowFile and my InferAvroSchema is not able to handle the JSONs separately.
import pandas as pd
import sys
input = sys.stdin.readlines()
#searching for subfiles and saving them to a list with files ...
appendDataFrames = []
for dataFrame in range(len(files)):
    df = pd.DataFrame(files[dataFrame])
    #several improvements of DataFrame ...
    JSONOutPut = df.to_json(orient='records', date_format='iso', date_unit='s')
    sys.stdout.write(JSONOutPut)
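To illustrate why this output is a problem, here is a minimal sketch using only the standard library. The two JSON strings are made-up stand-ins for two `df.to_json(orient='records')` results with different columns, and `is_single_json_document` is a hypothetical helper, not part of my script. Writing both to stdout back to back gives one stream that no longer parses as a single JSON document:

```python
import json

# Hypothetical stand-ins for two df.to_json(orient='records') results
# with different columns (not the real data from the CSV).
json_a = json.dumps([{"id": 1, "name": "a"}])
json_b = json.dumps([{"code": "x", "value": 2.5}])

# Writing both to stdout one after the other yields this single stream.
combined = json_a + json_b


def is_single_json_document(text):
    """Return True if text parses as exactly one JSON document."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False


print(is_single_json_document(json_a))    # each part on its own
print(is_single_json_document(combined))  # both parts back to back
```

So each part is fine on its own, but the combined stream is rejected by a JSON parser because of the extra data after the first array.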
Thanks in advance!
I just modified my code as follows:
import pandas as pd
import sys
input = sys.stdin.readlines()
#searching for subfiles and saving them to a list with files ...
appendDataFrames = []
for dataFrame in range(len(files)):
    df = pd.DataFrame(files[dataFrame])
    #several improvements of DataFrame ...
    JSONOutPut = df.to_json(orient='records', date_format='iso', date_unit='s')
    sys.stdout.write(JSONOutPut)
    sys.stdout.write("#;#")
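The delimiter idea above can be sketched in plain Python as follows. This is only a rough illustration: `join_with_delimiter` and `split_on_delimiter` are hypothetical helpers, the two JSON strings are made-up examples, and `str.split` is merely a stand-in for a delimiter-based split, not a description of how NiFi behaves. It assumes the marker `#;#` never occurs inside the JSON data itself.

```python
DELIMITER = "#;#"  # same marker as in the script above


def join_with_delimiter(json_strings, delimiter=DELIMITER):
    """Place the delimiter between documents only, not after the last one."""
    return delimiter.join(json_strings)


def split_on_delimiter(stream, delimiter=DELIMITER):
    """Rough stand-in for a delimiter-based split; drops empty fragments."""
    return [part for part in stream.split(delimiter) if part]


docs = ['[{"id":1}]', '[{"code":"x"}]']  # hypothetical JSON outputs

# What the loop above writes: a delimiter after every document.
trailing = "".join(d + DELIMITER for d in docs)
# Alternative: delimiter only between documents.
joined = join_with_delimiter(docs)

print(trailing.split(DELIMITER))   # a naive split leaves an empty last fragment
print(split_on_delimiter(joined))  # one entry per JSON document
```

Writing the marker after every document leaves a trailing marker at the end of the stream, so a naive split produces an empty final piece. Joining with the marker, or filtering out empty fragments, avoids that.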
And added a SplitContent processor like: