I am using the helidon-kafka connector to connect to OCI streams from a Helidon-based app. I have 2 topics and need different (de)serializers for each of them. I cannot add multiple connectors; adding a second connector under a different name does not work. It produces the following error:
Exception in thread "main" io.helidon.microprofile.messaging.MessagingDeploymentException: No connector helidon-kafka-2 found!
    at io.helidon.microprofile.messaging.ExceptionUtils.createNoConnectorFound(ExceptionUtils.java:30)
    at io.helidon.microprofile.messaging.UniversalChannel.lambda$findConnectors$0(UniversalChannel.java:170)
    at java.base/java.util.Optional.orElseThrow(Optional.java:403)
    at io.helidon.microprofile.messaging.UniversalChannel.findConnectors(UniversalChannel.java:170)
    at java.base/java.util.HashMap$Values.forEach(HashMap.java:1065)
    at io.helidon.microprofile.messaging.ChannelRouter.connect(ChannelRouter.java:90)
Here is how I am configuring it:
outgoing.status-notifier:
  connector: helidon-kafka-2
  topic: ${STATUS_STREAM:-}
connector:
  helidon-kafka-2:
    bootstrap.servers: ${BOOTSTRAP_SERVERS}
    ## Uncomment line 45, 46 and comment out 47, 48 for local dev env. ##
    sasl.mechanism: PLAIN
    sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="<user_name>" password="<password>";
    #sasl.mechanism: OCI-RSA-SHA256
    #sasl.jaas.config: com.oracle.bmc.auth.sasl.InstancePrincipalsLoginModule required intent="streamPoolId:${STREAM_POOL_ID}";
    security.protocol: SASL_SSL
    key.serializer: org.apache.kafka.common.serialization.StringSerializer
    value.serializer: <package>.StatusChangeEventSerializer
    key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer: <package>.StatusChangeEventDeserializer
How can I have a different (de)serializer for each topic?
The issue is the connector name "helidon-kafka-2". Change it to "helidon-kafka", both in the channel's connector attribute and in the key under connector, so that it matches the documentation. It should then work, provided the rest of the configuration is correct.
Why does this matter? If you look at the connector code in the Helidon Git repository, you will see that each connector identifies itself by a fixed name. For the Kafka connector it is defined as:
@ApplicationScoped
@Connector(KafkaConnector.CONNECTOR_NAME)
public class KafkaConnector ....
    /**
     * Microprofile messaging Kafka connector name.
     */
    static final String CONNECTOR_NAME = "helidon-kafka";
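As for using a different (de)serializer per topic: in MicroProfile Reactive Messaging, attributes set on a channel are merged with the connector's attributes and take precedence over them. So one possible approach is to keep a single "helidon-kafka" connector for the shared settings and put the (de)serializers on each channel. The following is only a sketch under that assumption. The mp.messaging root key, the incoming.other-events channel, the OTHER_STREAM variable, the group.id value, and the OtherEventDeserializer class are hypothetical placeholders that do not come from the question.

```yaml
mp.messaging:
  outgoing.status-notifier:
    connector: helidon-kafka            # must match the connector's fixed name
    topic: ${STATUS_STREAM:-}
    # Channel-level attributes override the connector-level ones
    key.serializer: org.apache.kafka.common.serialization.StringSerializer
    value.serializer: <package>.StatusChangeEventSerializer

  incoming.other-events:                # hypothetical second channel
    connector: helidon-kafka
    topic: ${OTHER_STREAM:-}            # hypothetical variable
    group.id: other-events-group        # hypothetical consumer group
    key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer: <package>.OtherEventDeserializer   # hypothetical class

  connector:
    helidon-kafka:
      # Settings shared by every channel that uses this connector
      bootstrap.servers: ${BOOTSTRAP_SERVERS}
      security.protocol: SASL_SSL
      sasl.mechanism: PLAIN
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="<user_name>" password="<password>";
```

With this layout there is no need for a second connector definition: each channel carries only what differs between the topics.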