I am building a prototype of a messaging system using Spring Cloud Stream, with Apache Kafka as the binder. I created a topic with 2 partitions for scalability, and then tried to send different messages to different partitions using the REST API method below. I set 2 different message keys, one for each of the 2 partitions.
@PostMapping("/publish")
public void publish(@RequestParam String message) {
log.debug("REST request the message : {} to send to Kafka topic ", message);
Message message1 = MessageBuilder.withPayload("Hello from a")
.setHeader(KafkaHeaders.MESSAGE_KEY, "node1")
.build();
Message message2 = MessageBuilder.withPayload("Hello from b")
.setHeader(KafkaHeaders.MESSAGE_KEY, "node1")
.build();
Message message3 = MessageBuilder.withPayload("Hello from c")
.setHeader(KafkaHeaders.MESSAGE_KEY, "node1")
.build();
Message message4 = MessageBuilder.withPayload("Hello from d")
.setHeader(KafkaHeaders.MESSAGE_KEY, "node2")
.build();
Message message5 = MessageBuilder.withPayload("Hello from e")
.setHeader(KafkaHeaders.MESSAGE_KEY, "node2")
.build();
Message message6 = MessageBuilder.withPayload("Hello from f")
.setHeader(KafkaHeaders.MESSAGE_KEY, "node2")
.build();
output.send("simulatePf-out-0", message1);
output.send("simulatePf-out-0", message2);
output.send("simulatePf-out-0", message3);
output.send("simulatePf-out-0", message4);
output.send("simulatePf-out-0", message5);
output.send("simulatePf-out-0", message6);
}
This is the application.yml for the producer application:
cloud:
  stream:
    kafka:
      binder:
        replicationFactor: 2
        auto-create-topics: true
        brokers: localhost:9092,localhost:9093,localhost:9094
        auto-add-partitions: true
      bindings:
        simulatePf-out-0:
          producer:
            configuration:
              key.serializer: org.apache.kafka.common.serialization.StringSerializer
              value.serializer: org.springframework.kafka.support.serializer.JsonSerializer
    bindings:
      simulatePf-out-0:
        producer:
          useNativeEncoding: true
          partition-count: 3
        destination: pf-topic
        content-type: text/plain
        group: dsa-back-end
To test parallelism, I created a consumer application that reads messages from pf-topic. This is the configuration of the consumer application:
cloud:
  stream:
    kafka:
      binder:
        replicationFactor: 2
        auto-create-topics: true
        brokers: localhost:9092, localhost:9093, localhost:9094
        min-partition-count: 2
      bindings:
        simulatePf-in-0:
          consumer:
            configuration:
              key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
              value.deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    bindings:
      simulatePf-in-0:
        destination: pf-topic
        content-type: text/plain
        group: powerflowservice
        consumer:
          use-native-decoding: true
I created a function in the consumer application to consume the messages:
@Bean
public Consumer<Message<String>> simulatePf() {
    return message -> {
        log.info("header {}", message.getHeaders());
        log.info("received {}", message.getPayload());
    };
}
Now it is time for testing. To test parallelism, I ran 2 instances of the Spring Boot consumer application. I was expecting one consumer to consume messages from one partition and the other consumer to consume messages from the other partition: message a, message b, and message c consumed by consumer one, and message d, message e, and message f consumed by the other consumer, because I set different message keys to target different partitions. But all messages were consumed by only one application:
2022-06-30 20:34:48.895 INFO 11860 --- [container-0-C-1] c.s.powerflow.config.AsyncConfiguration : header {deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=node1, kafka_receivedTopic=pf-topic, skip-input-type-conversion=true, kafka_offset=270, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1eaf51df, source-type=streamBridge, id=a77d12f2-f184-0f2f-6a76-147803dd43f3, kafka_receivedPartitionId=0, kafka_receivedTimestamp=1656610488838, kafka_groupId=powerflowservice, timestamp=1656610488890}
2022-06-30 20:34:48.901 INFO 11860 --- [container-0-C-1] c.s.powerflow.config.AsyncConfiguration : received Hello from a
2022-06-30 20:34:48.929 INFO 11860 --- [container-0-C-1] c.s.powerflow.config.AsyncConfiguration : header {deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=node1, kafka_receivedTopic=pf-topic, skip-input-type-conversion=true, kafka_offset=271, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1eaf51df, source-type=streamBridge, id=2e89f9b7-b6e7-482f-3c46-f73b2ad0705c, kafka_receivedPartitionId=0, kafka_receivedTimestamp=1656610488840, kafka_groupId=powerflowservice, timestamp=1656610488929}
2022-06-30 20:34:48.932 INFO 11860 --- [container-0-C-1] c.s.powerflow.config.AsyncConfiguration : received Hello from b
2022-06-30 20:34:48.933 INFO 11860 --- [container-0-C-1] c.s.powerflow.config.AsyncConfiguration : header {deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=node1, kafka_receivedTopic=pf-topic, skip-input-type-conversion=true, kafka_offset=272, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1eaf51df, source-type=streamBridge, id=15640532-b57f-b58e-62e7-c2bc9375fdf0, kafka_receivedPartitionId=0, kafka_receivedTimestamp=1656610488841, kafka_groupId=powerflowservice, timestamp=1656610488933}
2022-06-30 20:34:48.934 INFO 11860 --- [container-0-C-1] c.s.powerflow.config.AsyncConfiguration : received Hello from c
2022-06-30 20:34:48.935 INFO 11860 --- [container-0-C-1] c.s.powerflow.config.AsyncConfiguration : header {deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=node2, kafka_receivedTopic=pf-topic, skip-input-type-conversion=true, kafka_offset=273, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1eaf51df, source-type=streamBridge, id=590f0fb7-042f-e134-d214-ead570e42fe3, kafka_receivedPartitionId=0, kafka_receivedTimestamp=1656610488842, kafka_groupId=powerflowservice, timestamp=1656610488934}
2022-06-30 20:34:48.938 INFO 11860 --- [container-0-C-1] c.s.powerflow.config.AsyncConfiguration : received Hello from d
2022-06-30 20:34:48.940 INFO 11860 --- [container-0-C-1] c.s.powerflow.config.AsyncConfiguration : header {deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=node2, kafka_receivedTopic=pf-topic, skip-input-type-conversion=true, kafka_offset=274, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1eaf51df, source-type=streamBridge, id=9a67e68b-95d4-a02e-cc14-ac30c684b639, kafka_receivedPartitionId=0, kafka_receivedTimestamp=1656610488842, kafka_groupId=powerflowservice, timestamp=1656610488940}
2022-06-30 20:34:48.941 INFO 11860 --- [container-0-C-1] c.s.powerflow.config.AsyncConfiguration : received Hello from e
2022-06-30 20:34:48.943 INFO 11860 --- [container-0-C-1] c.s.powerflow.config.AsyncConfiguration : header {deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=node2, kafka_receivedTopic=pf-topic, skip-input-type-conversion=true, kafka_offset=275, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1eaf51df, source-type=streamBridge, id=333269af-bbd5-12b0-09de-8bd7959ebf08, kafka_receivedPartitionId=0, kafka_receivedTimestamp=1656610488843, kafka_groupId=powerflowservice, timestamp=1656610488943}
2022-06-30 20:34:48.943 INFO 11860 --- [container-0-C-1] c.s.powerflow.config.AsyncConfiguration : received Hello from f
Could you help me figure out what I am missing?
You are only setting the message key as a header when you are sending. You can add the KafkaHeaders.PARTITION header on the message to force a specific partition.
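For example, a minimal sketch based on your publish method (reusing the same MessageBuilder and KafkaHeaders imports you already have; the partition numbers 0 and 1 are hard-coded purely for illustration):

Message<String> message1 = MessageBuilder.withPayload("Hello from a")
        .setHeader(KafkaHeaders.MESSAGE_KEY, "node1")
        .setHeader(KafkaHeaders.PARTITION, 0) // force this record onto partition 0
        .build();
Message<String> message4 = MessageBuilder.withPayload("Hello from d")
        .setHeader(KafkaHeaders.MESSAGE_KEY, "node2")
        .setHeader(KafkaHeaders.PARTITION, 1) // force this record onto partition 1
        .build();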
If you don't want to hard-code a partition through the header, you can set a partition key SpEL expression or a partition key extractor bean in your application. Both of these mechanisms are Spring Cloud Stream specific. If you provide either of them, you still need to tell Spring Cloud Stream how you want to select the partition; for that, you can use a partition selector SpEL expression or a partition selector strategy. If you don't provide those either, a default selector strategy is used: the hashCode of the message key modulo the number of topic partitions.
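As a rough sketch of that approach (assuming you keep setting the key in the KafkaHeaders.MESSAGE_KEY header, whose name is kafka_messageKey), the producer binding in your application.yml could look like this:

cloud:
  stream:
    bindings:
      simulatePf-out-0:
        producer:
          # Spring Cloud Stream partitioning: take the key from the message header
          partition-key-expression: headers['kafka_messageKey']
          partition-count: 2

With no selector configured, the default strategy (hashCode of the key % partition count) then routes all "node1" records to one partition and all "node2" records to the other, provided the two hash codes differ modulo 2.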
I think you asked another related question yesterday and I linked this blog in my answer. In the last sections of that blog, all these details are explained.
Quoting from the blog:
If you don't provide a partition key expression or partition key extractor bean, then Spring Cloud Stream will completely stay out of the business of making any partition decision for you. In that case, if the topic has more than one partition, Kafka's default partitioning mechanisms will be triggered. By default, Kafka uses a DefaultPartitioner which, if the message has a key (see above), uses the hash of this key to compute the partition.
I think you are seeing Kafka's default behavior in your application.
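One way to check that (my own sketch, not from the blog): for keyed records the DefaultPartitioner computes Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions, and you can reproduce that with the murmur2 utility that ships with kafka-clients. With only 2 partitions it is entirely possible that both of your keys map to partition 0, which would explain the kafka_receivedPartitionId=0 in every one of your log lines:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.utils.Utils;

public class PartitionCheck {

    public static void main(String[] args) {
        int numPartitions = 2; // pf-topic was created with 2 partitions
        for (String key : new String[] { "node1", "node2" }) {
            // Same formula the DefaultPartitioner applies to keyed records;
            // StringSerializer produces UTF-8 bytes, so we hash those here too
            int partition = Utils.toPositive(Utils.murmur2(key.getBytes(StandardCharsets.UTF_8))) % numPartitions;
            System.out.println(key + " -> partition " + partition);
        }
    }
}

If the two keys do collide, picking keys that hash apart, adding a partition, or using one of the mechanisms above will spread the records across both consumers.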