Search code examples
springspring-bootspring-kafka

Spring Boot Kafka Streams - Binding Issue


I am trying build out a simple streams app based on Kafka Streams using this example.

Word Count

However when I am starting the app, I get the below error: Can someone please point out on what I am missing out here? Here is the code, config & error

@SpringBootApplication
@Slf4j
@EnableScheduling
@EnableBinding(PersonBinding.class)
public class DemoApplication {

  public static void main(String[] args) {
    SpringApplication.run(DemoApplication.class, args);
  }

  @Component
  public static class PersonSource {

    private final MessageChannel personOut;

    @Autowired
    PersonSource(PersonBinding personBinding) {

      this.personOut = personBinding.personOut();
    }

    @Scheduled(fixedDelay = 5000L)
    public void run() {

      Message<Person> message = MessageBuilder
          .withPayload(new Person("John", "Doe", Instant.now()))
          .build();

      try {

        personOut.send(message);

        log.info("Published message: {}", message);
      } catch (Exception e) {

        e.printStackTrace();
        throw e;
      }
    }
  }

  @Component
  public static class PersonProcessor {

    @StreamListener
    public void process(@Input(PersonBinding.PERSON_IN) KStream<String, Person> events) {

      events.foreach(((key, value) -> System.out.println("Key: " + key + "; Value: " + value)));
    }
  }
}

@Data
@AllArgsConstructor
class Person {

  String firstName;

  String lastName;

  Instant createdOn;
}

interface PersonBinding {

  String PERSON_IN = "pin";

  String PERSON_OUT = "pout";

  @Output(PERSON_OUT)
  MessageChannel personOut();

  @Input(PERSON_IN)
  KStream<String, Person> personIn();
}

Dependency Management (Spring Boot 1.5.13.RELEASE)

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kstream</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.1.0</version>
</dependency>

Configuration

# Default Configuration
spring.cloud.stream.kstream.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kstream.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
# Out Bindings Configuration
spring.cloud.stream.bindings.pout.destination=pout
spring.cloud.stream.bindings.pout.producer.header-mode=raw
# In Bindings Configuration
spring.cloud.stream.bindings.pin.destination=pout
spring.cloud.stream.bindings.pin.consumer.header-mode=raw

Error

Field configurationProperties in org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration required a single bean, but 2 were found:
  - spring.cloud.stream.kafka.binder-org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties: a programmatically registered singleton - binderConfigurationProperties: defined by method 'binderConfigurationProperties' in class path resource [org/springframework/cloud/stream/binder/kstream/config/KStreamBinderSupportAutoConfiguration.class]


Action:

Consider marking one of the beans as @Primary, updating the consumer to accept multiple beans, or using @Qualifier to identify the bean that should be consumed

** EDIT 1 **

Uploaded code to Github https://github.com/tapitoe/demo-spring-cloud-streams/tree/master/src


Solution

  • You need to add the kstream binder to the pom; the starter only adds the message channel binder.

    EDIT

    I just copied similar code into an app with no problems.

    @SpringBootApplication
    @EnableBinding(So50693858Application.PersonBinding.class)
    public class So50693858Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So50693858Application.class, args);
        }
    
        @StreamListener
        public void process(@Input(PersonBinding.PERSON_IN) KStream<String, String> events) {
    
            events.foreach(((key, value) -> System.out.println("Key: " + key + "; Value: " + value)));
        }
    
        interface PersonBinding {
    
            String PERSON_IN = "pin";
    
            String PERSON_OUT = "pout";
    
            @Output(PERSON_OUT)
            MessageChannel personOut();
    
            @Input(PERSON_IN)
            KStream<String, String> personIn();
        }
    
    }
    

    and sent a message from the console producer to pout and

    Key: null; Value: foo
    

    It's not clear, however, why you have input and output bindings to the same destination (not that that would cause the problem you see).

    EDIT

    This works too (with your properties):

    @SpringBootApplication
    @EnableBinding(So50693858Application.PersonBinding.class)
    public class So50693858Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So50693858Application.class, args);
        }
    
        @Bean
        public ApplicationRunner runner(MessageChannel pout) {
            return args -> {
                pout.send(new GenericMessage<>("foo".getBytes(),
                        Collections.singletonMap(KafkaHeaders.MESSAGE_KEY, "bar".getBytes())));
                pout.send(new GenericMessage<>("baz".getBytes(),
                        Collections.singletonMap(KafkaHeaders.MESSAGE_KEY, "qux".getBytes())));
            };
        }
    
        @StreamListener
        public void process(@Input(PersonBinding.PERSON_IN) KStream<String, String> events) {
    
            events.foreach(((key, value) -> System.out.println("Key: " + key + "; Value: " + value)));
        }
    
        interface PersonBinding {
    
            String PERSON_IN = "pin";
    
            String PERSON_OUT = "pout";
    
            @Output(PERSON_OUT)
            MessageChannel personOut();
    
            @Input(PERSON_IN)
            KStream<String, String> personIn();
        }
    
    }
    

    and

    Key: bar; Value: foo
    Key: qux; Value: baz
    

    EDIT3

    Pom for 2.0.x version:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.example</groupId>
        <artifactId>so50693858</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
    
        <name>so50693858</name>
        <description>Demo project for Spring Boot</description>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.2.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
            <spring-cloud.version>Finchley.RC2</spring-cloud.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-test-support</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
            </dependency>
        </dependencies>
    
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>${spring-cloud.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
        <repositories>
            <repository>
                <id>spring-milestones</id>
                <name>Spring Milestones</name>
                <url>https://repo.spring.io/milestone</url>
                <snapshots>
                    <enabled>false</enabled>
                </snapshots>
            </repository>
        </repositories>
    
    
    </project>
    

    config:

    # Default Configuration
    spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
    spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
    # Out Bindings Configuration
    spring.cloud.stream.bindings.pout.destination=pout
    spring.cloud.stream.bindings.pout.producer.header-mode=raw
    # In Bindings Configuration
    spring.cloud.stream.bindings.pin.destination=pout
    spring.cloud.stream.bindings.pin.consumer.header-mode=raw