hazelcast-jet

Processor with 2 incoming edges - When returning false on one edge, keep re-processing from same edge and never process new items on other edge


I would like to confirm my assumption about the tryProcess() logic.

Specifically, how does the return value (true/false) influence the DAG workflow on a processor with 2 incoming edges and no priority specified?

My assumption is that if the processor has incoming items on both edges and tryProcess() returns false for one of them, the other edge will be processed (if more incoming items are available on that edge). The processor would then alternate between the edges, depending on which one refuses items and which one accepts them.

Problem

Sometimes one processor instance blocks on tryProcess(#0), which always returns false (because we expect to process a new item from the other edge). tryProcess(#0) is called repeatedly and tryProcess(#1) is never called. I'm sure that completeEdge() has not been called on the processor for either edge #0 or edge #1, so I expect there are more items to process from edge #1. This usually happens after running the same Job multiple times.

To better explain the question, this is my use case:

Use Case

My data model is composed of the following objects:

  • A: Object identified by "ida" attribute
  • B: Object identified by "idb" attribute. It has a reference to A using "ida" value
  • AB: Object that pairs a B object with the A object it references

I need to match each B object with the A object it references and emit the pair.

I have a DAG with this setup:

Vertices

  • S-A: Source items of type "A" (localParallelism(1), emits A objects sorted by "ida" attribute)
  • S-B: Source items of type "B" (localParallelism(1), emits B objects sorted by referenced "ida" attribute)
  • C-AB: Processor that matches B objects with referenced A object (emits AB objects)

Connections

  • S-A -> C-AB : incoming edge #0 (no priority specified, partitioned by "ida" attribute)
  • S-B -> C-AB : incoming edge #1 (no priority specified, partitioned by reference to "ida" attribute)

The environment is a Hazelcast Jet cluster with 2 nodes.

Logic

The C-AB processor gets the first "A" object (from edge #0) and keeps it until all "B" objects related to that "A" object are processed. If it receives another "A" object in the meantime, it returns false in tryProcess(#0).

While it receives "B" objects (from edge #1) that match the current "A", it emits "AB".

When the processor receives a "B" object that references the next "A" object, it discards the current "A" and waits for the next one.

If it receives a "B" object before it has the referenced "A" object, it waits for the proper "A" to match, returning false in tryProcess(#1) if a new "B" is received.

This should work because S-A and S-B emit properly sorted objects and the edges are partitioned so that objects with the same "ida" value are sent to the same processor.


Solution

  • My assumption is that if the processor has incoming items on both edges and tryProcess() returns false for one of them, the other edge will be processed (if more incoming items are available on that edge).

    This assumption is false. The processor's behavior is equivalent to

    for (Object item : inbox) process(item);
    

    but implemented with cooperative multithreading, which is why this loop must be able to "suspend" itself. We achieve suspension by letting tryProcess() return false.

    The execution engine always resumes the processor where it left off and won't try to process any other items until the inbox is clear. The inbox itself contains a batch of items taken from an input queue, not all the items the edge will transfer during the entire job.
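The stall described in the question can be reproduced with a toy model of that rule. The sketch below is plain Java with no Jet dependency; `ItemHandler`, `drainInbox` and `stalls` are hypothetical names invented for illustration, and the loop is only a simplified stand-in for the engine (the real engine yields to other tasklets between calls, but it comes back to the same inbox). The handler mimics the question's logic: it keeps the first "A" and refuses the next one until a "B" arrives.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class InboxStallDemo {

    /** Hypothetical stand-in for a processor's per-item callback. */
    interface ItemHandler {
        boolean tryProcess(int ordinal, Object item);
    }

    /**
     * Offers the head of the inbox to the handler until the inbox is empty
     * or maxCalls attempts were made. Returns the number of attempts.
     * An item is removed only when the handler accepts it (returns true).
     */
    static int drainInbox(int ordinal, Deque<Object> inbox, ItemHandler handler, int maxCalls) {
        int calls = 0;
        while (!inbox.isEmpty() && calls < maxCalls) {
            calls++;
            if (handler.tryProcess(ordinal, inbox.peekFirst())) {
                inbox.removeFirst();
            }
        }
        return calls;
    }

    /** Returns true if the scenario from the question makes no progress within maxCalls attempts. */
    static boolean stalls(int maxCalls) {
        Deque<Object> inbox0 = new ArrayDeque<>();
        inbox0.add("A1");
        inbox0.add("A2");

        final Object[] currentA = new Object[1];
        ItemHandler handler = (ordinal, item) -> {
            if (ordinal == 0 && currentA[0] == null) {
                currentA[0] = item;   // keep the first A
                return true;
            }
            // Refuse the next A until a B shows up. Edge #1 is never offered
            // while inbox0 still holds items, so this never changes.
            return false;
        };

        int calls = drainInbox(0, inbox0, handler, maxCalls);
        return calls == maxCalls && !inbox0.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println("stalled: " + stalls(1000));
    }
}
```

In this model "A2" stays at the head of the inbox no matter how many attempts are allowed, because the only event that would unblock it (an item from edge #1) cannot be delivered until the inbox is empty.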

    The only mechanism Jet offers to solve co-dependence between edges is edge priority. If you need something more fine-grained than that, your processor should accept all the items and buffer them internally until your progress condition is met.
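A minimal sketch of the buffering approach for the A/B matching use case, written as plain Java without any Jet classes. `BufferingMatcher`, `acceptA`, `acceptB` and `drain` are hypothetical names; in a real processor the two accept methods would correspond to the tryProcess() calls for edge #0 and edge #1. The sketch assumes, as the question states, that both inputs arrive sorted by "ida". Every item is accepted immediately (the methods always return true), and matching happens on the internal queues.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class BufferingMatcher {

    static final class A {
        final int ida;
        A(int ida) { this.ida = ida; }
    }

    static final class B {
        final int idb;
        final int ida; // reference to the A object
        B(int idb, int ida) { this.idb = idb; this.ida = ida; }
    }

    static final class AB {
        final A a;
        final B b;
        AB(A a, B b) { this.a = a; this.b = b; }
    }

    private final Deque<A> pendingA = new ArrayDeque<>();
    private final Deque<B> pendingB = new ArrayDeque<>();
    private final List<AB> output = new ArrayList<>();

    /** Analogous to tryProcess for edge #0: never refuses an item. */
    boolean acceptA(A a) {
        pendingA.addLast(a);
        drain();
        return true;
    }

    /** Analogous to tryProcess for edge #1: never refuses an item. */
    boolean acceptB(B b) {
        pendingB.addLast(b);
        drain();
        return true;
    }

    /** Merge-joins the two sorted buffers for as long as both have a head item. */
    private void drain() {
        while (!pendingA.isEmpty() && !pendingB.isEmpty()) {
            A a = pendingA.peekFirst();
            B b = pendingB.peekFirst();
            if (b.ida == a.ida) {
                output.add(new AB(a, b));   // match: emit the pair, keep A for further B's
                pendingB.removeFirst();
            } else if (b.ida > a.ida) {
                pendingA.removeFirst();     // B's have moved past this A: discard it
            } else {
                pendingB.removeFirst();     // no A with this ida can still arrive: drop B
            }
        }
    }

    List<AB> output() {
        return output;
    }
}
```

Feeding it B(10,1), B(11,1), A(1), A(2), A(3), B(12,3) in that order yields three pairs: two for A(1) and one for A(3). The trade-off is memory: the buffers are unbounded here, so if one source runs far ahead of the other, the queue for that side grows accordingly.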