Search code examples
springdynamicspring-cloudspring-cloud-streamconfluent-platform

How to Configure Spring Cloud StreamBridge to produce Avro?


So I am trying to use StreamBridge to dynamically send messages to different topics. I am successful in doing so if my output is a Message< String>, but not Message< GenericRecord>

Code example:

@StreamListener(Sink.INPUT)
public void process(@Payload GenericRecord messageValue,
                    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) GenericRecord messageKey,
                    @Header("Type") String type) {
    log.info("Processing Event --> " + messageValue);

    // Code...

    // convert to Message<GenericRecord>
    Message<GenericRecord> message = ...

    streamBridge.send(type, message);

    log.info("Processed Event --> " + messageValue);
}

The error I get is Caused by: org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not a map: which I am guessing is because streamBridge acceptedOutputTypes = application/json

2020-06-28 04:42:55.670  INFO 54347 --- [container-0-C-1] o.s.c.f.c.c.SimpleFunctionRegistry       : Looking up function 'streamBridge' with acceptedOutputTypes: [application/json]

I tried modify accepted output type to be avro by setting the following in my properties, which did not work.

spring.cloud.stream.function.definition=streamBridge
spring.kafka.producer.key-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.cloud.stream.bindings.streamBridge-out-0.content-type=application/*+avro
spring.cloud.stream.bindings.streamBridge-out-0.producer.use-native-encoding=true

Any ideas on how to configure StreamBridge to be avro?

edit: I also tried streamBridge.send(type, message, MimeType.valueOf("application/*+avro")) but that also had a conversion error.


Solution

  • I could not get StreamBridge to work dynamically so I switched to using Function:

    @Bean
    public Function<Message<GenericRecord>, Message<GenericRecord>> process() {
        return message -> {
    
            // Code...
    
            String topic = message.getHeaders().get("type");
    
            // convert to Message<GenericRecord>
            Message<GenericRecord> message = MessageBuilder...
                .setHeader("spring.cloud.stream.sendto.destination", topic)
                .build();
            
    
            return outgoingMessage;
        };
    }
    

    Properties file is:

    spring.cloud.function.definition=process
    spring.cloud.stream.bindings.process-in-0.destination=${consumer_topic}
    spring.cloud.stream.bindings.process-in-0.group=${spring.application.name}
    
    spring.cloud.stream.bindings.process-out-0.content-type=application/*+avro
    spring.cloud.stream.bindings.process-out-0.producer.use-native-encoding=true
    

    Edit: Streambridge got fixed to support this: https://github.com/spring-cloud/spring-cloud-stream/issues/2007