Search code examples
spring-bootkotlinspring-cloud-streamspring-cloud-function

route FROM and route TO with spring cloud stream and functions


I have some issues with the new routing feature in spring cloud stream

I tried to implement a simple scenario, I want to send a message with a header spring.cloud.function.definition = consume1 or consume2

I expect that consume1 or consume2 should be called based on what is sent on the header but the methods are called randomly.

I send the message to the exchange consumer using the rabbit admin console

I'm having the following logs:

2020-02-27 14:48:25.896  INFO 22132 --- [ consumer.app-1] com.example.demo.TestConsumer            : ==============>consume1 messge [[payload=ok, headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=#, amqp_receivedExchange=consumer, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=consumer.app, amqp_redelivered=false, id=9a4dff25-88ef-4d76-93e2-c8719cda122d, spring.cloud.function.definition=consume1, amqp_consumerTag=amq.ctag-gGChFNCKIVd25yyR9H6-fQ, sourceData=(Body:'[B@3a92faa7(byte[2])' MessageProperties [headers={spring.cloud.function.definition=consume1}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=consumer, receivedRoutingKey=#, deliveryTag=1, consumerTag=amq.ctag-gGChFNCKIVd25yyR9H6-fQ, consumerQueue=consumer.app]), timestamp=1582811303347}]]
2020-02-27 14:48:25.984  INFO 22132 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2020-02-27 14:48:25.984  INFO 22132 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2020-02-27 14:48:25.991  INFO 22132 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 7 ms
2020-02-27 14:48:26.037  INFO 22132 --- [oundedElastic-1] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel customer-1
2020-02-27 14:48:26.111  INFO 22132 --- [oundedElastic-1] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.customer-1' has 1 subscriber(s).
2020-02-27 14:48:26.116  INFO 22132 --- [oundedElastic-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2020-02-27 14:48:26.123  INFO 22132 --- [oundedElastic-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory.publisher#32438e24:0/SimpleConnection@3e58666d [delegate=amqp://[email protected]:5672/, localPort= 62514]
2020-02-27 14:48:26.139  INFO 22132 --- [-1.customer-1-1] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
2020-02-27 14:48:26.140  INFO 22132 --- [-1.customer-1-1] com.example.demo.TestSink                : Data received customer-1...body
2020-02-27 14:49:14.185  INFO 22132 --- [ consumer.app-1] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
2020-02-27 14:49:14.194  INFO 22132 --- [ consumer.app-1] com.example.demo.TestConsumer            : ==============>consume2 messge [[payload=ok, headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=#, amqp_receivedExchange=consumer, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=consumer.app, amqp_redelivered=false, id=33581edb-2832-1c92-b765-a05794512b34, spring.cloud.function.definition=consume1, amqp_consumerTag=amq.ctag-RIp2nZdcG2a0hNQeImwtBw, sourceData=(Body:'[B@8159793(byte[2])' MessageProperties [headers={spring.cloud.function.definition=consume1}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=consumer, receivedRoutingKey=#, deliveryTag=1, consumerTag=amq.ctag-RIp2nZdcG2a0hNQeImwtBw, consumerQueue=consumer.app]), timestamp=1582811354186}]]
2020-02-27 14:49:14.203  INFO 22132 --- [oundedElastic-1] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel customer-2
2020-02-27 14:49:14.213  INFO 22132 --- [oundedElastic-1] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.customer-2' has 1 subscriber(s).
2020-02-27 14:49:14.216  INFO 22132 --- [-2.customer-2-1] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
2020-02-27 14:49:14.216  INFO 22132 --- [-2.customer-2-1] com.example.demo.TestSink                : Data received customer-2...body

application.yml

spring:
  main:
    allow-bean-definition-overriding: true
spring.cloud.stream:
  function.definition: supplier;receive1;receive2;consume1;consume2
  function.routing:
    enabled: true

  bindings:
    consume1-in-0.destination: consumer
    consume1-in-0.group: app
    consume2-in-0.destination: consumer
    consume2-in-0.group: app
    receive1-in-0.destination: customer-1
    receive1-in-0.group: customer-1
    receive2-in-0.destination: customer-2
    receive2-in-0.group: customer-2

DemoApplication.java

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.commons.logging.Log
import org.apache.commons.logging.LogFactory
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.http.HttpStatus
import org.springframework.messaging.Message
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestMethod.GET
import org.springframework.web.bind.annotation.ResponseStatus
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.client.RestTemplate
import reactor.core.publisher.EmitterProcessor
import reactor.core.publisher.Flux
import java.util.function.Consumer
import java.util.function.Supplier


@SpringBootApplication
class DemoApplication

fun main(args: Array<String>) {
    runApplication<DemoApplication>(*args)
}

@RestController
class DynamicDestinationController(private val jsonMapper: ObjectMapper) {

    private val processor: EmitterProcessor<Message<String>> = EmitterProcessor.create<Message<String>>()

    @RequestMapping(path = ["/api/dest/{destName}"], method = [GET], consumes = ["*/*"])
    @ResponseStatus(HttpStatus.ACCEPTED)
    fun handleRequest(@PathVariable destName:String) {
        val message: Message<String> = MessageBuilder.withPayload("body")
                .setHeader("spring.cloud.stream.sendto.destination", destName).build()
        processor.onNext(message)
    }

    @Bean
    fun supplier(): Supplier<Flux<Message<String>>> {
        return Supplier { processor }
    }
}

const val destResourceUrl = "http://localhost:8080/api/dest"
@Component
class TestConsumer() {

    private val restTemplate: RestTemplate = RestTemplate()
    private val logger: Log = LogFactory.getLog(javaClass)

    @Bean
    fun consume1(): Consumer<Message<String>> = Consumer {
        logger.info("==============>consume1 messge [[payload=${it.payload}, headers=${it.headers}]]")
        restTemplate.getForEntity("$destResourceUrl/customer-1", String::class.java)
    }

    @Bean
    fun consume2(): Consumer<Message<String>> = Consumer {
        logger.info("==============>consume2 messge [[payload=${it.payload}, headers=${it.headers}]]")
        restTemplate.getForEntity("$destResourceUrl/customer-2", String::class.java)
    }
}


@Component
class TestSink {
    private val logger: Log = LogFactory.getLog(javaClass)
    @Bean
    fun receive1(): Consumer<String> = Consumer {
        logger.info("Data received customer-1..." + it);
    }

    @Bean
    fun receive2(): Consumer<String> = Consumer {
        logger.info("Data received customer-2..." + it);
    }
}

Any idea how to fix the route to consumer?

thanks in advance.

demo-repo


Solution

  • Actually I am a bit confused, so let's do one step at the time. Here is the functioning (modelled after yours) app which uses sendto feature allowing you to send messages to the specific (existing and/or dynamically resolved) destinations.

    (in java but you can rework it to Kotlin)

    @Controller
    public class WebSourceApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(WebSourceApplication.class,
                    "--spring.cloud.function.definition=supplier;consA;consB",
                    "--spring.cloud.stream.bindings.consA-in-0.destination=consumerA",
                    "--spring.cloud.stream.bindings.consA-in-0.group=consumerA-grp",
                    "--spring.cloud.stream.bindings.consB-in-0.destination=consumerB",
                    "--spring.cloud.stream.bindings.consB-in-0.group=consumerB-grp"
                    );
        }
    
        EmitterProcessor<Message<String>> processor = EmitterProcessor.create();
    
        @RequestMapping(path = "/api/dest/{destName}", consumes = "*/*")
        @ResponseStatus(HttpStatus.ACCEPTED)
        public void delegateToSupplier(@RequestBody String body, @PathVariable String destName) {
            Message<String>  message = MessageBuilder.withPayload(body)
                .setHeader("spring.cloud.stream.sendto.destination", destName)
                .build();
            processor.onNext(message);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supplier() {
            return () -> processor;
        }
    
        @Bean
        public Consumer<String> consA() {
            return v -> {
                System.out.println("Consuming from consA:  " + v);
            };
        }
    
        @Bean
        public Consumer<String> consB() {
            return v -> {
                System.out.println("Consuming from consB:  " + v);
            };
        }
    }
    

    And when i curl it i get consistent invocation pr the appropriate consumer:

    curl -H "Content-Type: application/json" -X POST -d "Hello Spring Cloud Stream" http://localhost:8080/api/dest/consumerA
    log: Consuming from consA:  Hello Spring Cloud Stream
    . . .
    
    curl -H "Content-Type: application/json" -X POST -d "Hello Spring Cloud Stream" http://localhost:8080/api/dest/consumerB
    log: Consuming from consB:  Hello Spring Cloud Stream
    

    Notice: There is no enable routing property. That feature is mainly aimed to always call one function functionRouter and have it call other functions on your behalf. It is a feature of spring-cloud-function which means it works outside of spring-cloud-srteam and channels/destinations etc.

    Isn't that what you are trying to accomplish? Send message to a different destination based on some oath variable in your HTTP request?