Search code examples
springkotlinrabbitmqspring-cloudspring-cloud-stream

Use Function to replyTo RPC request


I would like to use the java.util.Function approach to reply to an request send via RabbitTemplate.convertSendAndReceive. It's working fine with the RabbitListener but I can not get it working with the functional approach.

Client (working)

class Client(private val template RabbitTemplate) {

    fun send() = template.convertSendAndReceive(
        "rpc-exchange",
        "rpc-routing-key",
        "payload message"
    )

}

Server (approach 1, working)

class Server {

    @RabbitListener(queues = ["rpc-queue"])
    fun receiveRequest(message: String) = "Response Message"

    @Bean
    fun queue(): Queue {
        return Queue("rpc-queue")
    }

    @Bean
    fun exchange(): DirectExchange {
        return DirectExchange("rpc-exchange")
    }

    @Bean
    fun binding(exchange: DirectExchange, queue: Queue): Binding {
        return BindingBuilder.bind(queue).to(exchange).with("rpc-routing-key")
    }
    
}

Server (approach 2, not working) --> goal

class Server {

    @Bean
    fun receiveRequest(): Function<String, String> {
        return Function { value: String ->
            "Response Message"
        }
    }

}

With the config (approach 2)

spring.cloud.function.definition: receiveRequest
spring.cloud.stream.binding.receiveRequest-in-0.destination: rpc-exchange
spring.cloud.stream.binding.receiveRequest-in-0.group: rpc-queue
spring.cloud.stream.rabbit.bindings.receiveRequest-in-0.consumer.bindingRoutingKey: rpc-routing-key

With approach 2 the server receives. Unfortunately the response is lost. Does anybody know how to use the RPC pattern with the functional approach? I don't want to use the RabbitListener.

See documentation/tutorial.


Solution

  • Spring Cloud Stream is not really designed for RPC on the server side, so it won't handle this automatically like @RabbitListener does.

    You can, however, achieve it by adding an output binding to route the reply to the default exchange and the replyTo header:

    spring.cloud.function.definition: receiveRequest
    spring.cloud.stream.bindings.receiveRequest-in-0.destination: rpc-exchange
    spring.cloud.stream.bindings.receiveRequest-in-0.group: rpc-queue
    spring.cloud.stream.rabbit.bindings.receiveRequest-in-0.consumer.bindingRoutingKey: rpc-routing-key
    
    spring.cloud.stream.bindings.receiveRequest-out-0.destination=
    spring.cloud.stream.rabbit.bindings.receiveRequest-out-0.producer.routing-key-expression=headers['amqp_replyTo']
    
    #logging.level.org.springframework.amqp=debug
    
    @SpringBootApplication
    public class So66586230Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So66586230Application.class, args);
        }
    
        @Bean
        Function<String, String> receiveRequest() {
            return str -> {
                return str.toUpperCase();
            };
        }
    
        @Bean
        public ApplicationRunner runner(RabbitTemplate template) {
            return args -> {
                System.out.println(new String((byte[]) template.convertSendAndReceive(
                        "rpc-exchange",
                        "rpc-routing-key",
                        "payload message")));
            };
        }
    
    }
    
    PAYLOAD MESSAGE
    

    Note that the reply will come as a byte[]; you can use a custom message converter on the template to convert to String.

    EDIT

    In reply to the third comment below.

    The RabbitTemplate uses direct reply-to by default, so the reply address is not a real queue, it is a pseudo queue created by the binder and associated with a consumer in the template.

    You can also configure the template to use temporary reply queues, but they are also routed to by the default exchange "".

    You can, however, configure an external reply container, with the template as the listener.

    You can then route back using whatever exchange and routing key you want.

    Putting it all together:

    spring.cloud.function.definition: receiveRequest
    spring.cloud.stream.bindings.receiveRequest-in-0.destination: rpc-exchange
    spring.cloud.stream.bindings.receiveRequest-in-0.group: rpc-queue
    spring.cloud.stream.rabbit.bindings.receiveRequest-in-0.consumer.bindingRoutingKey: rpc-routing-key
    
    spring.cloud.stream.bindings.receiveRequest-out-0.destination=reply-exchange
    spring.cloud.stream.rabbit.bindings.receiveRequest-out-0.producer.routing-key-expression='reply-routing-key'
    spring.cloud.stream.rabbit.bindings.receiveRequest-out-0.producer.declare-exchange=false
    
    spring.rabbitmq.template.reply-timeout=10000
    
    #logging.level.org.springframework.amqp=debug
    
    
    public class So66586230Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So66586230Application.class, args);
        }
    
        @Bean
        Function<String, String> receiveRequest() {
            return str -> {
                return str.toUpperCase();
            };
        }
    
        @Bean
        SimpleMessageListenerContainer replyContainer(SimpleRabbitListenerContainerFactory factory,
                RabbitTemplate template) {
    
            template.setReplyAddress("reply-queue");
            SimpleMessageListenerContainer container = factory.createListenerContainer();
            container.setQueueNames("reply-queue");
            container.setMessageListener(template);
            return container;
        }
    
        @Bean
        public ApplicationRunner runner(RabbitTemplate template, SimpleMessageListenerContainer replyContainer) {
            return args -> {
                System.out.println(new String((byte[]) template.convertSendAndReceive(
                        "rpc-exchange",
                        "rpc-routing-key",
                        "payload message")));
            };
        }
    
    }
    

    IMPORTANT: if you have multiple instances of the client side, each needs its own reply queue.

    In that case, the routing key must be the queue name and you should revert to the previous example to set the routing key expression (to get the queue name from the header).