apache-kafka spring-cloud-stream spring-cloud-stream-binder-kafka

Kafka producer threads keep increasing


We are using the Spring Cloud Stream Kafka binder and have a problem with an application that consumes one topic, processes the messages, and then outputs them to different topics.

These topics are in turn consumed within the same application, which writes the results to a final topic.

We noticed that a large number of producer threads are created whenever the first consumer receives new messages, and these threads stay alive.

Here is my simplified config:

spring:
  cloud:
    stream:
      function:
        definition: schedulingConsumer;consumerSearch1;consumerSearch2
      default:
        group: ${kafka.group}
        contentType: application/json
        consumer:
          maxAttempts: 1
          backOffMaxInterval: 30
          retryableExceptions:
            org.springframework.messaging.converter.MessageConversionException: false
      kafka:
        binder:
          brokers: ${kafka.brokers}
          headerMapperBeanName: kafkaHeaderMapper
          producerProperties:
            linger.ms: 500
            batch.size: ${kafka.batchs.size}
            compression.type: gzip
          consumerProperties:
            session.timeout.ms: ${kafka.session.timeout.ms}
            max.poll.interval.ms: ${kafka.poll.interval}
            max.poll.records: ${kafka.poll.records}
            commit.interval.ms: 500
            allow.auto.create.topics: false
      bindings:
        schedulingConsumer-in-0:
          destination: ${kafka.topics.schedules}
          consumer.concurrency: 5

        search1-out:
          destination: ${kafka.topics.groups.search1}
        search2-out:
          destination: ${kafka.topics.groups.search2}

        consumerSearch1-in-0:
          destination: ${kafka.topics.groups.search1}
        consumerSearch2-in-0:
          destination: ${kafka.topics.groups.search2}

        datasource-out:
          destination: ${kafka.topics.search.output}

Here is a screenshot of the thread activity: [image: Thread activity]

We tried running the first consumer, schedulingConsumer, separately from the others (consumerSearch1 and consumerSearch2), and that seems to resolve the problem.

The problem occurs only when all of these consumers run in the same instance.


Solution

  • This appears to be a bug in Spring Cloud Stream. I reported it to the team as issue #2452, "Kafka producer threads keep increasing when 'spring.cloud.stream.dynamic-destination-cache-size' is exceeded".

    The solution was to override the property spring.cloud.stream.dynamic-destination-cache-size and set it to a value greater than the number of your output bindings.

    In my case, I had 14 output bindings.
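
As a rough sketch of that override, the property can be set next to the rest of the configuration shown in the question. The value 20 is only an illustrative assumption, chosen because it is larger than the 14 output bindings mentioned above; as far as I know the documented default for this property is 10, which would explain why 14 bindings exceed the cache. Pick whatever value exceeds your own binding count.

```yaml
spring:
  cloud:
    stream:
      # Assumed value for illustration: anything greater than the
      # number of output bindings (14 in the case described above).
      dynamic-destination-cache-size: 20
```

With the cache large enough to hold every output binding, the threads should stop accumulating, which matches the behaviour described in the issue title.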