I wrote a basic custom processor, which sends flow to "Retry" relation and also calling penalize.
package nlsn.processors.core.main;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
@Tags({ "wait", "wait on time"})
@CapabilityDescription("Wait on time")
@SeeAlso({})
@ReadsAttributes({ @ReadsAttribute(attribute = "", description = "") })
@WritesAttributes({ @WritesAttribute(attribute = "", description = "") })
public class CustomWait extends AbstractProcessor {
public static final Relationship SUCCESS_RELATIONSHIP = new Relationship.Builder()
.name("SUCCESS").description("well done, carry on").build();
public static final Relationship FAILURE_RELATIONSHIP = new Relationship.Builder()
.name("FAILURE.").description("fail").build();
public static final Relationship POINT_TO_SELF_RELATIONSHIP = new Relationship.Builder()
.name("RETRY").description("point it back to processor").build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(SUCCESS_RELATIONSHIP);
relationships.add(FAILURE_RELATIONSHIP);
relationships.add(POINT_TO_SELF_RELATIONSHIP);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final ComponentLog logger = getLogger();
FlowFile flowFile = session.get();
if (flowFile != null) {
logger.info("flow file is not null.");
String state = flowFile.getAttribute("_wait_state");
if (state == null || state.isEmpty()) {
logger.info("\"_wait_state\" attribute is missing, going into WAIT.");
flowFile = session.putAttribute( flowFile, "_wait_state", "1");
flowFile = session.penalize(flowFile);
session.transfer( flowFile, POINT_TO_SELF_RELATIONSHIP );
} else {
logger.info("\"_wait_state\" attribute is available, breaking WAIT.");
flowFile = session.removeAttribute( flowFile, "_wait_state" );
session.transfer( flowFile, SUCCESS_RELATIONSHIP);
}
} else {
//logger.info("flow file is null (bad)!!!.");
}
}
}
code is working as expected. But I am wondering why task count (192,569) is so high. As expected, process finished in 30 sec?
(see CustomWait processor task count)
Thanks
session.get()
). This session.get()
will not get any penalized FFs, so it will end up returning null. This is why the check for a null FF is needed and not bad. I'm assuming you didn't change the run schedule, which means the controller is going to attempt to run that processor as fast as possible. This leads to the inflated task count.