Search code examples
javamessagingamqpquarkus

Quarkus Reactive AMQP client processing in threaded manner


I am attempting to use Quarkus' AMQP (reactive-messaging-amqp) extension to decouple work from the original REST request. The idea being the REST call would kick off a long running action and could later come back to get the result.

However, it seems that in my code, Quarkus runs each step in the same thread, completing the work before returning from the original sendNewLRA() call. I would have assumed that the message would be sent through AMQP, thus decoupling the process after the message was sent. Why isn't this the case? I currently don't have any AMQP/messaging specific configuration, and just letting the default run from its own TestContainer (managed by Quarkus)

REST handler:

    @Inject
    LRAMessenger messenger;
    
    @LRA(end = false)
    @GET
    @Path("start")
    @Produces(MediaType.TEXT_PLAIN)
    public Response hello(
        @HeaderParam(LRA_HTTP_CONTEXT_HEADER) URI lraId,
        @QueryParam("processTime") int processTime,
        @QueryParam("payload") String payload
    ) {
        log.info("Start. LRA ID: {}", lraId);
        
        StartMessage start = new StartMessage();
        start.setLraId(lraId);
        start.setProcessTime(processTime);
        start.setPayload(payload);
        
        this.messenger.sendNewLRA(start); // blocks here
        log.info("Sent lra processing message.");
        
        return Response.ok(lraId).build();
    }

Messaging code:

@ApplicationScoped
@Slf4j
public class LRAMessenger {
    
    @Inject
    NarayanaLRAClient lraClient;
    
    @Inject
    @Channel("lra-out")
    Emitter<StartMessage> startEmitter;
    
    /**
     * Method to kick off backend processing.
     * @param startMessage The mesage to send
     */
    @Incoming("lra-start")
    public void sendNewLRA(StartMessage startMessage) {
        startEmitter.send(startMessage);
    }
    
    @Incoming("lra-out")
    public void processLRA(StartMessage startMessage) throws InterruptedException {
        log.info("Got lra message in process step: {}", startMessage);
        lraClient.setCurrentLRA(startMessage.getLraId());
        
        int waitTime = startMessage.getProcessTime() / 10;
        
        for (int percent = 10; percent <= 100; percent += 10) {
            log.info("Waiting to simulate processing...");
            Thread.sleep(waitTime);
            log.info("Done waiting ({}%)", percent);
        }
        log.info("Waiting to simulate processing completed.");
        lraClient.closeLRA(startMessage.getLraId());
        log.info("Closed LRA.");
    }
}

Output:

2022-02-21 11:48:28,723 INFO  [org.acm.cus.dem.end.LRAResourceTest] (main) testing LRA.
2022-02-21 11:48:29,192 INFO  [org.acm.cus.dem.end.LRAResource] (executor-thread-0) Start. LRA ID: http://localhost:49251/lra-coordinator/0_ffffac110006_b651_6213c25d_2
2022-02-21 11:48:29,200 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Got lra message in process step: StartMessage(processTime=10000, payload=null)
2022-02-21 11:48:29,200 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:30,201 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (10%)
2022-02-21 11:48:30,202 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:31,203 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (20%)
2022-02-21 11:48:31,204 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:32,205 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (30%)
2022-02-21 11:48:32,206 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:33,207 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (40%)
2022-02-21 11:48:33,207 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:34,208 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (50%)
2022-02-21 11:48:34,209 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:35,210 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (60%)
2022-02-21 11:48:35,211 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:36,211 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (70%)
2022-02-21 11:48:36,212 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:37,212 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (80%)
2022-02-21 11:48:37,213 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:38,214 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (90%)
2022-02-21 11:48:38,215 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:39,216 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (100%)
2022-02-21 11:48:39,216 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing completed.
2022-02-21 11:48:39,235 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Closed LRA.
2022-02-21 11:48:39,237 INFO  [org.acm.cus.dem.end.LRAResource] (executor-thread-0) Sent lra processing message.

Note: I would expect for the Sent lra processing message. log to appear early in the process, probably before the Got lra message in process step log message.


Solution

  • Found the answer, or at least a fix...

    Adding @Blocking to the second step in the messaging chain seems to have decoupled the process:

        @Incoming("lra-out")
        @Blocking
        public void processLRA(StartMessage startMessage) throws InterruptedException {
            log.info("Got lra message in process step: {}", startMessage);
            // ...