Can someone experienced with Spring Cloud Stream and the Kafka binder help clarify this question?
I have a simple application that consumes from multiple topics. This is my config, which looks correct based on the Spring documentation.
spring:
  cloud:
    function:
      definition: app
    stream:
      bindings:
        app-in-0:
          destination: topic-a,topic-b
          group: mygroup
      kafka:
        binder:
          consumer-properties:
            group:
              instance:
                id: myid
But I get this error.
org.apache.kafka.common.errors.FencedInstanceIdException: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id.
Caused by: org.apache.kafka.common.errors.FencedInstanceIdException: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id.
It works fine for a single topic and fails only with more than one topic. My guess is that Spring creates a separate consumer for each topic, but I'm not sure how to fix this in that case.
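If that guess is right, the default comma-split would look roughly like this (a toy sketch to illustrate the suspected behavior, not the binder's actual internals; the class and method names here are made up):

```java
import java.util.Arrays;
import java.util.List;

public class PerTopicBindingSketch {

    // Suspected default: a comma-separated destination is split and each
    // topic is bound on its own, i.e. one Kafka consumer per topic.
    public static List<String> bindingsFor(String destination) {
        return Arrays.asList(destination.split(","));
    }

    public static void main(String[] args) {
        List<String> bindings = bindingsFor("topic-a,topic-b");
        // Two bindings -> two consumers, but both would inherit the single
        // binder-level group.instance.id, so the broker fences one of them.
        System.out.println(bindings.size() + " consumers");
    }
}
```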
After Gary's answer, I tried this.
spring:
  cloud:
    function:
      definition: app
    stream:
      bindings:
        app-in-0:
          destination: topic-a,topic-b
          group: mygroup
          consumer:
            multiplex: true
      kafka:
        binder:
          consumer-properties:
            group:
              instance:
                id: myid
I get this error.
org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [topic-a,topic-b]
Caused by: org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [topic-a,topic-b]
DEBUG mode shows this.
Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=12, clientId=consumer-my-group-1, correlationId=2) and timeout 30000 to node -1: MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='topic-a,topic-b')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false)
Received METADATA response from node -1 for request with header RequestHeader(apiKey=METADATA, apiVersion=12, clientId=consumer-my-group-1, correlationId=2): MetadataResponseData(throttleTimeMs=0, brokers=[MetadataResponseBroker(nodeId=1, host='localhost', port=9092, rack=null)], clusterId='OTMwNzFhYTY1ODNiNGE5OQ', controllerId=1, topics=[MetadataResponseTopic(errorCode=17, name='topic-a,topic-b', topicId=AAAAAAAAAAAAAAAAAAAAAA, isInternal=false, partitions=[], topicAuthorizedOperations=-2147483648)], clusterAuthorizedOperations=-2147483648)
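The metadata request above shows the whole comma-separated string being sent as one topic name, and the response answers with errorCode=17 (INVALID_TOPIC_EXCEPTION). A simplified re-implementation of Kafka's topic-name rule (assumed here: only letters, digits, '.', '_' and '-' are legal, max length 249; this is not the actual Kafka validation class) shows why that name is rejected:

```java
public class TopicNameCheck {

    // Simplified version of Kafka's topic-name rule: a comma is not a legal
    // character, so the literal name "topic-a,topic-b" can never be a topic.
    public static boolean isLegalTopicName(String name) {
        return !name.isEmpty()
                && name.length() <= 249
                && name.matches("[a-zA-Z0-9._-]+");
    }

    public static void main(String[] args) {
        System.out.println(isLegalTopicName("topic-a"));          // legal
        System.out.println(isLegalTopicName("topic-a,topic-b"));  // illegal
    }
}
```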
Set spring.cloud.stream.bindings.app-in-0.consumer.multiplex to true.
/**
 * When set to true, the underlying binder will natively multiplex destinations on the
 * same input binding. For example, in the case of a comma-separated multiple
 * destinations, the core framework will skip binding them individually if this is set
 * to true, but delegate that responsibility to the binder.
 *
 * By default this property is set to `false` and the binder will individually bind
 * each destination in the case of a comma-separated multi-destination list. The
 * individual binder implementations that need to support multiple input bindings
 * natively (multiplex) can enable this property.
 */
private boolean multiplex;
EDIT

You have the multiplex property in the wrong place; it is a binding consumer property, not a top-level spring.cloud.stream property.
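Restated as flat properties, the placement difference looks like this (the second line shows the misplacement being described, only for contrast):

```properties
# binding consumer property – the form the framework expects
spring.cloud.stream.bindings.app-in-0.consumer.multiplex=true

# top-level spring.cloud.stream property – not a recognized location
spring.cloud.stream.multiplex=true
```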
This works fine for me.

import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class So75912831Application {

    public static void main(String[] args) {
        SpringApplication.run(So75912831Application.class, args);
    }

    @Bean
    Consumer<String> input() {
        return System.out::println;
    }

}
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=topic1,topic2
spring.cloud.stream.bindings.input-in-0.consumer.multiplex=true
...o.s.c.s.b.k.KafkaMessageChannelBinder$2 \
foo: partitions assigned: [topic1-0, topic2-0]
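With multiplex=true there is a single consumer subscribed to both topics, so the static group.instance.id from the original question should no longer be fenced. A sketch combining the two (untested assumption; the bracket form keeps the dotted Kafka key as one map entry in consumer-properties):

```properties
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=topic1,topic2
spring.cloud.stream.bindings.input-in-0.consumer.multiplex=true
spring.cloud.stream.kafka.binder.consumer-properties[group.instance.id]=myid
```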