This is the code for my custom Kafka processor, which simply consumes from a Kafka topic and writes each record out as a FlowFile:
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1000)); // poll(long) is deprecated since Kafka 2.0
records.forEach(record -> {
    // session.create() never returns null (unlike session.get()), so no null check is needed
    FlowFile flowFile = session.create();
    try {
        // Emit an empty JSON object for tombstone records (null value)
        byte[] outputBytes = (record.value() == null) ? EMPTY_JSON_OBJECT :
                genericData.toString(record.value()).getBytes(StandardCharsets.UTF_8);
        flowFile = session.write(flowFile, rawOut -> {
            rawOut.write(outputBytes);
            consumer.commitSync();
        });
    } catch (ProcessException pe) {
        getLogger().error("Failed to deserialize {}", new Object[]{flowFile, pe});
        session.transfer(flowFile, REL_FAILURE);
        return; // skip to the next record
    }
    flowFile = session.putAttribute(flowFile, "topic", record.topic());
    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
    getLogger().info("flowFile id " + flowFile.getId());
    session.transfer(flowFile, REL_SUCCESS);
});
This code takes a batch of around 500 Kafka messages and produces one FlowFile per message. What I need is to put it inside a while loop that does the same thing over and over again. When I do that, though, nothing comes out of the processor, even though the info log shows the FlowFile ids incrementing, so the FlowFiles do seem to be created. I also found that this happens only with an infinite while loop; with a bounded for loop the processor works fine. I suspect there is something about NiFi's internal flow handling that I am not aware of.
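The symptom fits a transactional session: created FlowFiles get ids immediately, but transferred ones stay invisible to the next processor until the session commits. The sketch below is a hypothetical toy model in plain Java, not the NiFi API; `ToySession` and `LoopDemo` are illustrative inventions that only mimic the "invisible until commit()" rule, and the batch size of 500 is borrowed from the question.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model of a transactional session. NOT the NiFi API:
 * it only mimics the rule that transferred items stay invisible until commit().
 */
class ToySession {
    // Items the "next processor" can see.
    final List<Integer> downstream = new ArrayList<>();
    // Items transferred in this session but not yet committed.
    private final List<Integer> pending = new ArrayList<>();
    private int nextId = 0;

    // Ids keep increasing whether or not anything is committed.
    int create() {
        return nextId++;
    }

    void transfer(int flowFileId) {
        pending.add(flowFileId);
    }

    // Publish everything transferred so far.
    void commit() {
        downstream.addAll(pending);
        pending.clear();
    }

    int createdCount() {
        return nextId;
    }
}

class LoopDemo {
    /**
     * Runs `iterations` passes of a poll-style loop, each producing `batchSize` items,
     * and returns how many are visible downstream before the method has returned.
     */
    static int visibleMidLoop(ToySession session, int iterations, int batchSize, boolean commitEachBatch) {
        for (int i = 0; i < iterations; i++) {
            for (int r = 0; r < batchSize; r++) {
                session.transfer(session.create());
            }
            if (commitEachBatch) {
                session.commit(); // the manual commit
            }
        }
        return session.downstream.size();
    }

    /** Same loop without manual commits, followed by the implicit commit a framework performs on return. */
    static int visibleAfterReturn(ToySession session, int iterations, int batchSize) {
        visibleMidLoop(session, iterations, batchSize, false);
        session.commit();
        return session.downstream.size();
    }

    public static void main(String[] args) {
        ToySession neverReturns = new ToySession();
        int seen = visibleMidLoop(neverReturns, 3, 500, false);
        System.out.println("no manual commit: created=" + neverReturns.createdCount() + ", visible=" + seen);

        ToySession manual = new ToySession();
        System.out.println("commit per batch: visible=" + visibleMidLoop(manual, 3, 500, true));

        ToySession bounded = new ToySession();
        System.out.println("bounded loop, then return: visible=" + visibleAfterReturn(bounded, 3, 500));
    }
}
```

Running `main` prints created=1500, visible=0 for the loop without manual commits, and visible=1500 both for the per-batch commit and for the bounded loop followed by the implicit commit on return.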
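The problem was that I wasn't committing the session manually. In NiFi, FlowFiles transferred in a ProcessSession only become visible to downstream connections once the session is committed, and a processor extending AbstractProcessor gets that commit automatically only after onTrigger returns. With an infinite while loop the method never returns, so the session is never committed; a bounded for loop works because the method eventually returns. The contrived solution ended up being something like this: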
while (true) {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1000));
    records.forEach(record -> {
        FlowFile flowFile = session.create();
        try {
            // Emit an empty JSON object for tombstone records (null value)
            byte[] outputBytes = (record.value() == null) ? EMPTY_JSON_OBJECT :
                    genericData.toString(record.value()).getBytes(StandardCharsets.UTF_8);
            flowFile = session.write(flowFile, rawOut -> rawOut.write(outputBytes));
        } catch (ProcessException pe) {
            getLogger().error("Failed to deserialize {}", new Object[]{flowFile, pe});
            session.transfer(flowFile, REL_FAILURE);
            return; // skip to the next record
        }
        flowFile = session.putAttribute(flowFile, "topic", record.topic());
        flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
        getLogger().info("flowFile id " + flowFile.getId());
        session.transfer(flowFile, REL_SUCCESS);
    });
    // Publish this batch's FlowFiles first, then commit the Kafka offsets once per poll,
    // so a crash between the two re-delivers records instead of losing them.
    session.commit();
    consumer.commitSync();
}
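A loop like this involves two separate commits: the NiFi session commit, which publishes the FlowFiles, and the Kafka offset commit, which marks the records as consumed. Their order decides what a crash between them costs. The sketch below is a hypothetical toy model in plain Java (`CommitOrderDemo` is an illustrative invention, not a NiFi or Kafka class). It assumes that an uncommitted session is discarded on failure and that consumption resumes from the last committed offset.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model (NOT the NiFi or Kafka APIs) of a consumer loop that commits
 * both its output (session) and its input position (offset), with one simulated crash
 * between the two commits.
 */
class CommitOrderDemo {
    enum Order { SESSION_FIRST, OFFSET_FIRST }

    /**
     * Processes record ids [0, total) in batches, crashes once between the two commits
     * of batch number `crashBatch`, then resumes from the committed offset.
     * Returns every id that reached downstream, in order.
     */
    static List<Integer> run(int total, int batchSize, int crashBatch, Order order) {
        List<Integer> downstream = new ArrayList<>();
        int committedOffset = 0;
        int batch = 0;
        boolean crashed = false;
        while (committedOffset < total) {
            int end = Math.min(committedOffset + batchSize, total);
            // Transferred but uncommitted output; a crash discards it.
            List<Integer> pending = new ArrayList<>();
            for (int id = committedOffset; id < end; id++) {
                pending.add(id);
            }
            boolean crashNow = !crashed && batch == crashBatch;
            batch++;
            if (crashNow) {
                crashed = true;
            }
            if (order == Order.SESSION_FIRST) {
                downstream.addAll(pending); // session commit
                if (crashNow) continue;     // offset never committed: batch is re-read
                committedOffset = end;      // offset commit
            } else {
                committedOffset = end;      // offset commit
                if (crashNow) continue;     // output never published: batch is lost
                downstream.addAll(pending); // session commit
            }
        }
        return downstream;
    }

    public static void main(String[] args) {
        System.out.println("session first: " + run(6, 2, 1, Order.SESSION_FIRST));
        System.out.println("offset first:  " + run(6, 2, 1, Order.OFFSET_FIRST));
    }
}
```

With `run(6, 2, 1, ...)`, committing the session first yields [0, 1, 2, 3, 2, 3, 4, 5] (records 2 and 3 duplicated, nothing lost), while committing offsets first yields [0, 1, 4, 5] (records 2 and 3 lost). Committing the session before calling `consumer.commitSync()` therefore trades possible duplicates for no data loss.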