spring-kafka, spring-cloud-stream

Spring Cloud Stream FencedInstanceIdException


Can someone experienced with Spring Cloud Stream and the Kafka binder help clarify this?

I have a simple application that consumes from multiple topics. This is my config, which looks correct according to the Spring documentation.

spring:
  cloud:
    function:
      definition: app
    stream:
      bindings:
        app-in-0:
          destination: topic-a,topic-b
          group: mygroup
      kafka:
        binder:
          consumer-properties:
            group:
              instance:
                id: myid

But I get this error.

 org.apache.kafka.common.errors.FencedInstanceIdException: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id.
Caused by: org.apache.kafka.common.errors.FencedInstanceIdException: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id.

It works fine for a single topic and fails only with more than one. My guess is that Spring creates a separate consumer for each topic, so both register with the same group.instance.id. How can I fix this?


After Gary's answer, I tried this.

spring:
  cloud:
    function:
      definition: app
    stream:
      bindings:
        app-in-0:
          destination: topic-a,topic-b
          group: mygroup
          consumer:
            multiplex: true
      kafka:
        binder:
          consumer-properties:
            group:
              instance:
                id: myid

I get this error.

 org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [topic-a,topic-b]
Caused by: org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [topic-a,topic-b]

DEBUG mode shows this.

Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=12, clientId=consumer-my-group-1, correlationId=2) and timeout 30000 to node -1: MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='topic-a,topic-b')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false)

Received METADATA response from node -1 for request with header RequestHeader(apiKey=METADATA, apiVersion=12, clientId=consumer-my-group-1, correlationId=2): MetadataResponseData(throttleTimeMs=0, brokers=[MetadataResponseBroker(nodeId=1, host='localhost', port=9092, rack=null)], clusterId='OTMwNzFhYTY1ODNiNGE5OQ', controllerId=1, topics=[MetadataResponseTopic(errorCode=17, name='topic-a,topic-b', topicId=AAAAAAAAAAAAAAAAAAAAAA, isInternal=false, partitions=[], topicAuthorizedOperations=-2147483648)], clusterAuthorizedOperations=-2147483648)
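
The METADATA exchange above suggests that the whole comma-separated string was sent as one topic name: the request carries name='topic-a,topic-b', and the broker answers with errorCode=17, which Kafka maps to INVALID_TOPIC_EXCEPTION. Kafka topic names may contain only ASCII letters, digits, '.', '_' and '-', and may be at most 249 characters long, so a name containing a comma can never resolve. The sketch below restates that rule in plain Java as an illustration; TopicNameSketch and isLegalTopicName are hypothetical names, and this is not Kafka's own validation code.

```java
import java.util.regex.Pattern;

class TopicNameSketch {

    // Assumption: restates Kafka's topic naming rule (ASCII alphanumerics, '.', '_', '-').
    private static final Pattern LEGAL_CHARS = Pattern.compile("[a-zA-Z0-9._-]+");
    private static final int MAX_LENGTH = 249;

    /** Returns true if the given string could be used as a single Kafka topic name. */
    static boolean isLegalTopicName(String name) {
        return !name.isEmpty()
                && name.length() <= MAX_LENGTH
                && !name.equals(".")
                && !name.equals("..")
                && LEGAL_CHARS.matcher(name).matches();
    }

    public static void main(String[] args) {
        System.out.println(isLegalTopicName("topic-a"));         // true
        System.out.println(isLegalTopicName("topic-a,topic-b")); // false: ',' is not allowed
    }
}
```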

Solution

  • Set spring.cloud.stream.bindings.app-in-0.consumer.multiplex=true.

    /**
     * When set to true, the underlying binder will natively multiplex destinations on the
     * same input binding. For example, in the case of a comma separated multiple
     * destinations, the core framework will skip binding them individually if this is set
     * to true, but delegate that responsibility to the binder.
     *
     * By default this property is set to `false` and the binder will individually bind
     * each destinations in case of a comma separated multi destination list. The
     * individual binder implementations that need to support multiple input bindings
     * natively (multiplex) can enable this property.
     */
    private boolean multiplex;
    

    EDIT

    You have multiplex in the wrong place; it is a consumer property on the binding, not a top-level spring.cloud.stream property.

    This works fine for me.

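    import java.util.function.Consumer;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    
    @SpringBootApplication
    public class So75912831Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So75912831Application.class, args);
        }
    
        // Bound as input-in-0; prints every record payload it receives.
        @Bean
        Consumer<String> input() {
            return System.out::println;
        }
    
    }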
    
    spring.cloud.stream.bindings.input-in-0.group=foo
    spring.cloud.stream.bindings.input-in-0.destination=topic1,topic2
    spring.cloud.stream.bindings.input-in-0.consumer.multiplex=true
    
    ...o.s.c.s.b.k.KafkaMessageChannelBinder$2 \
      foo: partitions assigned: [topic1-0, topic2-0]
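
To make the difference concrete, here is a toy model in plain Java of what the Javadoc above describes. Without multiplex, the framework binds each comma-separated destination on its own, so every resulting consumer would carry the same binder-level group.instance.id. With multiplex, a single consumer subscribes to all topics, which matches the "partitions assigned: [topic1-0, topic2-0]" log line. MultiplexSketch and topicsPerConsumer are hypothetical names, and none of this is Spring Cloud Stream code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class MultiplexSketch {

    /**
     * Returns one topic list per consumer that would be created for a binding.
     * Illustrative only: a simplified model, not the framework's implementation.
     */
    static List<List<String>> topicsPerConsumer(String destination, boolean multiplex) {
        List<String> topics = new ArrayList<>();
        for (String topic : destination.split(",")) {
            topics.add(topic.trim());
        }
        if (multiplex) {
            // One consumer subscribes to every topic, so the static id is used once.
            return Collections.singletonList(topics);
        }
        // One consumer per topic; each would reuse the same group.instance.id.
        List<List<String>> perConsumer = new ArrayList<>();
        for (String topic : topics) {
            perConsumer.add(Collections.singletonList(topic));
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        System.out.println(topicsPerConsumer("topic-a,topic-b", false)); // [[topic-a], [topic-b]]
        System.out.println(topicsPerConsumer("topic-a,topic-b", true));  // [[topic-a, topic-b]]
    }
}
```

With two consumers sharing the id myid, the broker fences one of them, which is consistent with the FencedInstanceIdException in the question.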