Tags: java, spring-boot, microservices, event-driven, spring-cloud-stream-binder-kafka

Conditional routing with the Spring Cloud Stream functional model


I have run into a problem since the old annotation-based programming model was deprecated. I have two microservices, one acting as publisher and the other as subscriber. In the old way, with the annotation @StreamListener(target = "events", condition = "headers['type']=='consumerPermissionEvent'"), I was able to have two listener methods that each received only the records matching their condition. Now I don't know how to do the same thing with the functional model.

I have read the event routing documentation and tried the routing-expression property, but both consumers still read all the records.

The application YAML of the first microservice:

spring:
  cloud:
    stream:
      bindings:
        output: 
          destination: topicEvents

The application YAML of the second microservice:

spring:
  cloud:
    function:
      routing-expression: headers['type']
      definition: consumerPermissionEvent;consumerApiEvent
    stream:
      bindings:
        consumerPermissionEvent-in-0:
          destination: topicUsers
        consumerApiEvent-in-0:
          destination: topicUsers

I send from the first microservice like this:

@Autowired
private StreamBridge bridge;

public void send(PermissionEvent event){
    Message<PermissionEvent> message = MessageBuilder.withPayload(event)
            .setHeader("type","consumerPermissionEvent").build();
    bridge.send("output", message);
}

And the second microservice has two consumers:

    @Bean
    public Consumer<Message<ApiEvent>> consumerApiEvent() {
        return e -> log.debug("READED API EVENT: {}", e.getPayload());
    }

    @Bean
    public Consumer<Message<PermissionEvent>> consumerPermissionEvent() {
        return e -> log.debug("READED PERMISSION EVENT: {}", e.getPayload());
    }

And the output logs from the second microservice:

[KafkaConsumerDestination{consumerDestinationName='topicUsers', partitions=1, dlqName='null'}.container-0-C-1] [20b662594644cf2e] DEBUG c.m.o.v.eda.subscribers.NotificationSuscriber - READED API EVENT: ApiEvent(apiId=null)
[KafkaConsumerDestination{consumerDestinationName='topicUsers', partitions=1, dlqName='null'}.container-0-C-1] [20b662594644cf2e] DEBUG c.m.o.v.eda.subscribers.NotificationSuscriber - READED PERMISSION EVENT: PermissionEvent(userRole=roleUseradsf)

Any ideas on how to do this?

Thanks in advance


Solution

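  • You first need to enable routing with the following property:

    --spring.cloud.stream.function.routing.enabled=true

    Once routing is enabled, the framework binds a single routing function to the input destination (the binding is named functionRouter-in-0) and uses routing-expression to decide which function each message is passed to. In your configuration both consumers are bound directly to topicUsers, which is why each of them receives every record regardless of the type header.

    For more details, refer to https://cloud.spring.io/spring-cloud-stream/reference/html/spring-cloud-stream.html#spring_cloud_function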
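
As a rough sketch, the subscriber's application YAML might look like the following once routing is enabled. It assumes the router binding name functionRouter-in-0 described in the Spring Cloud Stream event routing documentation, and that the two consumer beans from the question stay unchanged but are no longer listed in the function definition or bound to the topic themselves. It has not been tested against this exact project, so treat it as a starting point rather than a verified configuration.

```yaml
spring:
  cloud:
    function:
      # The header value must match the name of a function bean,
      # e.g. "consumerPermissionEvent" or "consumerApiEvent".
      routing-expression: headers['type']
    stream:
      function:
        routing:
          enabled: true
      bindings:
        # Assumption: only the router is bound to the topic; it then
        # forwards each message to the consumer named by the expression.
        functionRouter-in-0:
          destination: topicUsers
```

With this layout, the message built in the question (header type set to consumerPermissionEvent) would be expected to reach only the consumerPermissionEvent bean, and a message intended for the other consumer would need its type header set to consumerApiEvent.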