Spring Cloud Stream test-binder OutputDestination does not consume events


We use microservices and an event-driven architecture (more specifically, choreography). We use Kafka, and many services use Spring Cloud Stream as an abstraction over the message broker.

After migrating our Spring Cloud Stream code to the new functional style, we started to have problems in our integration tests. The problem is related to the replacement of the old MessageCollector with OutputDestination (test binder).

The problem appears in our integration tests, where we want to verify that the proper events are produced. Many of our services produce to a topic and consume from it in another module of the same service. OutputDestination now works at the topic level, not at the channel level like the old MessageCollector. As a result, OutputDestination does not receive any messages if the production code already has a listener for that topic.

I created a simple project to demonstrate the problem: https://github.com/dgyordanov/scs-functional-test

We have a simple service like:

@Service
public class OrderService {

.........

    public void changeOrder() {
        // Some order changes
        streamBridge.send("orderEvents-out-0", "Test Order Change Event");
    }
}

In another module we have a listener for these events in the production code:

@Bean
public Consumer<String> orderEvents() {
    // React on order events
    return e -> System.out.println("### Order Event: " + e);
}

I want to test changeOrder(), but the test receives nothing:

@Test
void orderChangedTest() {
   orderService.changeOrder();
   Message<byte[]> event = outputDestination.receive(100, "edu.events.orderEvents");
   assertNotNull(event);
}

When we run the test above, we see the output of System.out.println("### Order Event: " + e);

The problem is that unless we exclude the orderEvents() listener from the test context, the outputDestination never receives messages, because the orderEvents() listener consumes them first. With the old MessageCollector, which worked at the channel level, this was possible.

Could you help us make our large Cucumber integration test suite work with the Spring Cloud Stream test binder?

We also tried declaring another channel for the same topic, but the outputDestination still received nothing.


Solution

  • That is the correct behaviour, because you have a Consumer, which is the end of the line since it produces nothing. In other words, you cannot test the outcome of the Consumer execution.

    Now, if I understand correctly, you simply want to verify that the Consumer is indeed invoked (i.e., that your bindings are correct). We can certainly introduce an enhancement to the test binder to allow you to do that in a more idiomatic way (I just raised an issue for that - https://github.com/spring-cloud/spring-cloud-stream/issues/2607). As a workaround, you can still do it with a little bit of reflection. Here is some sample code:

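    InputDestination inputDestination = context.getBean(InputDestination.class);
    try {
        // "channels" is not publicly accessible, hence the reflection
        Field chField = ReflectionUtils.findField(InputDestination.class, "channels");
        chField.setAccessible(true);
        @SuppressWarnings("unchecked")
        List<SubscribableChannel> channels = (List<SubscribableChannel>) chField.get(inputDestination);
        SubscribableChannel ch = channels.iterator().next(); // or more elaborate code if there are multiple bindings to find your channel
        // Register a second subscriber next to the production Consumer
        ch.subscribe((x) -> {
            System.out.println("Second subscriber: " + x);
        });
    } catch (Exception e) {
        // Fail loudly instead of silently swallowing a reflection problem
        throw new IllegalStateException("Could not subscribe to the input channel", e);
    }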
    

    FOLLOWUP

    After looking at it further and experimenting some more, I realised that adding anything would be redundant at best. As I mentioned before, a Consumer by definition returns nothing, so there is nothing to evaluate. Verifying that it was invoked with the input you just sent is also redundant: you would be re-testing the core functionality of the framework itself, which is not your responsibility, since there would be nothing for you to do or fix if a bug is discovered.

    And that is yet another reason why we decided to drop MessageCollector.

    So, what I would recommend is to simply unit test your Consumer properly and leave the rest (i.e., testing the framework itself) to the framework.
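To illustrate the recommendation above, here is a minimal sketch of unit testing a Consumer in plain Java, with no Spring context and no test binder. It assumes the consumer delegates its side effect to a collaborator. The OrderEventHandler interface and the orderEvents(handler) factory are hypothetical names introduced for this example; they are not in the original project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch only: OrderEventHandler and this factory method are hypothetical,
// introduced to show how the Consumer can be tested without any binder.
final class OrderEventsConsumerSketch {

    /** Hypothetical collaborator that performs the actual reaction to an event. */
    interface OrderEventHandler {
        void handle(String event);
    }

    /** Same shape as the orderEvents() bean, but with the side effect injected. */
    static Consumer<String> orderEvents(OrderEventHandler handler) {
        return handler::handle;
    }

    public static void main(String[] args) {
        // Record every event the handler sees so it can be asserted on.
        List<String> handled = new ArrayList<>();
        Consumer<String> consumer = orderEvents(handled::add);

        // Invoke the Consumer directly, as a unit test would.
        consumer.accept("Test Order Change Event");

        if (!handled.equals(List.of("Test Order Change Event"))) {
            throw new AssertionError("unexpected events: " + handled);
        }
        System.out.println("handled: " + handled);
    }
}
```

In a real suite, the check in main would live in a test method, and the recording list could be replaced by a mock of the collaborator. The point is only that the Consumer's behaviour becomes observable through what it delegates to, not through OutputDestination.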