Search code examples
spring-integrationkafka-producer-apispring-kafkamapr-streams

Issue while using Multiple KafkaProducerMessageHandler with spring integration


This is the extension of the post MaprStream with spring integration Kafka Producer issue

I am facing issues with multiple KafkaProducerMessageHandlers when tried to achieve the sync property while publishing messages to maprstream

@Autowired
Qualifier("abcHandler.handler")
private KafkaProducerMessageHandler abcHandler;

@Autowired
Qualifier("xyzHandler.handler")
private KafkaProducerMessageHandler xyzHandler;

@PostConstruct
public void init() {
    this.abcHandler.setSync(true);
    this.xyzHandler.setSync(true);
}

Bean configuration:

<int:chain input-channel="inputToKafka"> 
    <int-kafka:outbound-channel-adapter 
            id="abcHandler" 
            kafka-template="template" 
            topic="${maprstream.stream.topicname}" > 
    </int-kafka:outbound-channel-adapter> 
</int:chain>

<int:chain input-channel="inputToKafka1"> 
    <int-kafka:outbound-channel-adapter 
            id="xyzHandler" 
            kafka-template="template1" 
            topic="${maprstream.stream.topicname1}" > 
    </int-kafka:outbound-channel-adapter> 
</int:chain>

I am getting the below exception while trying to load the beans.

Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'maprStreamProducerHandlerSync': Unsatisfied dependency expressed through field 'abcHandler'; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {@org.springframework.beans.factory.annotation.Autowired(required=true), @org.springframework.beans.factory.annotation.Qualifier(value=abcHandler.handler)}

Can someone please help me on this?


Solution

  • Why do you have these single components within <chain/>s? Chains are usually used for grouping multiple elements.

    See the documentation about chains; scroll down to 'id' Attribute`.

    <int:chain id="chain1" input-channel="inputToKafka"> 
        <int-kafka:outbound-channel-adapter 
                id="abcHandler" 
                kafka-template="template" 
                topic="${maprstream.stream.topicname}" > 
        </int-kafka:outbound-channel-adapter> 
    </int:chain>
    

    Components within chains get composite bean names. in this case it would be chain1$child.abcHandler.handler. When auto wiring, you would need to use this value in a @Qualifier when you have more than one adapter.