I am trying to run a script or a custom processor that groups flow files by given attributes every hour. The queue holds 30-40k flow files on a typical run and may reach 200k depending on the case.
Solution 1: Consume all flow files, group them by attributes, then create and push a new flow file per group. Not ideal, but I gave it a try. I ran it with 33k flow files waiting in the queue and checked:
session.getQueueSize().getObjectCount()
This always returns 10k, even though I increased the queue threshold numbers on the output flows.
Solution 2: A better approach is to consume one flow file and then filter the queue for flow files matching the provided attributes:
final List<FlowFile> flowFiles = session.get(file -> {
    if (correlationId.equals(Arrays.stream(keys).map(file::getAttribute).collect(Collectors.joining(":")))) {
        return FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
    }
    return FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
});
Again, with 33k flow files waiting in the queue I expected around 200 grouped flow files, but 320 were created. This looks like the same issue as above: the filter does not scan all waiting flow files.
Questions:
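For reference, the grouping that both solutions aim for can be sketched in plain Java without any NiFi classes. This is only an illustration: the class `AttributeGrouping`, the method names, and the use of plain attribute maps in place of FlowFile objects are hypothetical stand-ins, and the composite key is built the same way as in the filter above (attribute values joined by ":").

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of NiFi: groups attribute maps by a composite key.
public class AttributeGrouping {

    // Joins the values of the given attribute names with ":".
    // A missing attribute contributes the literal text "null".
    public static String correlationKey(Map<String, String> attributes, String[] keys) {
        return Arrays.stream(keys)
                .map(k -> String.valueOf(attributes.get(k)))
                .collect(Collectors.joining(":"));
    }

    // Groups the attribute maps by their composite key, keeping first-seen key order.
    public static Map<String, List<Map<String, String>>> groupByKeys(
            List<Map<String, String>> flowFileAttributes, String[] keys) {
        return flowFileAttributes.stream()
                .collect(Collectors.groupingBy(
                        attrs -> correlationKey(attrs, keys),
                        LinkedHashMap::new,
                        Collectors.toList()));
    }
}
```

If every waiting flow file is visible to the processor, the number of groups equals the number of distinct composite keys. Getting more groups than expected means the same key was seen in more than one partial batch.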
Is there a parameter to change so this getObjectCount can take up to 300k?
Is there a way to filter all waiting flow files again by changing a parameter or by changing the processor?
In nifi.properties there is a parameter that affects batching behavior:
nifi.queue.swap.threshold=20000
Here is my test flow:
1. GenerateFlowFile with "batch size = 50K"
2. ExecuteGroovyScript with script below
3. LogAttribute (disabled) - just to have a queue after the Groovy script
Groovy script:
def ffList = session.get(100000) // get batch with maximum 100K files from incoming queue
if(!ffList)return
def ff = session.create() // create new empty file
ff.batch_size = ffList.size() // set attribute to real batch size
session.remove(ffList) // drop all incoming batch files
REL_SUCCESS << ff // transfer new file to success
With the parameters above, 4 files are generated in the output:
1. batch_size = 20000
2. batch_size = 10000
3. batch_size = 10000
4. batch_size = 10000
According to the documentation:
There is also the notion of "swapping" FlowFiles. This occurs when the number of FlowFiles in a connection queue exceeds the value set in the nifi.queue.swap.threshold property. The FlowFiles with the lowest priority in the connection queue are serialized and written to disk in a "swap file" in batches of 10,000.
This explains the result: of the 50K incoming files, NiFi keeps 20K in memory and swaps the rest to disk in batches of 10K.
I don't know how increasing the nifi.queue.swap.threshold property will affect your system's performance and memory consumption, but I set it to 100K on my local NiFi 1.16.3 and it works well with many small files; the first batch grew to 100K as a result.
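The batch sizes seen in the test can be reproduced with a small arithmetic model. This is a deliberately simplified sketch and not NiFi's actual swap implementation: the class `SwapBatchModel` and its method are hypothetical, and it assumes each session.get call returns either the in-memory portion of the queue or one swap file's worth of flow files.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the observed behavior, not NiFi code.
public class SwapBatchModel {

    // Returns the batch sizes a processor would see when draining a queue:
    // first the in-memory part (capped at swapThreshold), then chunks of swapFileSize.
    public static List<Integer> batchSizes(int queued, int swapThreshold, int swapFileSize) {
        List<Integer> sizes = new ArrayList<>();
        int inMemory = Math.min(queued, swapThreshold);
        if (inMemory > 0) {
            sizes.add(inMemory);
        }
        int remaining = queued - inMemory;
        while (remaining > 0) {
            // Assumption: a final partial swap file is returned as a smaller batch.
            int chunk = Math.min(remaining, swapFileSize);
            sizes.add(chunk);
            remaining -= chunk;
        }
        return sizes;
    }

    public static void main(String[] args) {
        // 50K queued, threshold 20000, swap files of 10,000
        System.out.println(batchSizes(50_000, 20_000, 10_000)); // prints [20000, 10000, 10000, 10000]
    }
}
```

Under the same model, raising the threshold to 100K with 50K queued gives a single batch of 50000, which is consistent with the first batch growing once the property was increased.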