Kafka reply timeout when applications are auto-scaled in PCF

I am using ReplyingKafkaTemplate for synchronous Kafka request/reply, and I get a response when only one instance is running. But if the application is scaled up to more than one instance, I get a timeout error.

From the documentation:

When configuring with a single reply topic, each instance must use a different group.id. In this case, all instances receive each reply.

According to the documentation, each instance needs a different consumer group. Does this mean we have to start each instance manually with its own consumer group? How can we handle auto scaling with tools like PCF? Below is my Kafka configuration.

@Configuration
public class KafkaConfig {

    //My Properties

    @Bean
    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // The same group id is used by every instance, which is what breaks when scaling out
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, commitInterval);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate(
            ProducerFactory<String, String> pf, KafkaMessageListenerContainer<String, String> container) {
        ReplyingKafkaTemplate<String, String, String> rkt = new ReplyingKafkaTemplate<>(pf, container);
        return rkt;
    }

    // Listener container that receives the replies for the ReplyingKafkaTemplate
    @Bean
    public KafkaMessageListenerContainer<String, String> replyContainer(ConsumerFactory<String, String> cf) {
        ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);

        return new KafkaMessageListenerContainer<>(cf, containerProperties);
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setReplyTemplate(replyingKafkaTemplate(producerFactory(), replyContainer(consumerFactory())));
        return factory;
    }
}

  • In the replyContainer bean, add

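    containerProperties.setGroupId(UUID.randomUUID().toString()); // unique per instance
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
    containerProperties.setKafkaConsumerProperties(props); // apply the override; the setter name may differ in older Spring Kafka versions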

    In the replyingKafkaTemplate, add


    The request topic needs at least as many partitions as the maximum scale-out. The reply topic can have any number of partitions (including 1).

    With PCF, you can construct the groupId using the instanceIndex instead of making it random.
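    As a minimal sketch of that idea, the helper below derives the group id from the instance index. It assumes the index is read from the CF_INSTANCE_INDEX environment variable that Cloud Foundry sets for each application instance. The class name ReplyGroupIds, the "reply-group" prefix, and the random fallback for local runs are illustrative choices, not part of the original answer.

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical helper: builds a per-instance consumer group id for the reply container.
final class ReplyGroupIds {

    private ReplyGroupIds() {
    }

    // Returns "<prefix>-<instanceIndex>" when CF_INSTANCE_INDEX is present in env,
    // otherwise "<prefix>-<random UUID>" (for example when running outside PCF).
    static String groupIdFor(String prefix, Map<String, String> env) {
        String index = env.get("CF_INSTANCE_INDEX");
        if (index == null || index.trim().isEmpty()) {
            return prefix + "-" + UUID.randomUUID();
        }
        return prefix + "-" + index.trim();
    }

    public static void main(String[] args) {
        // Prints the group id this process would use.
        System.out.println(groupIdFor("reply-group", System.getenv()));
    }
}
```

    The result would be passed to containerProperties.setGroupId(...) in place of the random UUID. One difference from the random approach: a stable group id keeps its committed offsets across restarts, so the "latest" reset only applies the first time a given index is used.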

    You could also use the instanceIndex as the REPLY_PARTITION header and use fixed reply partitions, in which case the reply topic needs at least as many partitions as the number of instances you expect to run (instance indexes start at 0, so that is the maximum instanceIndex plus one).
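    A rough sketch of the fixed-partition variant follows. It only covers the part that can be shown without Spring on the classpath: mapping the instance index to a reply partition and encoding it as the header value. It assumes the REPLY_PARTITION header value is a 4-byte big-endian integer, as the Spring Kafka reference documentation describes. The class name ReplyPartitions and the partition count of 4 in main are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper: maps an instance index to a fixed reply partition header value.
final class ReplyPartitions {

    private ReplyPartitions() {
    }

    // Each instance uses the partition equal to its (zero-based) index,
    // so the index must be smaller than the number of reply partitions.
    static int partitionFor(int instanceIndex, int replyPartitionCount) {
        if (instanceIndex < 0 || instanceIndex >= replyPartitionCount) {
            throw new IllegalArgumentException(
                    "instance index " + instanceIndex + " has no reply partition (count="
                            + replyPartitionCount + ")");
        }
        return instanceIndex;
    }

    // Encodes the partition as 4 bytes, big-endian (ByteBuffer's default byte order).
    static byte[] encode(int partition) {
        if (partition < 0) {
            throw new IllegalArgumentException("partition must not be negative");
        }
        return ByteBuffer.allocate(4).putInt(partition).array();
    }

    public static void main(String[] args) {
        int index = Integer.parseInt(System.getenv().getOrDefault("CF_INSTANCE_INDEX", "0"));
        byte[] headerValue = encode(partitionFor(index, 4)); // 4 reply partitions assumed
        System.out.println(Arrays.toString(headerValue));
    }
}
```

    In the application, the encoded bytes would be attached to the outgoing ProducerRecord under the KafkaHeaders.REPLY_PARTITION header, and the reply container would be assigned that one partition explicitly rather than relying on group management. The exact container setup depends on the Spring Kafka version in use.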