Search code examples
springspring-cloudspring-cloud-streamspring-cloud-bus

Spring Cloud Stream - Aggregates


I'm trying to implement the proposed SCS aggregates, but I'm not sure to understand the real purpose of them, as the results I get surprise me. First, Here is the code...

The source, a messages provider to be scheduled :

@SpringBootApplication
@EnableBinding(Source.class)
public class SourceApplication {

    private final Logger logger = LoggerFactory.getLogger(SourceApplication.class);

    @Bean
    @InboundChannelAdapter(Source.OUTPUT)
    public MessageSource<String> createMessage() {
        return () -> {
            String payload = now().toString();
            logger.warn("Sent: " + payload);
            return new GenericMessage<>(payload);
        };
    }
}

Then a simple processor (a transformer) :

@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApplication {

    @Transformer(inputChannel = Processor.INPUT,
                 outputChannel = Processor.OUTPUT)
    public String processMessage(String payload) {
        return payload + " is the time.";
    }
}

And this is the final consumer (the sink) :

@SpringBootApplication
@EnableBinding(Sink.class)
public class SinkApplication {

    private final Logger logger = LoggerFactory.getLogger(SinkApplication.class);

    @ServiceActivator(inputChannel = Sink.INPUT)
    public void loggerSink(Object payload) {
        logger.warn("Received: " + payload);
    }
}

And Finally, the aggregate is linking those three :

@SpringBootApplication
public class SampleAggregateApplication {

    public static void main(String[] args) {
        new AggregateApplicationBuilder().web(false)
            .from(SourceApplication.class)
                .args("--spring.cloud.stream.bindings.output.destination=step1", "--fixedDelay=5000")
            .via(ProcessorApplication.class)
                .args("--spring.cloud.stream.bindings.input.destination=step1",
                      "--spring.cloud.stream.bindings.output.destination=step2")
            .to(SinkApplication.class)
                .args("--spring.cloud.stream.bindings.input.destination=step2")
        .run(args);
    }
}

When The aggregate is launched, these are an excerpt of the retrieved traces :

2017-02-03 09:59:13.428  WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:13.428
2017-02-03 09:59:13.949  WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:13.949
2017-02-03 09:59:13.949  WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:13.949
2017-02-03 09:59:13.996  WARN 18688 --- [ask-scheduler-9] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:13.996
2017-02-03 09:59:14.430  WARN 18688 --- [ask-scheduler-2] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:14.430
2017-02-03 09:59:14.956  WARN 18688 --- [sk-scheduler-10] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:14.956
2017-02-03 09:59:14.956  WARN 18688 --- [sk-scheduler-10] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:14.956 is the time.
2017-02-03 09:59:14.999  WARN 18688 --- [ask-scheduler-5] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:14.999
2017-02-03 09:59:15.432  WARN 18688 --- [ask-scheduler-6] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:15.432
2017-02-03 09:59:15.961  WARN 18688 --- [ask-scheduler-7] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:15.961
2017-02-03 09:59:15.961  WARN 18688 --- [ask-scheduler-7] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:15.961
2017-02-03 09:59:16.000  WARN 18688 --- [ask-scheduler-1] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:16
2017-02-03 09:59:16.001  WARN 18688 --- [ask-scheduler-1] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:16
2017-02-03 09:59:16.437  WARN 18688 --- [ask-scheduler-4] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:16.437
2017-02-03 09:59:16.966  WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:16.966
2017-02-03 09:59:17.006  WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:17.006
2017-02-03 09:59:17.006  WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:17.006
2017-02-03 09:59:17.443  WARN 18688 --- [ask-scheduler-9] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:17.443
2017-02-03 09:59:17.971  WARN 18688 --- [ask-scheduler-2] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:17.971
2017-02-03 09:59:17.971  WARN 18688 --- [ask-scheduler-2] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:17.971
2017-02-03 09:59:18.007  WARN 18688 --- [sk-scheduler-10] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:18.007
2017-02-03 09:59:18.448  WARN 18688 --- [ask-scheduler-5] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:18.448
2017-02-03 09:59:18.976  WARN 18688 --- [ask-scheduler-6] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:18.976
2017-02-03 09:59:18.976  WARN 18688 --- [ask-scheduler-6] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:18.976 is the time.
2017-02-03 09:59:19.012  WARN 18688 --- [ask-scheduler-7] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:19.012
2017-02-03 09:59:19.449  WARN 18688 --- [ask-scheduler-1] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:19.449
2017-02-03 09:59:19.982  WARN 18688 --- [ask-scheduler-4] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:19.982
2017-02-03 09:59:19.982  WARN 18688 --- [ask-scheduler-4] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:19.982
2017-02-03 09:59:20.018  WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:20.018
2017-02-03 09:59:20.018  WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:20.018
2017

I would have expected that EVERY message would have followed the defined cycle : source-processor-sink. Be we can see that at least 2 out of 3 messages are lost, and that only 1 out of 4 messages are transformed. NB: the channel destinations were added in a second attempt, in order to avoid a supposed mix-up between the applications (using same RabbitMQ middleware).

Can someone tell me if I correctly undestood aggregates purpose and did the right things to implement it ? Thanks in advance.


Solution

  • A couple of things:

    • you shouldn't specify destinations for your applications, as the applications that are part of an aggregate communicate via internal channels, rather than going over the broker;
    • secondly (and this is something that our documentation doesn't specify, unfortunately) - the different parts of an aggregate must belong to different Java packages when using @SpringBootApplication on each aggregate component definition. What happens in your case is that you get multiple consumers on the various channels instead of the chaining that you'd expect. Moving Source, Transformer and Sink to separate packages should work for you. Also, added https://github.com/spring-cloud/spring-cloud-stream/issues/785 to track this.