apache-nifi

Nifi Group Content by Given Attributes


I am trying to run a script or a custom processor to group data by given attributes every hour. The queue size is 30-40k on a single run, and it might go up to 200k depending on the case.

  • MergeContent does not fit, since there is no bound on the min/max counts per group.
  • RouteOnAttribute does not fit, since there are too many attribute combinations.

Solution 1: Consume all flow files, group them by attributes, create a new flow file per group, and push it downstream. Not ideal, but I gave it a try. While running this with 33k flow files waiting in the queue,

session.getQueueSize().getObjectCount()

kept returning 10k, even though I increased the queue threshold numbers on the output connections.
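Stripped of the NiFi API, the grouping step in Solution 1 is just a group-by on a composite key built from the attribute values. A minimal, NiFi-free Java sketch of that idea (plain attribute maps stand in for flow files; all names here are illustrative, not NiFi API):

```java
import java.util.*;
import java.util.stream.Collectors;

public class AttributeGrouping {
    // Build the correlation key by joining the values of the given
    // attribute keys, in order, with ":" (same idea as in Solution 2 below).
    static String correlationKey(Map<String, String> attributes, String[] keys) {
        return Arrays.stream(keys)
                .map(attributes::get)
                .collect(Collectors.joining(":"));
    }

    // Group a batch of "flow files" (here: attribute maps) by that key;
    // each resulting bucket would become one merged output flow file.
    static Map<String, List<Map<String, String>>> groupByAttributes(
            List<Map<String, String>> flowFiles, String[] keys) {
        return flowFiles.stream()
                .collect(Collectors.groupingBy(ff -> correlationKey(ff, keys)));
    }
}
```

The catch, as described below, is that the batch handed to the processor never contains the whole queue, so the grouping only ever sees a slice of it.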

Solution 2: A better approach is to consume one flow file and then filter the queued flow files matching its attributes:

// Pull every queued flow file whose attribute values, joined with ":", match this correlationId
final List<FlowFile> flowFiles = session.get(file -> {
    if (correlationId.equals(Arrays.stream(keys).map(file::getAttribute).collect(Collectors.joining(":"))))
        return FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
    return FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
});

Again, with 33k waiting in the queue, I was expecting around 200 new grouped flow files, but 320 were created. It looks like the same issue as above: the filter does not scan all waiting flow files.

Problems / Questions:

Is there a parameter to change so that getObjectCount can report up to 300k?

Is there a way to make the filter scan all waiting flow files, either by changing a parameter or by changing the processor?

  • I tried setting the default queue threshold to 300k in nifi.properties, but it didn't help.

Solution

  • In nifi.properties there is a parameter that affects this batching behavior:

    nifi.queue.swap.threshold=20000
    

    here is my test flow:

     1. GenerateFlowFile with Batch Size = 50K
     2. ExecuteGroovyScript with the script below
     3. LogAttribute (disabled) - just to have a queue after the Groovy processor
    

    groovy script:

    def ffList = session.get(100000) // get batch with maximum 100K files from incoming queue
    if(!ffList)return
    
    def ff = session.create()        // create new empty file
    ff.batch_size = ffList.size()    // set attribute to real batch size
    
    session.remove(ffList)           // drop all incoming batch files
    REL_SUCCESS << ff                // transfer new file to success
    

    with the parameters above, 4 files are generated in the output:

    1. batch_size = 20000
    2. batch_size = 10000
    3. batch_size = 10000
    4. batch_size = 10000
    

    according to the documentation:

    There is also the notion of "swapping" FlowFiles. This occurs when the number of FlowFiles in a connection queue exceeds the value set in the nifi.queue.swap.threshold property. The FlowFiles with the lowest priority in the connection queue are serialized and written to disk in a "swap file" in batches of 10,000.

    This explains the result: of the 50K incoming files, 20K are kept in memory and the others are swapped to disk in batches of 10K.
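The observed batch sizes can be reproduced with a toy model of this behavior (plain Java, not NiFi code: it just assumes the first batch is the in-memory portion, capped by the swap threshold, and the rest comes back one swap file at a time):

```java
import java.util.*;

public class SwapBatchModel {
    // Model: the first session.get() batch is the in-memory portion
    // (capped by the swap threshold); the remainder is returned in
    // swap-file-sized chunks.
    static List<Integer> batchSizes(int total, int swapThreshold, int swapBatch) {
        List<Integer> batches = new ArrayList<>();
        int inMemory = Math.min(total, swapThreshold);
        batches.add(inMemory);
        for (int remaining = total - inMemory; remaining > 0; remaining -= swapBatch) {
            batches.add(Math.min(remaining, swapBatch));
        }
        return batches;
    }
}
```

With total = 50000, swapThreshold = 20000, swapBatch = 10000 this yields the four batches seen in the test flow above: 20000, 10000, 10000, 10000.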


    I don't know how increasing the nifi.queue.swap.threshold property will affect your system's performance and memory consumption, but I set it to 100K on my local NiFi 1.16.3 and it looks good with multiple small files; the first batch increased to 100K as a result.