I am implementing a stream app on Spring Cloud Dataflow. I would like to use Avro based schema registry client for serialization and schema control. My basic goal is to feed Source app with some external data, transform it to prepared avro-based schema and send it to a Sink app which will accept only this schema. I would like to use the schema from external schema registry server, not the file version of the schema. My code looks like:
@EnableBinding(Source.class)
@EnableSchemaRegistryClient
public class DisSampleSource {
private final DisSampleSourceProperties properties;
@Inject
public DisSampleSource(DisSampleSourceProperties properties) {
this.properties = properties;
}
@InboundChannelAdapter(Source.OUTPUT)
public String feed() throws IOException {
if (!Paths.get(properties.getPath()).toFile().exists()) {
throw new InvalidPathException(this.properties.getPath(),
"The file does not exists or is of not proper type.");
}
return new String(Files.readAllBytes(Paths.get(properties.getPath())), StandardCharsets.UTF_8);
}
}
POM:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
<version>1.1.1.RELEASE</version>
</dependency>
During starting application I am passing property:
--spring.cloud.stream.bindings.output.contentType=application/foo.bar.v1+avro
At the moment, the application fails to start with following exception:
2017-02-13 16:25:30.430 WARN 2444 --- [ main] ationConfigEmbeddedWebApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'disSampleSource' defined in URL [jar:file:/D:/git/dis/sample/source/target/dis-sample-source-1.0.0-SNAPSHOT.jar!/BOOT-INF/classes!/com/atsisa/bit/dis/sample/DisSampleSource.class]: Initialization of bean failed; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'org.springframework.cloud.stream.config.ChannelBindingAutoConfiguration': Unsatisfied dependency expressed through field 'adapters'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.cloud.stream.messaging.Source': Invocation of init method failed; nested exception is org.springframework.cloud.stream.converter.ConversionException: No message converter is registered for application/foo.bar.v1+avro
2017-02-13 16:25:30.440 INFO 2444 --- [ main] o.apache.catalina.core.StandardService : Stopping service Tomcat
2017-02-13 16:25:30.480 INFO 2444 --- [ main] utoConfigurationReportLoggingInitializer :
Error starting ApplicationContext. To display the auto-configuration report re-run your application with 'debug' enabled.
2017-02-13 16:25:30.485 ERROR 2444 --- [ main] o.s.boot.SpringApplication : Application startup failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'disSampleSource' defined in URL [jar:file:/D:/git/dis/sample/source/target/dis-sample-source-1.0.0-SNAPSHOT.jar!/BOOT-INF/classes!/com/atsisa/bit/dis/sample/DisSampleSource.class]: Initialization of bean failed; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'org.springframework.cloud.stream.config.ChannelBindingAutoConfiguration': Unsatisfied dependency expressed through field 'adapters'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.cloud.stream.messaging.Source': Invocation of init method failed; nested exception is org.springframework.cloud.stream.converter.ConversionException: No message converter is registered for application/foo.bar.v1+avro
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:562) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:482) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:754) ~[spring-beans-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:866) ~[spring-context-4.3.4.RELEASE.jar!/:4.3.4.RELEASE]
What am I doing wrong?
Avro
is an optional dependency of Spring Cloud Stream Schema (as the intent is to support other formats in the future. In order to activate schema support, you should simply add
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.1</version>
</dependency>
to the project.