I'm trying to implement the proposed Spring Cloud Stream (SCS) aggregates, but I'm not sure I understand their real purpose, as the results I get surprise me. First, here is the code.
The source, a message provider that is polled on a schedule:
    @SpringBootApplication
    @EnableBinding(Source.class)
    public class SourceApplication {

        private final Logger logger = LoggerFactory.getLogger(SourceApplication.class);

        @Bean
        @InboundChannelAdapter(Source.OUTPUT)
        public MessageSource<String> createMessage() {
            return () -> {
                String payload = now().toString();
                logger.warn("Sent: " + payload);
                return new GenericMessage<>(payload);
            };
        }
    }
Then a simple processor (a transformer):
    @SpringBootApplication
    @EnableBinding(Processor.class)
    public class ProcessorApplication {

        @Transformer(inputChannel = Processor.INPUT,
                     outputChannel = Processor.OUTPUT)
        public String processMessage(String payload) {
            return payload + " is the time.";
        }
    }
And this is the final consumer (the sink):
    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class SinkApplication {

        private final Logger logger = LoggerFactory.getLogger(SinkApplication.class);

        @ServiceActivator(inputChannel = Sink.INPUT)
        public void loggerSink(Object payload) {
            logger.warn("Received: " + payload);
        }
    }
Finally, the aggregate links those three:
    @SpringBootApplication
    public class SampleAggregateApplication {

        public static void main(String[] args) {
            new AggregateApplicationBuilder().web(false)
                .from(SourceApplication.class)
                    .args("--spring.cloud.stream.bindings.output.destination=step1", "--fixedDelay=5000")
                .via(ProcessorApplication.class)
                    .args("--spring.cloud.stream.bindings.input.destination=step1",
                          "--spring.cloud.stream.bindings.output.destination=step2")
                .to(SinkApplication.class)
                    .args("--spring.cloud.stream.bindings.input.destination=step2")
                .run(args);
        }
    }
When the aggregate is launched, this is an excerpt of the resulting traces:
2017-02-03 09:59:13.428 WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:13.428
2017-02-03 09:59:13.949 WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:13.949
2017-02-03 09:59:13.949 WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:13.949
2017-02-03 09:59:13.996 WARN 18688 --- [ask-scheduler-9] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:13.996
2017-02-03 09:59:14.430 WARN 18688 --- [ask-scheduler-2] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:14.430
2017-02-03 09:59:14.956 WARN 18688 --- [sk-scheduler-10] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:14.956
2017-02-03 09:59:14.956 WARN 18688 --- [sk-scheduler-10] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:14.956 is the time.
2017-02-03 09:59:14.999 WARN 18688 --- [ask-scheduler-5] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:14.999
2017-02-03 09:59:15.432 WARN 18688 --- [ask-scheduler-6] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:15.432
2017-02-03 09:59:15.961 WARN 18688 --- [ask-scheduler-7] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:15.961
2017-02-03 09:59:15.961 WARN 18688 --- [ask-scheduler-7] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:15.961
2017-02-03 09:59:16.000 WARN 18688 --- [ask-scheduler-1] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:16
2017-02-03 09:59:16.001 WARN 18688 --- [ask-scheduler-1] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:16
2017-02-03 09:59:16.437 WARN 18688 --- [ask-scheduler-4] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:16.437
2017-02-03 09:59:16.966 WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:16.966
2017-02-03 09:59:17.006 WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:17.006
2017-02-03 09:59:17.006 WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:17.006
2017-02-03 09:59:17.443 WARN 18688 --- [ask-scheduler-9] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:17.443
2017-02-03 09:59:17.971 WARN 18688 --- [ask-scheduler-2] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:17.971
2017-02-03 09:59:17.971 WARN 18688 --- [ask-scheduler-2] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:17.971
2017-02-03 09:59:18.007 WARN 18688 --- [sk-scheduler-10] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:18.007
2017-02-03 09:59:18.448 WARN 18688 --- [ask-scheduler-5] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:18.448
2017-02-03 09:59:18.976 WARN 18688 --- [ask-scheduler-6] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:18.976
2017-02-03 09:59:18.976 WARN 18688 --- [ask-scheduler-6] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:18.976 is the time.
2017-02-03 09:59:19.012 WARN 18688 --- [ask-scheduler-7] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:19.012
2017-02-03 09:59:19.449 WARN 18688 --- [ask-scheduler-1] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:19.449
2017-02-03 09:59:19.982 WARN 18688 --- [ask-scheduler-4] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:19.982
2017-02-03 09:59:19.982 WARN 18688 --- [ask-scheduler-4] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:19.982
2017-02-03 09:59:20.018 WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:20.018
2017-02-03 09:59:20.018 WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:20.018
2017
I would have expected EVERY message to follow the defined cycle: source-processor-sink. But we can see that at least 2 out of 3 messages are lost, and that only 1 out of 4 messages is transformed. NB: the channel destinations were added in a second attempt, to avoid a possible mix-up between the applications (which use the same RabbitMQ middleware).
Can someone tell me whether I correctly understood the purpose of aggregates and implemented them the right way? Thanks in advance.
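A couple of things: the problem comes from having @SpringBootApplication on each aggregate component definition while all of them live in the same package. @SpringBootApplication enables component scanning of its own package, so each component also picks up the others. What happens in your case is that you get multiple consumers on the various channels instead of the chaining that you'd expect. Moving the Source, Transformer and Sink to separate packages should work for you. Also, I added https://github.com/spring-cloud/spring-cloud-stream/issues/785 to track this.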
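To illustrate why extra consumers on a channel produce the mixed output seen in the logs, here is a minimal plain-Java sketch. It is not Spring code: RoundRobinChannel and CompetingConsumersDemo are hypothetical names, and the model only assumes a point-to-point channel that hands each message to exactly one subscriber in rotation (the default behaviour of Spring Integration's DirectChannel). The actual wiring inside the aggregate may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a point-to-point channel: each message goes to
// exactly one subscriber, chosen in round-robin order.
class RoundRobinChannel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();
    private int next = 0;

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void send(String message) {
        if (subscribers.isEmpty()) {
            return; // nobody listening, the message is dropped
        }
        Consumer<String> target = subscribers.get(next % subscribers.size());
        next++;
        target.accept(message);
    }
}

class CompetingConsumersDemo {

    // Returns what the sink receives. When extraSinkOnSource is true, the sink
    // is also (unintentionally) subscribed to the source's output channel.
    static List<String> run(int messages, boolean extraSinkOnSource) {
        List<String> received = new ArrayList<>();
        RoundRobinChannel sourceOut = new RoundRobinChannel();
        RoundRobinChannel processorOut = new RoundRobinChannel();

        // Intended chain: source -> processor -> sink
        sourceOut.subscribe(m -> processorOut.send(m + " is the time."));
        processorOut.subscribe(received::add);

        if (extraSinkOnSource) {
            // Unintended second consumer competing with the processor
            sourceOut.subscribe(received::add);
        }

        for (int i = 1; i <= messages; i++) {
            sourceOut.send("msg" + i);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("chained:   " + run(4, false));
        System.out.println("competing: " + run(4, true));
    }
}
```

In the chained case every message reaches the sink transformed. In the competing case the processor and the sink take turns on the source's output, so only every other message is transformed, which is the same kind of symptom as in the traces above.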