Search code examples
apache-kafkaspring-cloud-streamspring-cloud-stream-binder-kafka

Spring Cloud Stream: how to send a message for each partition of a Kafka topic


I'm trying to implement a mechanism to mark the end of a sequence of kafka messages. For this reason I'm trying to send a special message (a kind of EOF) at the end of the message sequence on each topic partition. If a consumer reads all the EOF messages then it means that he has finished processing the entire sequence of messages. Is there a way to send a kafka message for each partition of a topic?

Edit: the topic name is discovered at runtime, so the number of partitions is not known but must be retrieved in some way.


Solution

  • Send a Message<?> with a PARTITION_ID header.

    MessageBuilder.withPayload(...)
        .setHeader(KafkaHeaders.PARTITION_ID, partition)
        .build();
    

    Or configure the producer with a partitionKeyExpression

    https://docs.spring.io/spring-cloud-stream/docs/3.2.2/reference/html/spring-cloud-stream.html#_producer_properties