Tags: groovy, rabbitmq, avro, spring-cloud-stream

MessageConversionException when trying to serialize a List of Avro SpecificRecords and split them with @Splitter


I have a Spring Cloud Stream (Elmhurst release) Splitter with a RabbitMQ binder. I'm trying to update it to use an Avro schema for the payloads. I'm getting a conversion exception that suggests the Avro converter is never invoked: the JSON converter picks up the message instead and trips on it.

Caused by: org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not a map: {"type":"record","name":"SkinnyMessage","namespace":"com.example.avro","doc":"Light message for passing references to Message objects","fields":[{"name":"id","type":"string"},{"name":"guid","type":"string"}]} (through reference chain: com.example.avro.SkinnyMessage["schema"]->org.apache.avro.Schema$RecordSchema["valueType"]); nested exception is com.fasterxml.jackson.databind.JsonMappingException: Not a map: {"type":"record","name":"SkinnyMessage","namespace":"com.example.avro","doc":"Light message for passing references to Message objects","fields":[{"name":"id","type":"string"},{"name":"guid","type":"string"}]} (through reference chain: com.example.avro.SkinnyMessage["schema"]->org.apache.avro.Schema$RecordSchema["valueType"])

I've confirmed that I can create objects and serialize them to disk using the class generated by the avro-maven-plugin, so that part seems right. I've also converted the project from Groovy to Java and still get the same error, so I think the language is ruled out, too.

Here's the Avro Schema:

{
  "namespace": "com.example.avro",
  "type": "record",
  "name": "SkinnyMessage",
  "doc": "Light message for passing references to Message objects",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "guid",
      "type": "string"
    }
  ]
}

And the relevant part of the class:

@EnableBinding(Processor.class)
@EnableSchemaRegistryClient
class PagingQueryProcessorApplication {
    @Timed(value = 'paging.query')
    @Splitter(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    List<SkinnyMessage> queryExecutor(def trigger){
        log.debug 'Building query'
        def query = queryConfiguration.buildQuery().toUriString()

        log.info "Executing query: ${query}"
        def response = service.getRecordings(query)

        log.info "Returning response collection: ${response.body.content.size()}"
        // We build a slim notification on each of the query responses
        def skinnyMessages = response.body.content.collect{
            new SkinnyMessage(it.getLink('self').getHref(), it.content.guid)
        }
        skinnyMessages
    }
...
}

Edit: When I step through with a debugger, I can see that AvroSchemaRegistryClientMessageConverter fails the canConvertTo(payload, headers) check because the mimeType in the headers is application/json rather than application/*+avro, so the framework moves on and tries the rest of the converter chain.
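
To make the failing check concrete, here is a minimal, standard-library-only sketch of how a suffix-wildcard content type such as application/*+avro can be matched against a header value. It is an illustration under simplifying assumptions (no parameters such as charset, no quality values), not Spring's implementation; the class and method names are hypothetical. Spring's own org.springframework.util.MimeType performs a comparable check.

```java
public class MimeWildcardSketch {

    /**
     * Returns true if the pattern (which may be "type/*" or "type/*+suffix")
     * covers the candidate content type. Hypothetical helper for illustration.
     */
    public static boolean includes(String pattern, String candidate) {
        String[] p = pattern.split("/", 2);
        String[] c = candidate.split("/", 2);
        if (p.length != 2 || c.length != 2) {
            return false; // not a type/subtype pair
        }
        if (!p[0].equals("*") && !p[0].equalsIgnoreCase(c[0])) {
            return false; // primary types differ
        }
        String pSub = p[1];
        String cSub = c[1];
        if (pSub.equals("*") || pSub.equalsIgnoreCase(cSub)) {
            return true; // full wildcard or exact subtype match
        }
        if (pSub.startsWith("*+")) {
            // Suffix wildcard: "*+avro" covers any subtype ending in "+avro".
            int plus = cSub.lastIndexOf('+');
            return plus != -1
                    && cSub.substring(plus).equalsIgnoreCase(pSub.substring(1));
        }
        return false;
    }

    public static void main(String[] args) {
        // The header value seen in the debugger: not covered, so the Avro converter is skipped.
        System.out.println(includes("application/*+avro", "application/json")); // false
        // A versioned Avro content type: covered by the wildcard.
        System.out.println(includes("application/*+avro",
                "application/vnd.skinnymessage.v1+avro")); // true
    }
}
```

With application/json in the headers, the check returns false and conversion falls through to the next converter, which matches the behaviour observed in the debugger.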


Solution

  • If I build a message and set an Avro content-type header on it, that seems to work, but it feels like a hack.

    List<Message<SkinnyMessage>> skinnyMessages = response.body.content.collect{
        MessageBuilder.withPayload(
                new SkinnyMessage(it.getLink('self').getHref(), it.content.recordingGuid))
                .setHeader('contentType', 'application/*+avro')
                .build()
    }
    

    This creates messages that look right in the RabbitMQ UI:

    contentType: application/vnd.skinnymessage.v1+avro
    correlationId: f8be74d6-f780-efcc-295d-338a8b7f2ea0
    content_type: application/octet-stream
    Payload 96 bytes
    Encoding: string

    thttps://example.com/message/2597061H9a688e40-3e30-4b17-80e9-cf4f897e8a91

    If I understand the docs correctly, though, this should happen transparently from the following setting in application.yml (as in the Schema Registry Samples):

    spring:
      cloud:
        stream:
          bindings:
            output:
              contentType: application/*+avro
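
As a sanity check that the payload shown in the RabbitMQ UI really is Avro binary and not text, note the stray characters in front of each field (the `t` before the URL and the `H` before the GUID). Avro's binary encoding writes a string as a zigzag-encoded variable-length integer giving the byte length, followed by the UTF-8 bytes. The sketch below decodes such a prefix using only the standard library; the class and method names are hypothetical, and it is not Avro's own decoder.

```java
import java.nio.charset.StandardCharsets;

public class AvroStringPrefixSketch {

    /** Reads one zigzag-encoded varint (Avro's int/long encoding) starting at pos[0]. */
    public static long readLong(byte[] buf, int[] pos) {
        long raw = 0;
        int shift = 0;
        int b;
        do {
            b = buf[pos[0]++] & 0xFF;
            raw |= (long) (b & 0x7F) << shift; // low 7 bits carry data
            shift += 7;
        } while ((b & 0x80) != 0);             // high bit set means "more bytes follow"
        return (raw >>> 1) ^ -(raw & 1);       // undo zigzag
    }

    /** Reads a length-prefixed UTF-8 string and advances pos[0] past it. */
    public static String readString(byte[] buf, int[] pos) {
        int len = (int) readLong(buf, pos);
        String s = new String(buf, pos[0], len, StandardCharsets.UTF_8);
        pos[0] += len;
        return s;
    }

    public static void main(String[] args) {
        // 'H' is byte 0x48 = 72, which zigzag-decodes to 36: the length of a GUID string.
        byte[] field = "H9a688e40-3e30-4b17-80e9-cf4f897e8a91"
                .getBytes(StandardCharsets.UTF_8);
        System.out.println(readString(field, new int[] {0}));
        // prints 9a688e40-3e30-4b17-80e9-cf4f897e8a91
    }
}
```

By the same rule, the leading `t` (byte 116) decodes to a length of 58. The URL shown above is shorter than that, presumably because it was anonymized for posting, but 1 + 58 + 1 + 36 = 96 agrees with the reported payload size of 96 bytes.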