Search code examples
spring-bootavrospring-cloud-streamspring-cloud-dataflow

Activating Avro message converter in Spring Cloud Dataflow


I am implementing a stream app on Spring Cloud Dataflow. I would like to use Avro based schema registry client for serialization and schema control. My basic goal is to feed Source app with some external data, transform it to prepared avro-based schema and send it to a Sink app which will accept only this schema. I would like to use the schema from external schema registry server, not the file version of the schema. My code looks like:

@EnableBinding(Source.class)
@EnableSchemaRegistryClient
public class DisSampleSource {

    private final DisSampleSourceProperties properties;

    @Inject
    public DisSampleSource(DisSampleSourceProperties properties) {
        this.properties = properties;
    }

    @InboundChannelAdapter(Source.OUTPUT)
    public String feed() throws IOException {
        if (!Paths.get(properties.getPath()).toFile().exists()) {
            throw new InvalidPathException(this.properties.getPath(),
                    "The file does not exists or is of not proper type.");
        }
        return new String(Files.readAllBytes(Paths.get(properties.getPath())), StandardCharsets.UTF_8);
    }
}

POM:

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-schema</artifactId>
        <version>1.1.1.RELEASE</version>
    </dependency>

During starting application I am passing property:

 --spring.cloud.stream.bindings.output.contentType=application/foo.bar.v1+avro

At the moment, the application fails to start with following exception:

2017-02-13 16:25:30.430  WARN 2444 --- [           main] ationConfigEmbeddedWebApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'disSampleSource' defined in URL [jar:file:/D:/git/dis/sample/source/target/dis-sample-source-1.0.0-SNAPSHOT.jar!/BOOT-INF/classes!/com/atsisa/bit/dis/sample/DisSampleSource.class]: Initialization of bean failed; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'org.springframework.cloud.stream.config.ChannelBindingAutoConfiguration': Unsatisfied dependency expressed through field 'adapters'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.cloud.stream.messaging.Source': Invocation of init method failed; nested exception is org.springframework.cloud.stream.converter.ConversionException: No message converter is registered for application/foo.bar.v1+avro
2017-02-13 16:25:30.440  INFO 2444 --- [           main] o.apache.catalina.core.StandardService   : Stopping service Tomcat
2017-02-13 16:25:30.480  INFO 2444 --- [           main] utoConfigurationReportLoggingInitializer :

Error starting ApplicationContext. To display the auto-configuration report re-run your application with 'debug' enabled.
2017-02-13 16:25:30.485 ERROR 2444 --- [           main] o.s.boot.SpringApplication               : Application startup failed

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'disSampleSource' defined in URL [jar:file:/D:/git/dis/sample/source/target/dis-sample-source-1.0.0-SNAPSHOT.jar!/BOOT-INF/classes!/com/atsisa/bit/dis/sample/DisSampleSource.class]: Initialization of bean failed; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'org.springframework.cloud.stream.config.ChannelBindingAutoConfiguration': Unsatisfied dependency expressed through field 'adapters'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.cloud.stream.messaging.Source': Invocation of init method failed; nested exception is org.springframework.cloud.stream.converter.ConversionException: No message converter is registered for application/foo.bar.v1+avro
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:562) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:482) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
        at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
        at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
        at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
        at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:754) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
        at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:866) ~[spring-context-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]

What am I doing wrong?


Solution

  • Avro is an optional dependency of Spring Cloud Stream Schema (as the intent is to support other formats in the future. In order to activate schema support, you should simply add

     <dependency>
         <groupId>org.apache.avro</groupId>
         <artifactId>avro</artifactId>
         <version>1.8.1</version>
     </dependency>
    

    to the project.