I am having an issue with the Apache NiFi processor ExecuteStreamCommand and a Python script that it should run.
I wrote the code below to transform 50 different CSV files into JSON. Afterwards I am going to write the JSON to HDFS.
import json
import pandas as pd
df = pd.read_csv(r'***.csv',
                 sep='\t',
                 skiprows=2)
df = df.dropna(axis=1, how='all')
df = df.drop_duplicates(keep='first')
del df['Charge']
df = df.rename(columns={df.columns[0]: "Zeitstempel", df.columns[1]: "Maschine"})
df.columns = map(str.lower, df.columns)
df['zeitstempel'] = pd.to_datetime(df['zeitstempel'], format='%d.%m.%y, %X')
df['zeitstempel'] = df['zeitstempel'].astype(str)
columns = list(df.columns)
# Strip a leading underscore from any column name
for column in range(len(columns)):
    if str(columns[column]).startswith('_'):
        columns[column] = columns[column][1:]
df.columns = columns
machine = df["maschine"][0]
month = str(df["zeitstempel"][0])[5:7]  # string is 'YYYY-MM-DD ...', so [5:7] is the month
day = str(df["zeitstempel"][0])[8:10]  # and [8:10] is the day
year = str(df["zeitstempel"][0])[0:4]
fileName = machine + "_" + year + "_" + month + "_" + day + ".json"
filePath = "***" + fileName
df.to_json(filePath, orient='records', date_format='iso', date_unit='s', lines=True)
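The file name above is assembled by slicing fixed positions out of the stringified timestamp, which makes it easy to mix up day and month. A minimal sketch of an alternative is to parse the timestamp once and format it directly. The helper name `build_file_name` is hypothetical, and the sketch assumes the time part looks like `HH:MM:SS` (the script uses the locale-dependent `%X`):

```python
from datetime import datetime


def build_file_name(machine, timestamp_text):
    """Return '<machine>_<year>_<month>_<day>.json' for a 'dd.mm.yy, HH:MM:SS' timestamp."""
    # Assumption: the time part is HH:MM:SS; adjust the format if %X differs on your system.
    stamp = datetime.strptime(timestamp_text, "%d.%m.%y, %H:%M:%S")
    # datetime supports strftime codes inside format(), so no string slicing is needed.
    return "{}_{:%Y_%m_%d}.json".format(machine, stamp)


example_name = build_file_name("M1", "03.02.20, 14:05:00")
```

With a DataFrame, the same idea would apply to the first `zeitstempel` value before it is converted to a string.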
The script works fine on my local directory, but how do I need to change the input and output for NiFi?
The NiFi flow is as follows: ListFile > FetchFile > ExecuteStreamCommand > PutHDFS.
I tried the code as follows:
#!/usr/bin/env python2
import json
import sys  # needed for sys.stdin and sys.stdout below
import pandas as pd
df = pd.read_csv(sys.stdin,
                 sep='\t',
                 skiprows=2)
df = df.dropna(axis=1, how='all')
df = df.drop_duplicates(keep='first')
del df['Charge']
df = df.rename(columns={df.columns[0]: "Zeitstempel", df.columns[1]: "Maschine"})
df.columns = map(str.lower, df.columns)
df['zeitstempel'] = pd.to_datetime(df['zeitstempel'], format='%d.%m.%y, %X')
df['zeitstempel'] = df['zeitstempel'].astype(str)
columns = list(df.columns)
# Strip a leading underscore from any column name
for column in range(len(columns)):
    if str(columns[column]).startswith('_'):
        columns[column] = columns[column][1:]
df.columns = columns
machine = df["maschine"][0]
month = str(df["zeitstempel"][0])[5:7]  # string is 'YYYY-MM-DD ...', so [5:7] is the month
day = str(df["zeitstempel"][0])[8:10]  # and [8:10] is the day
year = str(df["zeitstempel"][0])[0:4]
fileName = machine + "_" + year + "_" + month + "_" + day + ".json"
df.to_json(sys.stdout, orient='records', date_format='iso', date_unit='s', lines=True)
I configured the processor as follows:
Thank you in advance from Germany!
Nicko
Configure your ExecuteStreamCommand processor along these lines:
Also, please check the official documentation for ExecuteStreamCommand.
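On the script side, ExecuteStreamCommand passes the incoming FlowFile content to the command's standard input and takes whatever the command writes to standard output as the new content, so the script only has to behave like a filter. Here is a minimal sketch of that pattern. To keep it dependency-free it uses the standard `csv` and `json` modules instead of pandas, and the function name `convert` and the sample data are made up for illustration; the pandas calls from the question would go in the same place.

```python
import csv
import io
import json


def convert(in_stream, out_stream, skip_lines=2):
    """Copy tab-separated rows from in_stream to out_stream as JSON lines."""
    # Skip the preamble lines in front of the header row (skiprows=2 in the question).
    for _ in range(skip_lines):
        next(in_stream, None)
    reader = csv.DictReader(in_stream, delimiter="\t")
    for row in reader:
        # Lower-case the column names, as the script in the question does.
        record = {key.lower(): value for key, value in row.items()}
        out_stream.write(json.dumps(record) + "\n")


# In the script that NiFi runs, the call would be: convert(sys.stdin, sys.stdout)
# Local demonstration with in-memory streams instead of the real stdin/stdout:
sample = io.StringIO("comment 1\ncomment 2\nZeit\tMaschine\n01.02.20\tM1\n")
result = io.StringIO()
convert(sample, result)
```

Because the output travels through stdout, anything else printed by the script (debug messages, for example) would end up in the FlowFile content as well, so such output is better sent to stderr.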