I have an array of values like [9308023, 48243, 429402, 589348934, 4943, 4298040240, 424820482] from the upstream processor (EvalueteJsonPath) and I want to get the max value of it using ExecuteScript -which is the downstream processor and send to the next downstream processor, whatever its.
I tried using python and below is my code.
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
# Define a subclass of StreamCallback to handle the incoming flow file
class PyStreamCallback(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
# Read the flow file content as a string
flowFileText = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
# Split the input text into a list of IDs, trim white spaces, and convert to integers
ids = [int(id.strip()) for id in flowFileText.strip().split(',')]
if len(ids) > 0:
# Sort the IDs in ascending order
sorted_ids = sorted(ids)
# Get the first ID from the sorted list
first_id = sorted_ids[0]
# Convert the first ID back to a string
first_id_str = str(first_id)
# Write the first ID to the output stream
outputStream.write(first_id_str)
else:
# If no IDs are present, write an empty string to the output stream
outputStream.write("")
# Create an instance of the callback class
streamCallback = PyStreamCallback()
# Process incoming flow files
flowFile = session.get()
if flowFile is not None:
try:
# Execute the callback on the flow file
session.read(flowFile, streamCallback)
session.write(flowFile, streamCallback)
# Transfer the flow file to success relationship
session.transfer(flowFile, REL_SUCCESS)
session.commit()
except Exception as e:
# Log the exception
log.error("Failed to process flow file: " + str(e))
session.transfer(flowFile, REL_FAILURE)
session.commit()
Output: error with the following error message
"ExecuteScript[id=018810db-110d-17c5-17da-0fdf1ca49296] Failed to process flow file: read(): 2nd arg can't be coerced to org.apache.nifi.processor.io.InputStreamCallback "
You can use the below example
[
{
"operation": "shift",
"spec": {
"*": {
"rating": "rating"
}
}
},
{
"operation": "modify-overwrite-beta",
"spec": {
//Min Value
"minRating": "=min(@(1,rating))",
// Max value
"maxMax": "=max(@(1,rating))"
}
}
]
Then you can extract the max and min values using EvaluateJsonPath such that add property max_value with value $.maxRating, and same for the min as well. Finally, you can connect it with the downstream processor like UpdateAttribute.
source: https://github.com/bazaarvoice/jolt/issues/700
I hope its helpful.