Maybe this is a stupid question, but I have to ask.
I have a Collect_data processor in NiFi that streams messages into another processor, which uses a Python script to parse them and create a JSON file. The problem is that I don't know what the input to the function in the Python script is. How do I pass those messages (16-digit numbers) from the Collect_data processor into the next processor, which contains the Python script? Is there a good, basic example of this?
I have already looked for examples online, but I don't really get it.
import datetime
import hashlib
from urlparse import urlparse, parse_qs
import sys
from urlparse import urlparse, parse_qs
from datetime import *
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from time import time
def parse_zap(inputStream, outputStream):
    """Parse one zap message from a flowfile and write it out as JSON.

    Meant to be called from (or as) a NiFi StreamCallback ``process`` method,
    so it receives the flowfile's content streams rather than a string.

    The flowfile content is read as UTF-8 text, stripped of surrounding
    whitespace, and must be valid hex (e.g. a 16-digit number). Its SHA-256
    digest, rendered as a decimal string, supplies the mac, ams-id and action
    fields through fixed slices.

    :param inputStream: Java InputStream with the incoming flowfile content.
    :param outputStream: Java OutputStream that receives the UTF-8 JSON event.
    :returns: the event dict that was written to ``outputStream``.
    :raises: re-raises any parse error after printing a log line, so the
        caller (e.g. NiFi) can route the flowfile to failure.
    """
    log_date = datetime.now().isoformat()
    try:
        # The stream object is not the message itself: read its whole content
        # first. strip() removes a trailing newline, which fromhex() may reject.
        data = IOUtils.toString(inputStream, StandardCharsets.UTF_8).strip()
        digest = hashlib.sha256(bytearray.fromhex(data)).hexdigest()
        # Decimal form of the 256-bit digest; the fields below are slices of it.
        buf_check = str(int(digest, 16))
        # NOTE(review): the original `if buf_check[17] == 2: pass` was removed.
        # It compared a one-character string with the int 2 (always False) and
        # had an empty body. If a check on that digit is intended, compare
        # against the string '2'.
        mac = buf_check[7:14].upper()
        ams_id = buf_check[8:]
        action = buf_check[3:4]
        time_a = int(time())
        dict_test = {
            "user": {
                # NOTE(review): 'false' is emitted as a JSON string, not a
                # boolean; confirm the consumer expects that.
                "guruq": 'false'
            },
            "device": {
                "type": "siolbox",
                "mac": mac
            },
            "event": {
                "origin": "iptv",
                "timestamp": time_a,
                "type": "zap",
                "product-type": "tv-channel",
                "channel": {
                    # NOTE(review): literal placeholder, not a real id; TODO.
                    "id": 'channel_id',
                    "ams-id": ams_id
                },
                "content": {
                    "action": action
                }
            }
        }
    except Exception as e:
        print('%s nod PARSE 500 \"%s\"' % (log_date, e))
        raise
    # Only what is written to outputStream becomes the flowfile's new
    # content; returning the dict alone leaves the output empty.
    outputStream.write(bytearray(json.dumps(dict_test).encode('utf-8')))
    return dict_test
I think I'm reading the input correctly, but now I can't create any output. Thanks in advance.
Take a look at this script:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class PyStreamCallback(StreamCallback):
    """NiFi StreamCallback that rewrites flowfile content without its first lines.

    :param lines_to_skip: number of leading lines to drop. Defaults to 1, the
        previously hard-coded behavior. A value read from a processor property
        (a string) is accepted too, since it is converted with ``int()``.
    """

    def __init__(self, lines_to_skip=1):
        # int() so a property value such as "2" compares numerically below.
        self.lines_to_skip = int(lines_to_skip)

    def process(self, inputStream, outputStream):
        """Copy every line after the first ``lines_to_skip`` to outputStream."""
        lines = IOUtils.readLines(inputStream, StandardCharsets.UTF_8)
        for index, line in enumerate(lines):
            if index < self.lines_to_skip:
                continue
            # Encode explicitly so the bytes written are UTF-8 regardless of
            # how the interpreter would coerce a text string to bytes.
            outputStream.write(bytearray((line + "\n").encode('utf-8')))
# Script body run by NiFi's ExecuteScript processor, which provides
# `session`, `log`, REL_SUCCESS and REL_FAILURE.
flowFile = session.get()
if flowFile is not None:
    try:
        flowFile = session.write(flowFile, PyStreamCallback())
        # rsplit keeps dots inside the base name ("a.b.csv" -> "a.b");
        # split('.')[0] would have cut it down to "a".
        base_name = flowFile.getAttribute('filename').rsplit('.', 1)[0]
        flowFile = session.putAttribute(flowFile, "filename", base_name + '_translated.json')
        session.transfer(flowFile, REL_SUCCESS)
    except (Exception, java.lang.Exception) as e:
        # Route unprocessable content to failure instead of letting the error
        # escape the script. Java exceptions are listed explicitly because
        # Jython does not necessarily treat them as Python Exceptions.
        log.error('Failed to process flowfile: %s' % (e,))
        session.transfer(flowFile, REL_FAILURE)
It drops lines from the start of the flowfile content (here just the first line; the number of lines to remove could instead be taken from a processor property) and then writes the remaining lines back to the flowfile. It's simple, and a good example of how to read and rewrite a flowfile's content.
Based on your updated code, your script should look like this:
import datetime
import hashlib
from urlparse import urlparse, parse_qs
import sys
from urlparse import urlparse, parse_qs
from datetime import *
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from time import time
class PyStreamCallback(StreamCallback):
    """NiFi StreamCallback that converts one zap message into a JSON event.

    Used as ``session.write(flowFile, PyStreamCallback())``. NiFi calls
    ``process`` with the flowfile's content streams, and whatever is written
    to ``outputStream`` becomes the new flowfile content.
    """

    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        """Read a hex-encoded message, hash it, and write a JSON zap event.

        The content is read as UTF-8 text, stripped of surrounding whitespace,
        and must be valid hex (e.g. a 16-digit number). Its SHA-256 digest,
        rendered as a decimal string, supplies the mac, ams-id and action
        fields through fixed slices.

        :param inputStream: Java InputStream with the incoming content.
        :param outputStream: Java OutputStream receiving the UTF-8 JSON event.
        :returns: the event dict that was written. The flowfile content NiFi
            keeps is what was written to ``outputStream``.
        :raises: re-raises any parse error after printing a log line, so the
            flowfile is not silently given empty content.
        """
        log_date = datetime.now().isoformat()
        try:
            # The stream object is not the message itself: read its whole
            # content first. strip() removes a trailing newline, which
            # fromhex() may reject.
            data = IOUtils.toString(inputStream, StandardCharsets.UTF_8).strip()
            digest = hashlib.sha256(bytearray.fromhex(data)).hexdigest()
            # Decimal form of the 256-bit digest; fields are slices of it.
            buf_check = str(int(digest, 16))
            # NOTE(review): the original `if buf_check[17] == 2: pass` was
            # removed. It compared a one-character string with the int 2
            # (always False) and had an empty body. Compare against '2' if a
            # check on that digit is intended.
            mac = buf_check[7:14].upper()
            ams_id = buf_check[8:]
            action = buf_check[3:4]
            time_a = int(time())
            dict_test = {
                "user": {
                    # NOTE(review): 'false' is a JSON string, not a boolean;
                    # confirm the consumer expects that.
                    "guruq": 'false'
                },
                "device": {
                    "type": "siolbox",
                    "mac": mac
                },
                "event": {
                    "origin": "iptv",
                    "timestamp": time_a,
                    "type": "zap",
                    "product-type": "tv-channel",
                    "channel": {
                        # NOTE(review): literal placeholder, not a real id; TODO.
                        "id": 'channel_id',
                        "ams-id": ams_id
                    },
                    "content": {
                        "action": action
                    }
                }
            }
        except Exception as e:
            print('%s nod PARSE 500 \"%s\"' % (log_date, e))
            raise
        # Writing to outputStream is what produces the new flowfile content;
        # returning the dict alone leaves the output empty.
        outputStream.write(bytearray(json.dumps(dict_test).encode('utf-8')))
        return dict_test
# Script body run by NiFi's ExecuteScript processor, which provides
# `session`, `log`, REL_SUCCESS and REL_FAILURE.
flowFile = session.get()
if flowFile is not None:
    try:
        flowFile = session.write(flowFile, PyStreamCallback())
        # rsplit keeps dots inside the base name ("a.b.csv" -> "a.b");
        # split('.')[0] would have cut it down to "a".
        base_name = flowFile.getAttribute('filename').rsplit('.', 1)[0]
        flowFile = session.putAttribute(flowFile, "filename", base_name + '_translated.json')
        session.transfer(flowFile, REL_SUCCESS)
    except (Exception, java.lang.Exception) as e:
        # A message that is not valid hex raises inside the callback; route
        # it to failure instead of letting the error escape the script. Java
        # exceptions are listed explicitly because Jython does not necessarily
        # treat them as Python Exceptions.
        log.error('Failed to process flowfile: %s' % (e,))
        session.transfer(flowFile, REL_FAILURE)