Tags: python, json, csv, apache-nifi

Transform CSV to JSON with Apache NiFi ExecuteStreamCommand - Python


I am currently having an issue with the Apache NiFi processor ExecuteStreamCommand and running a Python script in it.

I wrote the code below to transform 50 different CSV files into JSON. Afterwards I'm going to write the JSON files to HDFS.

import json
import pandas as pd

df = pd.read_csv(r'***.csv',
                 sep='\t',
                 skiprows=2)

df = df.dropna(axis=1, how='all')
df = df.drop_duplicates(keep='first')
del df['Charge']
df = df.rename(columns={df.columns[0]: "Zeitstempel", df.columns[1]: "Maschine"})
df.columns = map(str.lower, df.columns)

df['zeitstempel'] = pd.to_datetime(df['zeitstempel'], format='%d.%m.%y, %X')
df['zeitstempel'] = df['zeitstempel'].astype(str)
# Strip a single leading underscore from each column name
df.columns = [c[1:] if c.startswith('_') else c for c in df.columns]

machine = str(df["maschine"].iloc[0])
first_ts = df["zeitstempel"].iloc[0]  # string in 'YYYY-MM-DD HH:MM:SS' form
year = first_ts[0:4]
month = first_ts[5:7]   # characters 5-6 hold the month
day = first_ts[8:10]    # characters 8-9 hold the day
fileName = machine + "_" + year + "_" + month + "_" + day + ".json"
filePath = "***" + fileName

df.to_json(filePath, orient='records', date_format='iso', date_unit='s', lines=True)
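Slicing the string form of the timestamp is easy to get wrong: in `YYYY-MM-DD HH:MM:SS`, the slice `[5:7]` is the month and `[8:10]` is the day. A minimal sketch that lets `strftime` pick the fields instead, assuming the value is still a parsed datetime (i.e. taken before the `astype(str)` line); `build_file_name` is a hypothetical helper, and the example uses the standard-library `datetime` so it runs without pandas:

```python
from datetime import datetime


def build_file_name(machine, timestamp):
    """Return '<machine>_<YYYY>_<MM>_<DD>.json' for one timestamp.

    Accepts datetime.datetime and pandas.Timestamp (a datetime subclass),
    so strftime does the field extraction instead of string slicing.
    """
    return "{}_{}.json".format(machine, timestamp.strftime("%Y_%m_%d"))


# Same day.month.year layout as the question; %H:%M:%S is what %X means in the C locale
stamp = datetime.strptime("15.03.19, 08:30:00", "%d.%m.%y, %H:%M:%S")
print(build_file_name("M01", stamp))  # M01_2019_03_15.json
```

In the script above this could be called as `build_file_name(df["maschine"].iloc[0], df["zeitstempel"].iloc[0])` right after the `pd.to_datetime` line.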

The script works fine locally, but how do I have to change the input and output so that it works in NiFi?

The NiFi flow is as follows: ListFile > FetchFile > ExecuteStreamCommand > PutHDFS.
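In a flow like this the script no longer sees file paths: FetchFile loads the CSV into the FlowFile content, ExecuteStreamCommand writes that content to the command's stdin, and it emits a new FlowFile whose content is whatever the command wrote to stdout. A minimal sketch of that stdin/stdout contract, using the standard-library `csv` and `json` modules instead of pandas (an assumption made to keep it dependency-free) and omitting the cleaning steps; `convert` is a hypothetical name:

```python
#!/usr/bin/env python3
"""Filter for ExecuteStreamCommand: tab-separated CSV on stdin, JSON Lines on stdout.

Sketch only: csv/json from the standard library stand in for pandas,
and none of the column cleaning from the question is done here.
"""
import csv
import json
import sys


def convert(in_stream, out_stream, skip_rows=2):
    # Skip the preamble lines, mirroring skiprows=2 in the question
    for _ in range(skip_rows):
        in_stream.readline()
    reader = csv.DictReader(in_stream, delimiter="\t")
    for record in reader:
        # One JSON object per line, like to_json(orient='records', lines=True)
        out_stream.write(json.dumps(record) + "\n")


if __name__ == "__main__":
    # NiFi supplies the FlowFile content on stdin and collects stdout
    convert(sys.stdin, sys.stdout)
```

Keeping the conversion in a function that takes two streams also makes it easy to test with `io.StringIO` outside NiFi.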

I tried the following code:

#!/usr/bin/env python3

import json
import sys

import pandas as pd

df = pd.read_csv(sys.stdin,
                 sep='\t',
                 skiprows=2)

df = df.dropna(axis=1, how='all')
df = df.drop_duplicates(keep='first')
del df['Charge']
df = df.rename(columns={df.columns[0]: "Zeitstempel", df.columns[1]: "Maschine"})
df.columns = map(str.lower, df.columns)

df['zeitstempel'] = pd.to_datetime(df['zeitstempel'], format='%d.%m.%y, %X')
df['zeitstempel'] = df['zeitstempel'].astype(str)
# Strip a single leading underscore from each column name
df.columns = [c[1:] if c.startswith('_') else c for c in df.columns]

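machine = str(df["maschine"].iloc[0])
first_ts = df["zeitstempel"].iloc[0]  # string in 'YYYY-MM-DD HH:MM:SS' form
year = first_ts[0:4]
month = first_ts[5:7]
day = first_ts[8:10]
# Unused when writing to stdout: PutHDFS names the file after the FlowFile's filename attribute
fileName = machine + "_" + year + "_" + month + "_" + day + ".json"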

df.to_json(sys.stdout, orient='records', date_format='iso', date_unit='s', lines=True)

And I configured the processor like this:

[Screenshot: ExecuteStreamCommand configuration]

Thank you in advance from Germany!

Nicko


Solution

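  • Configure your ExecuteStreamCommand processor along these lines: set Command Path to the Python interpreter and Command Arguments to the full path of the script. The FlowFile content is then passed to the script on stdin, and the script's stdout becomes the content of the outgoing FlowFile.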

    Also see the official documentation for ExecuteStreamCommand.
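Before wiring the script into NiFi, one way to check it is to reproduce locally what ExecuteStreamCommand does: start the command, write the FlowFile bytes to its stdin, and read stdout as the new content. A rough sketch using `subprocess.run`; the helper name, the toy command, and the commented-out script path are hypothetical placeholders, not part of the original answer:

```python
import subprocess
import sys


def run_like_execute_stream_command(command, content):
    """Feed `content` (bytes) to `command` on stdin and return its stdout (bytes).

    Rough local stand-in for ExecuteStreamCommand: the FlowFile content goes to
    stdin and stdout becomes the new content. Here a non-zero exit status raises
    subprocess.CalledProcessError (with stderr attached) instead.
    """
    result = subprocess.run(command, input=content, capture_output=True, check=True)
    return result.stdout


if __name__ == "__main__":
    # Toy command so the sketch runs anywhere: upper-cases whatever arrives on stdin.
    toy = [sys.executable, "-c", "import sys; sys.stdout.write(sys.stdin.read().upper())"]
    print(run_like_execute_stream_command(toy, b"abc"))  # b'ABC'

    # Hypothetical real check (placeholder paths):
    # with open("sample.csv", "rb") as f:
    #     print(run_like_execute_stream_command(
    #         [sys.executable, "/path/to/csv_to_json.py"], f.read()).decode())
```

The list passed as `command` mirrors the processor settings: its first element plays the role of Command Path and the rest plays the role of Command Arguments.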