Tags: java, spring, spring-boot, spring-cloud-stream, event-driven

Spring Cloud Stream - Testing consuming from and producing into a shared topic


I have been trying to implement the following solution:

[Diagram: spring-cloud-stream-shared-topic]

My application is expected to consume message A from all-messages, run some business logic, and then produce message B back into all-messages.

I am using StreamBridge instead of a Function<A,B> because I want the producing side to work with an arbitrary number of produced messages. For the sake of this example, I have simplified the scenario to just one.
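
To make the one-to-many point concrete, here is a minimal plain-Java sketch (not the code from the repository). The `Sender` interface is a hypothetical stand-in that mirrors the shape of `StreamBridge.send(bindingName, data)`, and the comma-splitting "business logic" is invented purely for illustration. A `Function<A,B>` returns exactly one value per input, whereas a `Consumer` that holds a sender can emit zero, one, or many messages:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class FanOutSketch {

    /** Hypothetical stand-in for StreamBridge#send(String bindingName, Object data). */
    interface Sender {
        boolean send(String bindingName, Object data);
    }

    /**
     * Builds a consumer that emits one outgoing message per non-blank,
     * comma-separated part of the incoming payload (invented logic).
     */
    static Consumer<String> incoming(Sender sender) {
        return payload -> {
            for (String part : payload.split(",")) {
                if (!part.isBlank()) {
                    sender.send("outgoing-out-0", "B:" + part.trim());
                }
            }
        };
    }

    public static void main(String[] args) {
        List<Object> sent = new ArrayList<>();
        // Record what would be sent instead of talking to a real binder.
        Sender recordingSender = (bindingName, data) -> sent.add(data);
        incoming(recordingSender).accept("x, y");
        System.out.println(sent);
    }
}
```

In the real application the `Sender` role would be played by an injected `StreamBridge`, and the binding name passed to it would be whichever output binding is declared in the configuration.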

Additionally, to avoid an infinite loop, a custom router function dispatches each incoming message to the appropriate consumer, either incoming or discarded. It would be great to have a way to discard messages effectively.
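
The routing decision described above can be sketched in plain Java. This is an illustration under assumptions, not the router from the repository: the `type` header name and the rule "only A is processed" are hypothetical. The returned strings match the consumer names incoming and discarded; in Spring Cloud Function, a routing callback or routing expression would resolve to a function name in a similar way. Because the application's own output B lands back on all-messages, sending everything that is not A to a no-op consumer is what breaks the loop:

```java
import java.util.Map;
import java.util.function.Consumer;

final class RouterSketch {

    static final String INCOMING = "incoming";
    static final String DISCARDED = "discarded";

    /** No-op consumer: the simplest way to drop a message on purpose. */
    static final Consumer<Object> DISCARD = message -> { };

    /**
     * Picks the consumer name for a message based on a hypothetical
     * "type" header. Anything that is not an "A" is discarded, including
     * the "B" messages this application produces itself.
     */
    static String route(Map<String, Object> headers) {
        Object type = headers.get("type");
        return "A".equals(type) ? INCOMING : DISCARDED;
    }

    public static void main(String[] args) {
        System.out.println(route(Map.of("type", "A")));
        System.out.println(route(Map.of("type", "B")));
    }
}
```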

That being said, I cannot quite get the implementation right using Spring Cloud Stream.

I'd like your help to understand what I am doing wrong and how to fix the current configuration in order to make the solution work as expected, specifically:

  • Is this solution supported by Spring Cloud Stream?
  • Is my application configuration correctly implementing the solution diagram above?
  • Which bindings exactly should I use inside the app when sending/receiving messages?

The major headache comes from the bindings, so I tried to write a test with different combinations of incoming and outgoing bindings to see what is what, something along these lines:

class ScsProblemTests {

    /* ... */

    @ParameterizedTest
    @MethodSource("bindings")
    void consumeFromAndProduceIntoSharedTopic(String incomingBinding, String outgoingBinding) {
        givenNoOutgoingMessages();
        whenAnIncomingMessageArrives(incomingBinding);
        thenEventuallyAnOutgoingMessageIsProduced(outgoingBinding);
    }

    public static Stream<Arguments> bindings() {
        return Stream.of(
            Arguments.of(null, null),
            Arguments.of(null, "outgoing-out-0"),
            Arguments.of(null, "all-messages"),
            Arguments.of("incoming-in-0", null),
            Arguments.of("incoming-in-0", "outgoing-out-0"),
            Arguments.of("incoming-in-0", "all-messages"),
            Arguments.of("all-messages", null),
            Arguments.of("all-messages", "outgoing-out-0"),
            Arguments.of("all-messages", "all-messages")
        );
    }

    /* ... */
}

I am running this set of tests with an overrides Spring profile, where I set up the destination overrides as per the diagram. I am also running the same set with a different Spring profile with no overrides, as a control group to compare against. Only 2 tests from the no-overrides profile pass; the rest fail.

The no-overrides profile obviously does not match the design, but I was curious to see how the overrides were affecting the results. Specifically, the no-overrides tests that pass are the ones where:

  • incomingBinding=null, outgoingBinding=null
  • incomingBinding=null, outgoingBinding=outgoing-out-0

From my understanding of Spring Cloud Stream, even in this no-overrides case, I'd expect the following to pass as well (they do not):

  • incomingBinding=incoming-in-0, outgoingBinding=null
  • incomingBinding=incoming-in-0, outgoingBinding=outgoing-out-0

At this point I am starting to think I have misunderstood some concepts behind Spring Cloud Stream, but I really hope you can provide some useful advice.

I have shared my code in this repository for convenience.

Thanks in advance.


Solution

  • OK, here are the problems in my implementation and how I fixed them:

    1. I had a typo in the routing configuration, so the function router was never enabled. This is how to enable it in Spring Cloud Stream:
    spring:
      cloud:
        stream:
          function:
            routing:
              enabled: true
    
    2. Because a function router is involved, I no longer need to configure my incoming-in-0 binding. Instead, I need to configure a destination for the function router's input binding:
    spring:
      cloud:
        stream:
          bindings:
            functionRouter-in-0:
              destination: all-messages
            outgoing-out-0:
              destination: all-messages
          source: outgoing
          # ...
    
    3. I misunderstood bindings and destinations, and how to use the testing helpers provided by the framework, specifically InputDestination and OutputDestination. I was not sure which parameters to use to send or receive a message. The answer is that those components simulate the real binder (e.g. RabbitMQ, Kafka, etc.) and know nothing about bindings, which are a Spring Cloud Stream construct; they only know about destinations. So in my case that translated into something like this:
    @SpringBootTest
    @Import(TestChannelBinderConfiguration.class)
    class ScsProblemTests {
    
        @Autowired
        private InputDestination input;
    
        @Autowired
        private OutputDestination output;
    
        /* ... */
    
        @Test
        void consumeFromAndProduceIntoSharedTopic() {
            // prepare message A ... 
    
            // simulate message "A" arriving into "all-messages"
            input.send(messageA, "all-messages");
            
            // ...
            // application will pick up the message
            // the function router will dispatch the message to the right consumer
            // the consumer does some business logic
            // eventually a message "B" should be produced into "all-messages"
            
            // check if "all-messages" contains message "B"
            // NOTE: "all-messages" will contain both "A" and "B"
            var discard = output.receive(1000, "all-messages"); // message A
            var messageB = output.receive(1000, "all-messages");
            // assertions ...
        }
    
        /* ... */
    }
    

    Note: as the comments in the pseudo-code say, the final state has both A and B in all-messages. Here the OutputDestination is simply a window onto the shared destination, which naturally also contains the initial message we sent.

    Hopefully this makes sense. I cleaned up the code and pushed a working version to a fixed branch in the same repository, so you can see the actual fixes.
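
Since the shared destination holds both A and B, a test has to skip over A before it can assert on B. The pattern can be sketched without Spring, assuming a `BlockingQueue` as a stand-in for all-messages; `receiveMatching` is a hypothetical helper, not part of the Spring Cloud Stream test binder:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

final class SharedTopicSketch {

    /**
     * Polls the queue until a message satisfies the predicate or the
     * overall timeout elapses. Non-matching messages (such as the
     * original "A") are consumed and dropped. Returns null on timeout
     * or if the thread is interrupted.
     */
    static <T> T receiveMatching(BlockingQueue<T> topic, Predicate<? super T> wanted, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            while (true) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return null;
                }
                T next = topic.poll(remaining, TimeUnit.NANOSECONDS);
                if (next == null) {
                    return null;
                }
                if (wanted.test(next)) {
                    return next;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        // Stand-in for the shared destination: it holds the input and the output.
        BlockingQueue<String> allMessages = new LinkedBlockingQueue<>();
        allMessages.add("A");
        allMessages.add("B");
        System.out.println(receiveMatching(allMessages, m -> m.equals("B"), 1000));
    }
}
```

Matching on content rather than on position keeps the assertion valid even if the order in which A and B show up on the destination ever changes.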