I am new in Apache NiFi 2.2.0 and trying to configure ExecuteGroovyScript Processor to aggregate 1000 flowfiles into one.
import groovy.json.JsonSlurper
import groovy.json.JsonOutput
import org.apache.nifi.processor.io.StreamCallback
flowFileList = session.get(1000)
if (flowFileList.isEmpty()) {
return
}
def jsonSlurper = new JsonSlurper()
def data = []
flowFileList.each { flowFile ->
session.read(flowFile, { inputStream ->
def json = jsonSlurper.parse(inputStream)
codes.add(json.cis)
} as StreamCallback)}
def jsonOutput = JsonOutput.toJson([
'data': data,
'x': 'y',
'z': 't',
'v': 'f'
])
def newFlowFile = session.create()
session.write(newFlowFile, { outputStream ->
outputStream.write(jsonOutput.bytes)
} as StreamCallback)
newFlowFile.'mime.type' = 'application/json'
session.transfer(newFlowFile, REL_SUCCESS)
But faced the error:
ExecuteGroovyScript[id=d285877c-0194-1000-db2f-4b969c5efa3b] groovy.lang.MissingMethodException: No signature of method: org.apache.nifi.processors.groovyx.flow.GroovyProcessSessionWrap.read() is applicable for argument types: (org.apache.nifi.processors.groovyx.flow.GroovySessionFile, jdk.proxy2.$Proxy225) values: [WRAP[StandardFlowFileRecord[uuid=82d909ca-35b2-4adf-a2cd-d16f3aa4056f,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1738696703025-1, container=default, section=1], offset=0, length=23],offset=0,name=82d909ca-35b2-4adf-a2cd-d16f3aa4056f,size=23]], ...]
Possible solutions: read(org.apache.nifi.flowfile.FlowFile), read(org.apache.nifi.flowfile.FlowFile, org.apache.nifi.processor.io.InputStreamCallback), create(), create(), get(), get(): groovy.lang.MissingMethodException: No signature of method: org.apache.nifi.processors.groovyx.flow.GroovyProcessSessionWrap.read() is applicable for argument types: (org.apache.nifi.processors.groovyx.flow.GroovySessionFile, jdk.proxy2.$Proxy225) values: [WRAP[StandardFlowFileRecord[uuid=82d909ca-35b2-4adf-a2cd-d16f3aa4056f,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1738696703025-1, container=default, section=1], offset=0, length=23],offset=0,name=82d909ca-35b2-4adf-a2cd-d16f3aa4056f,size=23]], ...]
Possible solutions: read(org.apache.nifi.flowfile.FlowFile), read(org.apache.nifi.flowfile.FlowFile, org.apache.nifi.processor.io.InputStreamCallback), create(), create(), get(), get()
Any ideas please?
I've made some modifications to your NiFi Groovy script:
import groovy.json.JsonSlurper
import groovy.json.JsonOutput
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
flowFileList = session.get(1000)
if (flowFileList.isEmpty()) {
return
}
def jsonSlurper = new JsonSlurper()
def data = []
flowFileList.each { flowFile ->
session.read(flowFile, { inputStream ->
def json = jsonSlurper.parse(inputStream)
data.add(json)
} as InputStreamCallback)}
def jsonOutput = JsonOutput.toJson([
'data': data,
'x': 'y',
'z': 't',
'v': 'f'
])
def newFlowFile = session.create()
session.write(newFlowFile, { outputStream ->
outputStream.write(jsonOutput.bytes)
} as OutputStreamCallback)
flowFileList.each { flowFile -> flowFile.remove() }
newFlowFile.'mime.type' = 'application/json'
session.transfer(newFlowFile, REL_SUCCESS)
The original script uses StreamCallback
for both reading and writing, while the modified script uses InputStreamCallback
for reading and OutputStreamCallback
for writing. This change improves clarity by using more specific callback types for each operation.
The original script attempts to add json.cis
to a codes
list, which is undefined. The modified script correctly adds the entire JSON object to the data list.
You must delete all original FlowFiles if you want to create new ones or transfer them via a relationship