Tags: quarkus, jta, microprofile, smallrye-reactive-messaging, mutiny

How to propagate JTA state when using reactive-messaging?


I would like to propagate JTA state (= the transaction) from a transactional REST endpoint to the reactive-messaging connector that receives the message it emits.

@Inject
@Channel("test")
Emitter<String> emitter;

@POST
@Transactional
public Response test() {
    // Emit inside the transaction started by @Transactional
    emitter.send("test");
    return Response.ok().build();
}

and

@ApplicationScoped
@Connector("test")
public class TestConnector implements OutgoingConnectorFactory {

    @Inject
    TransactionManager tm;

    @Override
    public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
        return ReactiveStreams.<Message<?>>builder()
            .flatMapCompletionStage(message -> {
                tm.getTransaction(); // = null
                return message.ack();
            })
            .ignore();
    }
}

As I understand it, context-propagation is responsible for making the transaction available (see io.smallrye.context.jta.context.propagation.JtaContextProvider#currentContext). The problem seems to be that currentContext gets created on subscription, which happens when the injection point (Emitter<String> emitter) gets its instance. That is too early to properly capture the transaction.

What am I missing?

By the way, I am having the same problem when using @Incoming / @Outgoing instead of the emitter. I have decided to give you this example because it is easy to understand and reproduce.


Solution

  • At the moment, you need to pass the current Transaction in the message metadata. It will then be propagated to the downstream components, including the connector.

    Note that a Transaction tends to be tied to the request scope, so by the time the connector runs it may already be too late to use it. Make sure your endpoint is asynchronous and returns only once the emitted message has been acknowledged.

    Context Propagation is not going to help in this case: the underlying streams are built at startup time (at build time in Quarkus), so there is no context to capture at that point.
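
    The approach described above can be sketched in plain Java. This is only a model of the idea, not the SmallRye API: Tx, Msg and MetadataPropagationSketch are hypothetical stand-ins for the JTA Transaction, the reactive-messaging Message and the endpoint/connector pair. In a real application the corresponding calls would presumably be Message#addMetadata on the sending side and Message#getMetadata(Class) in the connector, which is an assumption to check against your SmallRye version.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

// Hypothetical stand-in for the JTA Transaction object.
final class Tx {
    final String id;
    Tx(String id) { this.id = id; }
}

// Hypothetical stand-in for a message: payload, metadata keyed by type, ack callback.
final class Msg<T> {
    final T payload;
    private final Map<Class<?>, Object> metadata;
    private final Supplier<CompletionStage<Void>> ack;

    Msg(T payload, Map<Class<?>, Object> metadata, Supplier<CompletionStage<Void>> ack) {
        this.payload = payload;
        this.metadata = metadata;
        this.ack = ack;
    }

    // Look up a metadata entry by its type; empty if none was attached.
    <M> Optional<M> getMetadata(Class<M> type) {
        return Optional.ofNullable(type.cast(metadata.get(type)));
    }

    CompletionStage<Void> ack() { return ack.get(); }
}

final class MetadataPropagationSketch {

    // Plays the role of the connector: the transaction comes from the
    // message metadata, not from any thread-bound or captured context.
    static String connector(Msg<String> message) {
        String seen = message.getMetadata(Tx.class).map(tx -> tx.id).orElse("none");
        message.ack();
        return seen;
    }

    // Plays the role of the endpoint: attach the current transaction to the
    // message and return only after the message has been acknowledged.
    static String endpoint(Tx current) {
        CompletableFuture<Void> acked = new CompletableFuture<>();
        Msg<String> message = new Msg<>(
                "test",
                Map.<Class<?>, Object>of(Tx.class, current),
                () -> { acked.complete(null); return acked; });
        String seen = connector(message);
        acked.join(); // the transaction must stay open until this point
        return seen;
    }
}
```

    Keying the metadata by a type you control is a deliberate choice in this sketch: if the real lookup matches on the concrete class, asking for the Transaction interface may not find the container's implementation class, so wrapping the transaction in your own small metadata class is the safer option.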