
Hazelcast Jet multiple outbound edges


I need to write the result of an aggregation to three separate sinks (maps whose update logic differs slightly). I tried converting the pipeline object into a DAG and adding another edge from the second-to-last vertex, but it threw an exception saying that multiple outbound edges were not allowed. Is there any way to create a DAG with multiple outbound edges?


Solution

  • You should be able to assign the stage you want to drain to a variable and then call drainTo() on it repeatedly, once for each sink.

    Example:

    StreamStage<TimestampedEntry<..>> stage = pipeline.drawFrom(..)
                                 .map(..)
                                 .groupingKey(..)
                                 .window(..)
                                 .aggregate(counting());
    
    // Attach each sink to the same stage; every sink receives the stage's full output.
    stage.drainTo(Sinks.map("map1"));
    stage.drainTo(Sinks.map("map2"));
    

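    If you want to achieve the same with the DAG API, each outbound edge of the vertex must use a different ordinal, which you assign with the Edge.from(vertex, ordinal).to(vertex) construct. Adding a second edge at an ordinal that is already taken is the likely cause of the exception you saw. However, if you are already starting with a pipeline, this should not be necessary.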
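For the DAG route, here is a minimal sketch of a vertex that fans out to two sinks on distinct ordinals. It assumes the Jet Core API classes in com.hazelcast.jet.core and its processor subpackage, which must be on the classpath. The vertex names, the "input" map, and the identity mapP stage standing in for your aggregation are all placeholders, not part of the original question.

```java
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.core.processor.SinkProcessors;
import com.hazelcast.jet.core.processor.SourceProcessors;

public class FanOutDag {

    public static DAG build() {
        DAG dag = new DAG();

        // Placeholder source: reads entries from an IMap named "input" (assumed name).
        Vertex source = dag.newVertex("source", SourceProcessors.readMapP("input"));

        // Stand-in for the aggregating vertex; here it just passes items through.
        Vertex transform = dag.newVertex("transform", Processors.mapP(e -> e));

        // One sink vertex per target map.
        Vertex sink1 = dag.newVertex("sink1", SinkProcessors.writeMapP("map1"));
        Vertex sink2 = dag.newVertex("sink2", SinkProcessors.writeMapP("map2"));

        // Edge.between uses ordinal 0 on both ends.
        dag.edge(Edge.between(source, transform));

        // Each outbound edge of "transform" gets its own source ordinal.
        dag.edge(Edge.from(transform, 0).to(sink1));
        dag.edge(Edge.from(transform, 1).to(sink2));

        return dag;
    }
}
```

The resulting DAG can be submitted with jet.newJob(FanOutDag.build()). As far as I know, the built-in processors emit each item to all of their outbound edges, so both maps should receive the same entries. If the sinks need different update logic, that logic would go into the sink vertices or into an extra vertex in front of each sink.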