I am using Spring Cloud Stream and trying to publish an Avro message, but I am getting the exception shown below.
I have the following properties:
spring.cloud.stream.bindings.feed-output-channel.producer.useNativeEncoding=true
spring.cloud.stream.bindings.feed-output-channel.destination=TOPI.NAME
spring.cloud.stream.bindings.feed-output-channel.producer.partition-count=1
spring.cloud.stream.bindings.feed-output-channel.contentType=application/*+avro
spring.cloud.stream.bindings.feed-output-channel.producer.partition-key-expression=headers['kafka_messageKey']
spring.cloud.stream.kafka.bindings.feed-output-channel.producer.configuration.request.timeout.ms=60000
spring.cloud.stream.kafka.binder.producer-properties.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.producer-properties.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled=true
spring.cloud.stream.kafka.binder.producer-properties.schema.registry.url=https://base-url:8081/
spring.cloud.stream.kafka.binder.brokers=g-vmx.com:9092,g-vmx.com:9092,g-vmx.com:9092
I have also added the Confluent schema registry bean, and I have the @EnableSchemaRegistryClient annotation on the main class:
@Bean
@Primary
public SchemaRegistryClient schemaRegistryClient() {
    log.info("schema registry bean");
    ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
    client.setEndpoint(endPoint);
    return client;
}
The code that publishes the message:
public void publishFeed(String market, String sku) {
    token.link();
    MessageChannel messageChannel = resolveMessageChannel();
    // The message key doubles as the partition key (see partition-key-expression above).
    String messageKey = String.format("%s_%s", market, sku);
    RequestPayload payload = buildPayload(market, sku);
    Message<RequestPayload> message = MessageBuilder
            .withPayload(payload)
            .setHeader(KafkaHeaders.MESSAGE_KEY, messageKey)
            .build();
    log.info("publish requestPayload message {} ", message);
    // send(...) returns false if the message could not be sent within the timeout.
    boolean sent = messageChannel.send(message, DEFAULT_TIMEOUT_TO_SEND_MESSAGE);
    if (!sent) {
        throw new MessagePublishException("Unable to send=" + message);
    }
}
Dependencies:
implementation 'org.springframework.cloud:spring-cloud-stream-schema:2.2.1.RELEASE'
implementation "io.confluent:kafka-avro-serializer:5.3.0"
implementation "org.apache.avro:avro:1.10.1"
The exception is thrown at the messageChannel.send(message, DEFAULT_TIMEOUT_TO_SEND_MESSAGE); line:
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@1dc8da99]; nested exception is org.apache.kafka.common.errors.SerializationException: Error registering Avro schema:
I cannot figure out why it is throwing this exception.
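The message above is only the outermost wrapper; the innermost cause in the chain usually names the actual failure. The following is a minimal sketch for printing the whole cause chain. The class and method names (CauseChain, causeChain, rootCause) are made up for illustration, and the exceptions in main are stand-ins, not the real ones thrown by messageChannel.send(...).

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class CauseChain {

    /** Collects every exception in the cause chain, outermost first. */
    public static List<Throwable> causeChain(Throwable error) {
        List<Throwable> chain = new ArrayList<>();
        Throwable current = error;
        // Stop at the end of the chain, or if the chain loops back on itself.
        while (current != null && !chain.contains(current)) {
            chain.add(current);
            current = current.getCause();
        }
        return chain;
    }

    /** Returns the innermost exception, or null if error is null. */
    public static Throwable rootCause(Throwable error) {
        List<Throwable> chain = causeChain(error);
        return chain.isEmpty() ? null : chain.get(chain.size() - 1);
    }

    public static void main(String[] args) {
        // Stand-in exceptions that mimic the nesting of the real error.
        Throwable simulated = new RuntimeException(
                "error occurred in message handler",
                new IllegalStateException(
                        "Error registering Avro schema",
                        new IOException("hypothetical transport failure")));

        for (Throwable t : causeChain(simulated)) {
            System.out.println(t.getClass().getName() + ": " + t.getMessage());
        }
    }
}
```

In the real application, the same helper could be called from a catch block around messageChannel.send(...) so that the root cause is logged alongside the MessageHandlingException.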
I was eventually able to publish the message in Avro format. The configuration and properties above should be enough for publishing an Avro message, and they work in another application, but they did not work in mine.
So I added the properties below, after which publishing succeeded:
spring.cloud.stream.kafka.binder.configuration.basic.auth.credentials.source=USER_INFO
spring.cloud.stream.kafka.binder.configuration.basic.auth.user.info=
spring.cloud.stream.kafka.binder.producer-properties.schema.registry.ssl.truststore.password=
spring.cloud.stream.kafka.binder.producer-properties.schema.registry.ssl.truststore.location=
spring.cloud.stream.kafka.binder.producer-properties.schema.registry.ssl.keystore.location=
spring.cloud.stream.kafka.binder.producer-properties.schema.registry.ssl.keystore.password=
spring.cloud.stream.kafka.binder.producer-properties.schema.registry.ssl.key.password=
spring.cloud.stream.kafka.binder.producer-properties.basic.auth.credentials.source=USER_INFO
spring.cloud.stream.kafka.binder.producer-properties.basic.auth.user.info=
spring.cloud.stream.kafka.binder.producer-properties.schema.registry.auto.register.schemas=false
I already had the certificate information defined under the properties below, but it did not work with those alone; I had to add the properties above as well:
spring.cloud.stream.kafka.binder.configuration.ssl.keystore.location
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location
along with other properties.
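One plausible reading of this behaviour, stated as an assumption rather than a confirmed description of the serializer's internals: the plain ssl.* keys configure the connection to the Kafka brokers, while the HTTPS client for the schema registry only looks at keys under the schema.registry. prefix. The sketch below illustrates that kind of prefix scoping with plain Java maps. The class name, the method name and the file paths are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RegistrySslProperties {

    // Prefix shared by the schema.registry.ssl.* keys listed above.
    static final String REGISTRY_PREFIX = "schema.registry.";

    /** Keeps only the entries under the registry prefix and strips that prefix. */
    public static Map<String, String> registryScoped(Map<String, String> producerProps) {
        Map<String, String> scoped = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : producerProps.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(REGISTRY_PREFIX)) {
                scoped.put(key.substring(REGISTRY_PREFIX.length()), entry.getValue());
            }
        }
        return scoped;
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        // Hypothetical paths, for illustration only.
        props.put("ssl.truststore.location", "/hypothetical/broker-truststore.jks");
        props.put("schema.registry.ssl.truststore.location", "/hypothetical/registry-truststore.jks");

        // Only the prefixed key survives; the broker-level key is not visible here.
        System.out.println(registryScoped(props));
    }
}
```

Under this assumption, a truststore defined only as ssl.truststore.location would never reach the registry client, which matches the need to repeat the certificate settings under schema.registry.ssl.*.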