apache-kafka helidon

Kafka connector with Helidon


I am using the helidon-kafka connector to connect to OCI streams from a Helidon-based app. I have two topics and need different (de)serializers for each of them. I cannot add multiple connectors: adding a second connector under a different name does not work and produces the following error:

Exception in thread "main" io.helidon.microprofile.messaging.MessagingDeploymentException: No connector helidon-kafka-2 found!
     at io.helidon.microprofile.messaging.ExceptionUtils.createNoConnectorFound(ExceptionUtils.java:30)
     at io.helidon.microprofile.messaging.UniversalChannel.lambda$findConnectors$0(UniversalChannel.java:170)
     at java.base/java.util.Optional.orElseThrow(Optional.java:403)
     at io.helidon.microprofile.messaging.UniversalChannel.findConnectors(UniversalChannel.java:170)
     at java.base/java.util.HashMap$Values.forEach(HashMap.java:1065)
     at io.helidon.microprofile.messaging.ChannelRouter.connect(ChannelRouter.java:90)

Here is how I am configuring it:

outgoing.status-notifier:
  connector: helidon-kafka-2
  topic: ${STATUS_STREAM:-}

connector:
  helidon-kafka-2:
    bootstrap.servers: ${BOOTSTRAP_SERVERS}
    ## Uncomment line 45, 46 and comment out 47, 48 for local dev env. ##
    sasl.mechanism: PLAIN
    sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="<user_name>" password="<password>";
    #sasl.mechanism: OCI-RSA-SHA256
    #sasl.jaas.config: com.oracle.bmc.auth.sasl.InstancePrincipalsLoginModule required intent="streamPoolId:${STREAM_POOL_ID}";
    security.protocol: SASL_SSL
    key.serializer: org.apache.kafka.common.serialization.StringSerializer
    value.serializer: <package>.StatusChangeEventSerializer
    key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer: <package>.StatusChangeEventDeserializer

How can I use a different (de)serializer for each topic?


Solution

  • The issue is with the connector name "helidon-kafka-2". Change it to "helidon-kafka", the name used in the documentation, and it should work, provided the rest of the configuration is correct.

    Why does this matter? The connector code in the Helidon Git repository shows that the connector identifies itself by a fixed name, so a renamed entry in the configuration does not create a second connector. The name is defined as follows:

    @ApplicationScoped
    @Connector(KafkaConnector.CONNECTOR_NAME)
    public class KafkaConnector ....
    
        /**
         * Microprofile messaging Kafka connector name.
         */
        static final String CONNECTOR_NAME = "helidon-kafka";
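
    As for using a different (de)serializer per topic: a single "helidon-kafka" connector should be able to serve both channels, because MicroProfile Reactive Messaging merges channel-level attributes with connector-level ones, and the channel-level value takes precedence. The following is a minimal sketch of that layout, not a tested configuration. It assumes the blocks from the question sit under mp.messaging; the second channel (audit-notifier), its AUDIT_STREAM variable, and the AuditEventSerializer class are hypothetical names used only for illustration.

    ```yaml
    mp.messaging:
      outgoing.status-notifier:
        connector: helidon-kafka
        topic: ${STATUS_STREAM:-}
        # Channel-level attribute: applies to this channel only.
        value.serializer: <package>.StatusChangeEventSerializer

      # Hypothetical second channel with its own serializer.
      outgoing.audit-notifier:
        connector: helidon-kafka
        topic: ${AUDIT_STREAM:-}
        value.serializer: <package>.AuditEventSerializer

      connector:
        # The key must match KafkaConnector.CONNECTOR_NAME.
        helidon-kafka:
          # Settings shared by every channel that uses this connector.
          bootstrap.servers: ${BOOTSTRAP_SERVERS}
          security.protocol: SASL_SSL
          sasl.mechanism: PLAIN
          sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="<user_name>" password="<password>";
          key.serializer: org.apache.kafka.common.serialization.StringSerializer
    ```

    Incoming channels can presumably set key.deserializer and value.deserializer at the channel level in the same way, which keeps only the shared connection and security settings under the connector.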