Tags: java, spring, spring-integration, spring-integration-dsl

Spring Integration: make asynchronous gateway wait for aggregation result


I'm using Spring Integration to process files. I want to read each file row by row, filter and convert the rows, and send them to Kafka. A file is passed for processing via an asynchronous gateway. At the end of processing I've added an aggregator for two purposes:

  • I want to wait until the whole file is processed. That is, all successfully processed rows have been sent to Kafka, and all rows that caused errors have been handled by the flow that reads from the error channel.
  • As the return value of the gateway, I want to receive the aggregated results of the file processing: how many rows were read from the file, how many were processed without errors, etc.

What I've done so far: I have created an aggregator that waits for the file markers START and END plus all processed rows. I have created a custom message group that counts processed rows and also stores the replyChannel, which is taken from the file START marker message, and the line count, which is taken from the file END marker message. There is also a 5 s timeout set for the group. When the group completes, it emits a message with the statistics I want and a replyChannel header with the value taken from the START marker. My understanding is that the gateway awaits the response on this replyChannel. Here is the code for the message group:

public void add(Message<?> messageToAdd) {
    if (messageToAdd.getPayload() instanceof FileMarker) {
        // File markers carry metadata about the file rather than a row.
        FileMarker marker = (FileMarker) messageToAdd.getPayload();
        switch (marker.getMark()) {
            case START:
                // Remember where the gateway expects its reply.
                replyChannel = messageToAdd.getHeaders().get(MessageHeaders.REPLY_CHANNEL);
                break;
            case END:
                // The END marker reports how many lines the file contained.
                expectedRowCount.set(marker.getLineCount());
                break;
            default:
                throw new IllegalStateException("Unexpected file mark");
        }
    } else {
        // Any other message is a row; count it by its processing result.
        ProcessingResult processingResult = messageToAdd.getHeaders()
                .get(ProcessingConstants.PROCESSING_RESULT_HEADER, ProcessingResult.class);
        assert processingResult != null;
        rowCount.incrementAndGet();
        switch (processingResult) {
            case FILTERED:
                filteredCount.incrementAndGet();
                break;
            case PROCESSED:
                processedCount.incrementAndGet();
                break;
            case PROCESSING_ERROR:
                processingErrorCount.incrementAndGet();
                break;
            case KAFKA_ERROR:
                kafkaErrorCount.incrementAndGet();
                break;
            default:
                throw new IllegalStateException("Unrecognized processing result: " + processingResult);
        }
    }
}
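
A group like this implies a completion condition: both markers have arrived and every row announced by the END marker has been counted. Here is a minimal, Spring-free sketch of that check. The class and method names (FileStatsSketch, onStart, onEnd, onRow, isComplete) are hypothetical and are not taken from the linked repository.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, Spring-free model of the group's bookkeeping.
class FileStatsSketch {
    private static final int UNKNOWN = -1;

    private final AtomicInteger rowCount = new AtomicInteger();
    private final AtomicInteger expectedRowCount = new AtomicInteger(UNKNOWN);
    private volatile boolean startSeen;

    void onStart() { startSeen = true; }

    void onEnd(int lineCount) { expectedRowCount.set(lineCount); }

    void onRow() { rowCount.incrementAndGet(); }

    // Complete once START and END have arrived and every announced row is counted.
    boolean isComplete() {
        int expected = expectedRowCount.get();
        return startSeen && expected != UNKNOWN && rowCount.get() == expected;
    }

    public static void main(String[] args) {
        FileStatsSketch stats = new FileStatsSketch();
        stats.onStart();
        stats.onRow();
        stats.onEnd(2);                          // END may overtake the last row
        System.out.println(stats.isComplete());  // false: one row still outstanding
        stats.onRow();
        System.out.println(stats.isComplete());  // true
    }
}
```

In a real flow, a check of this kind would typically sit behind a ReleaseStrategy, so that the group can release as soon as it is complete and the timeout serves only as a fallback.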

The problem is that my gateway never receives the response and waits indefinitely. How can I make the asynchronous gateway wait for the aggregator to emit its message and receive that message's payload as the result of the gateway call?

A test recreating the problem can be found at https://github.com/hawk1234/spring-integration-example, commit 9f121f0729d8076872e6fbdcd7b1b91ca9ea8cb4. When you run the tests, application logs are available under build/logs/spring-integration-example.log. Currently the test hangs because the gateway never receives a response. Also, the whole flow is a work in progress, so the group releases only after the timeout.


Solution

  • I've managed to fix the problem. I had to manually send the aggregated message to MessageHeaders.REPLY_CHANNEL; to do this I used the method BaseIntegrationFlowDefinition#logAndReply. The fix, as well as the finished example, is available in commit eefd5f472891ded39f363ff71ec530ada800f704.
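
Why the explicit send matters can be illustrated with a plain-JDK analogy. This is only a sketch, not Spring Integration API: a CompletableFuture stands in for the gateway's reply channel, a Map stands in for the message headers, and the names ReplyChannelSketch, consumeOnly and consumeAndReply are hypothetical. Carrying the reply channel in a header is not enough by itself; some step has to actually send the result to it.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Plain-JDK analogy only; none of these names are Spring Integration API.
class ReplyChannelSketch {
    static final String REPLY_CHANNEL = "replyChannel";

    // Models a terminal step that consumes the message and replies to nobody.
    static void consumeOnly(Map<String, Object> headers, String payload) {
        System.out.println("aggregated: " + payload);
    }

    // Models a terminal step that also sends the payload to the reply channel header.
    @SuppressWarnings("unchecked")
    static void consumeAndReply(Map<String, Object> headers, String payload) {
        System.out.println("aggregated: " + payload);
        ((CompletableFuture<String>) headers.get(REPLY_CHANNEL)).complete(payload);
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> oneWay = new CompletableFuture<>();
        consumeOnly(Map.of(REPLY_CHANNEL, oneWay), "rows=3");
        System.out.println(oneWay.isDone());   // false: the caller would wait forever

        CompletableFuture<String> replied = new CompletableFuture<>();
        consumeAndReply(Map.of(REPLY_CHANNEL, replied), "rows=3");
        System.out.println(replied.get(1, TimeUnit.SECONDS));   // rows=3
    }
}
```

The first call mirrors the hanging test: the header is present, but nothing ever completes the future. The second mirrors the fix, where the final step both logs and replies.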