I'm using the org.springframework.cloud:spring-cloud-stream-binder-kafka library and I'm having trouble partitioning messages in a topic. My topic has 4 partitions, but I'm only seeing events in partition 0, i.e. the publisher is not partitioning the events correctly.
When I inspect the one partition that has messages in it, I can see that each message has a proper value in its key field (but it doesn't seem to be used? I'm a little confused).
I followed the official partitioning example and have the following code:
    class FooEventPublisher {
        private val logger = LoggerFactory.getLogger(this::class.java)
        private val mapper = jacksonObjectMapper()
            .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
        private val ingressChannel = Channel<FooEvent>(capacity = Channel.UNLIMITED)

        /** other component will call this to pipe in events to be published */
        suspend fun send(event: FooEvent) = ingressChannel.send(event)

        /** helper function to convert [FooEvent] into a [Message] with a JSON payload */
        private fun FooEvent.toMessage(): Message<ByteArray> {
            val payload = mapper.writeValueAsBytes(this)
            val partitionKey = this.name
            val message = MessageBuilder
                .withPayload(payload)
                .setHeader(KafkaHeaders.MESSAGE_KEY, partitionKey.toByteArray())
                .setHeader("partitionKey", partitionKey.toByteArray())
                .build()
            return message
        }

        fun publishFooEvents(): () -> Flux<Message<ByteArray>> = {
            // ...
            .map {
                try {
                    it.toMessage()
                } catch (err: Exception) {
                    logger.error("Skipping event because of encoding failure", err)
                    logger.trace("problematic event=$it")
                }
            }
            // ...
        }
    }
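and the following configuration:

    spring:
      cloud:
        function:
          definition: publishFooEvents
        stream:
          kafka:
            binder:
              brokers: localhost:9092
          bindings:
            publishFooEvents-out-0:
              destination: kf-foo-events-topic
              producer:
                partition-key-expression: headers['partitionKey']
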
I expected the Kafka binder library to use the partitionKey field as the field to partition on, e.g. all messages with key 1234 would go to partition 1 and messages with key 5678 would go to partition 2.
I'm not sure what I'm missing here. Why isn't the binder detecting that the target topic has 4 partitions and using that information to partition?
Edit: fixed the key in the example above.
Partitioning at the binder level is not intended for infrastructure that supports partitioning natively, such as Kafka. Just use native Kafka partitioning instead (which by default will be based on the key).
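For reference, with native partitioning the producer picks the partition from the serialized record key. Below is a minimal Kotlin sketch of that idea, assuming the default behaviour for keyed records (a murmur2 hash of the key bytes, made non-negative, modulo the partition count). The `murmur2` function is a hand-written transliteration and `partitionFor` is a hypothetical helper; neither is an API of the Kafka client.

```kotlin
// Sketch only: murmur2 over the serialized key bytes, written out by hand
// to show the idea. Assumed to mirror what the Kafka producer does for keyed records.
fun murmur2(data: ByteArray): Int {
    val length = data.size
    val seed = 0x9747b28c.toInt()
    val m = 0x5bd1e995
    val r = 24
    var h = seed xor length

    // Mix four bytes at a time into the hash.
    for (i in 0 until length / 4) {
        val i4 = i * 4
        var k = (data[i4].toInt() and 0xff) +
            ((data[i4 + 1].toInt() and 0xff) shl 8) +
            ((data[i4 + 2].toInt() and 0xff) shl 16) +
            ((data[i4 + 3].toInt() and 0xff) shl 24)
        k *= m
        k = k xor (k ushr r)
        k *= m
        h *= m
        h = h xor k
    }

    // Handle the last one to three bytes, if any.
    val tail = length and 3.inv()
    val rem = length % 4
    if (rem >= 3) h = h xor ((data[tail + 2].toInt() and 0xff) shl 16)
    if (rem >= 2) h = h xor ((data[tail + 1].toInt() and 0xff) shl 8)
    if (rem >= 1) {
        h = h xor (data[tail].toInt() and 0xff)
        h *= m
    }

    h = h xor (h ushr 13)
    h *= m
    h = h xor (h ushr 15)
    return h
}

// Hypothetical helper: clear the sign bit, then take the remainder.
fun partitionFor(key: String, numPartitions: Int): Int =
    (murmur2(key.toByteArray(Charsets.UTF_8)) and 0x7fffffff) % numPartitions

fun main() {
    for (key in listOf("1234", "5678")) {
        println("key=$key -> partition ${partitionFor(key, 4)}")
    }
}
```

The point is only that the mapping depends on the bytes of the key, so equal keys always land in the same partition. Which partition a particular key lands in is not something to rely on.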
Furthermore, you are setting the header to a byte[]; it should remain a String so that the hash algorithm uses the value. The hash code of a byte[] depends on its system identity, not on the array contents.
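To illustrate the byte[] point, here is a small Kotlin sketch. `selectPartition` is a hypothetical stand-in that assumes a selector of the form abs(key.hashCode()) % partitionCount; it is not the binder's actual code.

```kotlin
import kotlin.math.abs

// Hypothetical stand-in for a hashCode-based partition selector.
fun selectPartition(key: Any, partitionCount: Int): Int {
    val hash = key.hashCode()
    // Guard Int.MIN_VALUE, whose absolute value does not fit in an Int.
    return abs(if (hash == Int.MIN_VALUE) 0 else hash) % partitionCount
}

fun main() {
    val first = "1234".toByteArray()
    val second = "1234".toByteArray()

    // Same contents, but hashCode() on an array is identity-based,
    // so the two arrays may be assigned different partitions.
    println("contents equal: ${first.contentEquals(second)}")
    println("byte[] partitions: ${selectPartition(first, 4)} and ${selectPartition(second, 4)}")

    // String.hashCode() is computed from the characters,
    // so equal keys always map to the same partition.
    val a = selectPartition("1234", 4)
    val b = selectPartition(second.decodeToString(), 4)
    println("String partitions: $a and $b")
}
```

With String keys the two results always match; with byte[] keys they can differ from one array instance to the next and from run to run.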
"e.g. all messages with key 1234 would go to partition 1 and messages with key 1234 would go to partition 2"
That makes no sense; I presume you meant to specify different keys.