I am pushing 100 messages onto a stream with 1 shard.
spring:
  cloud:
    stream:
      bindings:
        myOutBound:
          destination: my-stream
          contentType: application/json
I am pushing the messages in a loop for testing purposes:
@EnableBinding(MyBinder.class)
public class MyProcessor {

    @Autowired
    private MyBinder myBinder;

    public void processRollup() {
        // rangeClosed(1, 100) yields ids 1..100, i.e. 100 messages;
        // range(1, 100) would stop at 99.
        List<MyObject> myObjects = IntStream.rangeClosed(1, 100)
                .mapToObj(Integer::valueOf)
                .map(s -> new MyObject(s))
                .collect(toList());

        // Send the messages one by one, in id order.
        myObjects.forEach(messagePayload -> {
            System.out.println(messagePayload.getId());
            myBinder.myOutBound()
                    .send(MessageBuilder.withPayload(messagePayload)
                            .build());
        });
    }
}
I am consuming the messages with the following binding:
spring:
  cloud:
    stream:
      bindings:
        RollUpInboundStream:
          group: my-consumer-group
          destination: my-stream
          content-type: application/json
The messages are not consumed in order.
Am I missing something?
There are several things to consider. First of all, the producer in the binder is based on the KinesisMessageHandler, which runs in async mode by default:
messageHandler.setSync(producerProperties.getExtension().isSync());
So even if it looks to you as though you send those messages one by one in the right order, that doesn't mean they reach the stream on AWS in the same order.
Also, there is no guarantee that they are settled on AWS in the same order, even if you send them in sync mode.
See here: Amazon Kinesis and guaranteed ordering
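To illustrate the client-side part of this, here is a minimal, deterministic sketch in plain Java. It is only a toy model, not the binder or the AWS SDK: `SendOrderSketch`, `arrivalOrder`, and the latency values are hypothetical names and numbers chosen for illustration, and it says nothing about how AWS itself settles records. It assumes message i is dispatched at tick i in async mode, while in sync mode each send starts only after the previous one has arrived.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Toy model of send ordering; all names and numbers here are hypothetical. */
class SendOrderSketch {

    /**
     * Returns message indexes in the order they arrive.
     * latencies[i] is the simulated (non-negative) transit time of message i.
     */
    static List<Integer> arrivalOrder(long[] latencies, boolean sync) {
        long[] arrivedAt = new long[latencies.length];
        long clock = 0; // arrival time of the previous message (used in sync mode)
        List<Integer> order = new ArrayList<>();
        for (int i = 0; i < latencies.length; i++) {
            // sync: wait for the previous arrival; async: fire at tick i without waiting
            long dispatchedAt = sync ? clock : i;
            arrivedAt[i] = dispatchedAt + latencies[i];
            clock = arrivedAt[i];
            order.add(i);
        }
        // List.sort is stable, so ties keep their dispatch order
        order.sort(Comparator.comparingLong(i -> arrivedAt[i]));
        return order;
    }

    public static void main(String[] args) {
        long[] latencies = {50, 5, 20};
        System.out.println("async: " + arrivalOrder(latencies, false));
        System.out.println("sync:  " + arrivalOrder(latencies, true));
    }
}
```

In this model the async run reorders the messages whenever an earlier send is slower than a later one, while the sync run keeps the dispatch order on the client side.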
You can also achieve an ordering guarantee within the same shard via an explicit sequenceNumber:
To guarantee strictly increasing ordering, write serially to a shard and use the SequenceNumberForOrdering parameter.
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
Unfortunately, the Kinesis Binder doesn't support that option at the moment, but we can work around it by setting an explicit AwsHeaders.SEQUENCE_NUMBER header on the message before sending it to the output destination of the binder. The message handler reads it like this:
String sequenceNumber = messageHeaders.get(AwsHeaders.SEQUENCE_NUMBER, String.class);
if (!StringUtils.hasText(sequenceNumber) && this.sequenceNumberExpression != null) {
    sequenceNumber = this.sequenceNumberExpression.getValue(getEvaluationContext(), message, String.class);
}
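The "write serially and chain the sequence number" pattern from the PutRecord documentation can be sketched in plain Java as follows. This is only an illustration under stated assumptions: `FakeShard` is a hypothetical in-memory stand-in for a single shard (not the AWS SDK and not the binder), and `SerialWriter` is a made-up helper. The point it shows is that record n is sent with the sequence number returned for record n-1.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory stand-in for a single shard; not the AWS SDK. */
class FakeShard {
    private BigInteger lastAssigned = BigInteger.ZERO;
    private final List<String> records = new ArrayList<>();

    /**
     * Stores the payload and returns its sequence number as a string.
     * When sequenceNumberForOrdering is non-null, the returned number is
     * strictly greater than it, which models the documented PutRecord contract.
     */
    String putRecord(String payload, String sequenceNumberForOrdering) {
        BigInteger floor = lastAssigned;
        if (sequenceNumberForOrdering != null) {
            floor = floor.max(new BigInteger(sequenceNumberForOrdering));
        }
        lastAssigned = floor.add(BigInteger.ONE);
        records.add(payload);
        return lastAssigned.toString();
    }

    List<String> records() {
        return List.copyOf(records);
    }
}

/** Hypothetical helper that writes serially and chains sequence numbers. */
class SerialWriter {

    static List<String> writeInOrder(FakeShard shard, List<String> payloads) {
        List<String> sequenceNumbers = new ArrayList<>();
        String previous = null; // the first record has no predecessor
        for (String payload : payloads) {
            // Wait for each put to return before issuing the next one,
            // and pass the previous result as the ordering hint.
            previous = shard.putRecord(payload, previous);
            sequenceNumbers.add(previous);
        }
        return sequenceNumbers;
    }

    public static void main(String[] args) {
        FakeShard shard = new FakeShard();
        System.out.println(writeInOrder(shard, List.of("a", "b", "c")));
        System.out.println(shard.records());
    }
}
```

With the binder, the equivalent step would be to put the previously returned sequence number into the AwsHeaders.SEQUENCE_NUMBER header of the next message; how that previous value is obtained from the send result is not covered here.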