I have a Spring Cloud Stream (Elmhurst Release) Splitter with a RabbitMQ binder. I'm trying to update it so that it will use Avro schema for the payloads. I'm getting a conversion exception which makes it look like the Avro converter isn't getting invoked, and the JSON Converter picks up the message and trips on it.
Caused by: org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not a map: {"type":"record","name":"SkinnyMessage","namespace":"com.example.avro","doc":"Light message for passing references to Message objects","fields":[{"name":"id","type":"string"},{"name":"guid","type":"string"}]} (through reference chain: com.example.avro.SkinnyMessage["schema"]->org.apache.avro.Schema$RecordSchema["valueType"]); nested exception is com.fasterxml.jackson.databind.JsonMappingException: Not a map: {"type":"record","name":"SkinnyMessage","namespace":"com.example.avro","doc":"Light message for passing references to Message objects","fields":[{"name":"id","type":"string"},{"name":"guid","type":"string"}]} (through reference chain: com.example.avro.SkinnyMessage["schema"]->org.apache.avro.Schema$RecordSchema["valueType"])
I've confirmed that I can create objects and serialize them to-disk using the generated Avro class (maven-avro plugin), so that part seems right. I've converted the project to Java from Groovy and still get the same error, so I think that's ruled out, too.
Here's the Avro Schema:
{
"namespace": "com.example.avro",
"type": "record",
"name": "SkinnyMessage",
"doc": "Light message for passing references to Message objects",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "guid",
"type": "string"
}
]
}
And the relevant part of the class:
@EnableBinding(Processor.class)
@EnableSchemaRegistryClient
class PagingQueryProcessorApplication {
@Timed(value = 'paging.query')
@Splitter(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
List<SkinnyMessage> queryExecutor(def trigger){
log.debug 'Building query'
def query = queryConfiguration.buildQuery().toUriString()
log.info "Executing query: ${query}"
def response = service.getRecordings(query)
log.info "Returning response collection: ${response.body.content.size()}"
// We build a slim notification on each of the query responses
def skinnyMessages = response.body.content.collect{
new SkinnyMessage(it.getLink('self').getHref(), it.content.guid)
}
skinnyMessages
}
...
}
Edit: When I step through with a debugger, I can see that the AvroSchemaRegistryClientMessageConverter fails the canConvertTo(payload, headers)
call because the mimeType
in the headers is application/json
and not application/*+avro
, so it continues trying the rest of the converter chain.
If I build a message and set an avro content-type header on it, that seems to work, but that seems like a hack.
List<Message<SkinnyMessage>> skinnyMessages = response.body.content.collect{
MessageBuilder.withPayload(
new SkinnyMessage(it.getLink('self').getHref(), it.content.recordingGuid))
.setHeader('contentType', 'application/*+avro')
.build()
}
Creates messages which look right in RabbitMQ UI:
contentType: application/vnd.skinnymessage.v1+avro correlationId: f8be74d6-f780-efcc-295d-338a8b7f2ea0 content_type: application/octet-stream Payload 96 bytes Encoding: string
thttps://example.com/message/2597061H9a688e40-3e30-4b17-80e9-cf4f897e8a91
If I understand the docs correctly though, this should happen transparently from the setting in the application.yml: (as in Schema Registry Samples):
spring:
cloud:
stream:
bindings:
output:
contentType: application/*+avro