
Get the max value from an array of integers in NiFi


I have an array of values like [9308023, 48243, 429402, 589348934, 4943, 4298040240, 424820482] coming from the upstream processor (EvaluateJsonPath). I want to get the max value in the downstream ExecuteScript processor and send it on to the next processor, whatever that is.

I tried it in Python; my code is below.

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

# Define a subclass of StreamCallback to handle the incoming flow file
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Read the flow file content as a string
        flowFileText = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

        # Split the input text into a list of IDs, trim white spaces, and convert to integers
        ids = [int(id.strip()) for id in flowFileText.strip().split(',')]

        if len(ids) > 0:
            # Sort the IDs in ascending order
            sorted_ids = sorted(ids)

            # Get the first ID from the sorted list
            first_id = sorted_ids[0]

            # Convert the first ID back to a string
            first_id_str = str(first_id)

            # Write the first ID to the output stream
            outputStream.write(first_id_str)
        else:
            # If no IDs are present, write an empty string to the output stream
            outputStream.write("")

# Create an instance of the callback class
streamCallback = PyStreamCallback()

# Process incoming flow files
flowFile = session.get()
if flowFile is not None:
    try:
        # Execute the callback on the flow file
        session.read(flowFile, streamCallback)
        session.write(flowFile, streamCallback)

        # Transfer the flow file to success relationship
        session.transfer(flowFile, REL_SUCCESS)
        session.commit()
    except Exception as e:
        # Log the exception
        log.error("Failed to process flow file: " + str(e))
        session.transfer(flowFile, REL_FAILURE)
        session.commit()

Output: it fails with the following error message:

"ExecuteScript[id=018810db-110d-17c5-17da-0fdf1ca49296] Failed to process flow file: read(): 2nd arg can't be coerced to org.apache.nifi.processor.io.InputStreamCallback "

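The error arises because session.read() expects an InputStreamCallback, while a StreamCallback (which reads the old content and writes new content in one pass) is meant for session.write(). The script has a few other problems too. The content coming out of EvaluateJsonPath is JSON text with brackets, so splitting on commas leaves pieces like "[9308023" that int() rejects. Sorting ascending and taking element 0 yields the minimum, not the maximum. Finally, writing a bytearray of the UTF-8 encoded string is the safer way to hand text to the Java OutputStream. Below is a minimal sketch of a corrected ExecuteScript (Jython) body. It assumes the flow file content is exactly the JSON array and that you want the max written back as the new content. The helper name max_from_json_array is hypothetical, and the NiFi part only runs when ExecuteScript has bound session, so the parsing logic can also be tried in plain Python. The explicit session.commit() calls are dropped because ExecuteScript manages the session itself.

```python
import json


def max_from_json_array(text):
    """Return the largest integer in a JSON array such as "[9308023, 48243]", as a string.

    Hypothetical helper for this sketch; returns "" when the array is empty.
    """
    values = json.loads(text)  # handles the brackets and surrounding whitespace
    if not values:
        return ""
    return str(max(int(v) for v in values))


# ExecuteScript binds session, log, REL_SUCCESS and REL_FAILURE; outside NiFi they do not exist.
try:
    session
    RUNNING_IN_NIFI = True
except NameError:
    RUNNING_IN_NIFI = False

if RUNNING_IN_NIFI:
    from org.apache.commons.io import IOUtils
    from java.nio.charset import StandardCharsets
    from org.apache.nifi.processor.io import StreamCallback

    class MaxValueCallback(StreamCallback):
        def process(self, inputStream, outputStream):
            # Read the whole content, e.g. "[9308023, 48243, ...]"
            text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            # Replace the content with the max value, encoded as UTF-8 bytes
            outputStream.write(bytearray(max_from_json_array(text).encode("utf-8")))

    flowFile = session.get()
    if flowFile is not None:
        try:
            # A StreamCallback goes with session.write(), not session.read()
            flowFile = session.write(flowFile, MaxValueCallback())
            session.transfer(flowFile, REL_SUCCESS)
        except Exception as e:
            log.error("Failed to process flow file: " + str(e))
            session.transfer(flowFile, REL_FAILURE)
```

With the sample array from the question, the outgoing content would be 4298040240.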

Solution

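  • You can do this without a script by running a Jolt spec like the one below in a JoltTransformJSON processor. As written, the spec expects an array of objects that each have a rating field; for a plain array of numbers like the one in the question, the shift would need to change (for example to "*": "rating").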

    [
      {
        "operation": "shift",
        "spec": {
          "*": {
            "rating": "rating"
          }
        }
      },
      {
        "operation": "modify-overwrite-beta",
        "spec": {
          // Min value
          "minRating": "=min(@(1,rating))",
          // Max value
          "maxRating": "=max(@(1,rating))"
        }
      }
    ]
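    To make the two operations concrete, here is a small plain-Python sketch that mimics what the spec produces. It is not Jolt itself, only an illustration. It assumes at least two input objects carry a rating field (with a single match, Jolt's shift writes a plain value rather than a list), and it names the max key maxRating to match the $.maxRating path used in the EvaluateJsonPath step. The function name emulate_rating_spec is hypothetical.

```python
def emulate_rating_spec(records):
    """Mimic the shift + modify-overwrite-beta spec (illustration only, not Jolt)."""
    # shift "*": {"rating": "rating"}: every element's rating is collected under one "rating" key
    ratings = [r["rating"] for r in records if "rating" in r]
    result = {"rating": ratings}
    # modify-overwrite-beta: =min(@(1,rating)) and =max(@(1,rating)) over that list
    result["minRating"] = min(ratings)
    result["maxRating"] = max(ratings)
    return result


sample = [{"rating": 9308023}, {"rating": 48243}, {"rating": 4298040240}]
print(emulate_rating_spec(sample))
# {'rating': [9308023, 48243, 4298040240], 'minRating': 48243, 'maxRating': 4298040240}
```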
    


    Then you can extract the max and min values with EvaluateJsonPath (Destination set to flowfile-attribute): add a property max_value with the value $.maxRating, and likewise min_value with $.minRating. Finally, connect it to a downstream processor such as UpdateAttribute.

    source: https://github.com/bazaarvoice/jolt/issues/700

    I hope it's helpful.