Search code examples
javaspring-integration

Spring Integration Java DSL Aggregator by Example


Spring Integration 5.x and Java (not XML) DSL here. I have the following object that will be on the message payload:

@Data // lombok is used for convenience
public class JobInfo {

    private String trackingId;
    private JobMeta metadata;
    private List<MiniJob> miniJobs;
    // lots of other fields as well (outside of scope)

}

@Data
public class MiniJob {

    private MiniJobStatus status;
    // lots of other fields as well (outside of scope)

}

@Data
public class MiniJobStatus {

    private String label; // ex: RECEIVED, IN_PROGRESS, COMPLETED, FAILED, etc.
    // lots of other fields as well (outside of scope)

}

In my integration flow, these MiniJobs are created and sent out over various channels/pathways. I need to implement an Aggregator that will wait until all of the MiniJobs for a given JobInfo are received. Once they are all received I need the aggregator to create a new JobInfo out of them and send that JobInfo along.

Every JobInfo consists of exactly 4 MiniJobs. So I'm thinking that when I create each MiniJob earlier in the flow, I could add a job-info-uuid header on their respective Messages. That way, when the Aggregator receives 4 Messages that all have matching job-info-uuid header values, it uses those 4 messages to create the JobInfo from.

Somewhat out of scope for this, but if someone could provide it in their solution I'd be eternally grateful, would be to introduce the concept of a MiniJob expiry. Meaning if the Aggregator doesn't receive all 4 job-info-uuids within, say, 24 hours, it will delete/purge them from whatever it uses to store messages that are awaiting aggregation.

I've been pouring over the Aggregator docs and looked at countless examples, but I'm just not seeing the forrest through the trees here. Do I create a Consumer<AggregatorSpec> implementation and define all this aggregation logic inside its accept(...) method, and then subsequently, add it to my flow like so:

.aggregate(myAggregatorSpecConsumer)

Or is that the wrong approach? If anyone could use my example above and help me craft that into a working example, even using pseudo-code, I'd appreciate it so much! Thanks in advance!


Solution

  • Please, read this doc: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#splitter.

    So, whatever you say about your List<MiniJob> is really a splitter feature. And since there is one-to-one relationship between Message and its payload, you don't need any extra headers like you mention that job-info-uuid. The splitter will populate CORRELATION_ID, SEQUENCE_SIZE, and SEQUENCE_NUMBER headers into splitted item when you return your List<MiniJob> from split(Function).

    Then when you move on into an Aggregator section you'll see a mirroring functionality where, by default` the aggregator really consults with those mentioned headers from splitter and collects groups respectively.

    What you will need in the end is just a custom:

    .aggregate(a -> a
                .outputProcessor((group) -> group
    

    To be build a new JobInfo from a released group.

    See a groupTimeout() option for those groups which cannot be gathered in time.

    There is also a MessageGroupStoreReaper to be scheduled externally since you talk about 24 hours: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#reaper

    So, in the end what you need is just something like this:

                    .split(JobInfo.class, JobInfo::getMiniJobs)
                    .channel(c -> c.executor(...))
                    .handle(...)
                    .aggregate(a -> a
                            .outputProcessor((group) -> {
                                List<MiniJob> miniJobs =
                                        group
                                                .getMessages()
                                                .stream()
                                                .map(m -> (MiniJob) m.getPayload())
                                                .toList();
                                ...
                                return new JobInfo();
                            }))