Tags: python, json, apache-nifi

Apache NiFi: ExecuteStreamCommand generating two flow files


I'm currently running into a problem with Apache NiFi's ExecuteStreamCommand processor using Python. I have a script that reads a CSV file, converts it into pandas DataFrames, and then into JSON. The script splits the CSV file into several DataFrames because of inconsistent column naming. My current script looks as follows:

import pandas as pd
import sys

# read the incoming flow file content from stdin
# (renamed from "input" to avoid shadowing the built-in)
input_lines = sys.stdin.readlines()

# search for subfiles and save them to a list called files ...

appendDataFrames = []

for subfile in files:
    df = pd.DataFrame(subfile)
    # several improvements to the DataFrame ...
    appendDataFrames.append(df)

output = pd.concat(appendDataFrames)
JSONOutPut = output.to_json(orient='records', date_format='iso', date_unit='s')
sys.stdout.write(JSONOutPut)

In the queue to my next processor I can now see one FlowFile containing the JSON (as expected). My question is: is it possible to write each JSON to a separate FlowFile, so that my next processor can work on them separately? I need this because the next processor is an InferAvroSchema, and since the JSONs all have different schemas, a single combined FlowFile is not an option. Am I mistaken? Is there a way to solve this?

The code below won't work, since its output still ends up in one and the same FlowFile, and my InferAvroSchema cannot handle the documents separately.

import pandas as pd
import sys

# read the incoming flow file content from stdin
input_lines = sys.stdin.readlines()

# search for subfiles and save them to a list called files ...

for subfile in files:
    df = pd.DataFrame(subfile)
    # several improvements to the DataFrame ...
    JSONOutPut = df.to_json(orient='records', date_format='iso', date_unit='s')
    sys.stdout.write(JSONOutPut)

Thanks in advance!


Solution

  • I just modified my code as follows:

    import pandas as pd
    import sys
    
    # read the incoming flow file content from stdin
    input_lines = sys.stdin.readlines()

    # search for subfiles and save them to a list called files ...

    for subfile in files:
        df = pd.DataFrame(subfile)
        # several improvements to the DataFrame ...
        JSONOutPut = df.to_json(orient='records', date_format='iso', date_unit='s')
        sys.stdout.write(JSONOutPut)
        sys.stdout.write("#;#")  # delimiter between the individual JSON documents
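
The delimiter approach can be sanity-checked locally before wiring up NiFi. This sketch uses a hypothetical `combined` string standing in for the concatenated stdout of the script above, and splits it on `#;#` the way the downstream processor will:

```python
import json

# hypothetical combined output: two JSON documents
# separated (and terminated) by the "#;#" delimiter
combined = '[{"a": 1}]#;#[{"b": "x"}]#;#'

# split on the delimiter; drop the empty fragment left
# by the trailing "#;#" after the last document
parts = [p for p in combined.split("#;#") if p]

for part in parts:
    # each fragment parses as a standalone JSON document
    records = json.loads(part)
    print(records)
```

Each fragment is valid JSON on its own, which is exactly what lets a per-split schema inference work downstream.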
    
    

    Then I added a SplitContent processor configured to split on that delimiter:

    (screenshot: properties of the SplitContent processor)
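
    For reference, the relevant SplitContent settings would look roughly like this (a sketch assuming the standard NiFi SplitContent processor; the Byte Sequence value must match the `#;#` delimiter the script writes):

```
Byte Sequence Format : Text
Byte Sequence        : #;#
Keep Byte Sequence   : false
```

    With `Keep Byte Sequence` set to false, the delimiter itself is stripped, so each resulting FlowFile contains only one clean JSON document for InferAvroSchema.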