Tags: migration, spring-kafka, spring-cloud-stream-binder-kafka, spring-cloud-function, java-20

Spring Cloud Stream function not recognized


I have a Spring Boot Kafka streaming application. After upgrading to Spring Boot 3 (SB3), the spring.cloud.function.definition property is no longer recognized.

I enabled debug logging, which helped. I see this message in the logs:

Multiple functional beans were found [myEvents, sendToDlqAndContinue], thus can't determine default function definition. Please use 'spring.cloud.function.definition' property to explicitly define it. 

This is my application.yml:

spring:
  application:
    name: @project.artifactId@
  cloud:
    config:
      name: my-app
    stream:
      default:
        producer:
          useNativeEncoding: true
      function:
        definition: myEvents
        ineligible-definitions: sendToDlqAndContinue

These are the dependencies I have:


<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-dependencies</artifactId>
    <version>3.1.5</version>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
    </dependency>
</dependencies>

I tried debugging the class that reports the error, BeanFactoryAwareFunctionRegistry.java,

and found that this.applicationContext.getEnvironment().getProperty(FunctionProperties.FUNCTION_DEFINITION, ""); returns "".

The environment does contain the ineligible-definitions value from my YAML, but not the definition.

Any clues are highly appreciated. Are there changes in Spring Boot 3 related to Spring Cloud Stream? Am I missing a dependency?

Update

I tried to run the app after adding the dependency:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-properties-migrator</artifactId>
    <scope>runtime</scope>
</dependency>

The application now consumes messages, but the properties migrator does not log any unsupported or migrated properties. How can I find out which property was removed or migrated?

I could not find any official documentation about changes in the new release.


Solution

  • I changed spring.cloud.stream.function to spring.cloud.function, and it works for me.

    Updated application.yml:

    spring:
      application:
        name: @project.artifactId@
      cloud:
        config:
          name: my-app
        function:
          definition: myEvents
          ineligible-definitions: sendToDlqAndContinue
        stream:
          default:
            producer:
              useNativeEncoding: true
    

    However, the official documentation says that both configurations work:

    To specify which functional bean to bind to the external destination(s) exposed by the bindings, you must provide spring.cloud.stream.function.definition or native to spring-cloud-function spring.cloud.function.definition property.

    You should also check this URL; it worked for me.

    I changed the class to a function.

    I hope this helps.
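
The fix above moves the function block from under stream to directly under cloud, which changes the dotted property key each entry ends up with. The following is a minimal plain-Java sketch of that difference. It does not use Spring at all: PropertyKeyDemo, flatten, and layout are hypothetical names made up for illustration, and the flattening is a simplified model of how nested YAML maps become dotted keys. It also assumes that FunctionProperties.FUNCTION_DEFINITION resolves to spring.cloud.function.definition, the key named in the log message.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical demo class; not part of Spring or of the original application.
class PropertyKeyDemo {

    // Flattens nested maps (as a YAML parser would produce) into dotted keys.
    // Simplified model: only nested maps and scalar values are handled.
    static Map<String, String> flatten(String prefix, Map<String, Object> node) {
        Map<String, String> flat = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : node.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) value;
                flat.putAll(flatten(key, child));
            } else {
                flat.put(key, String.valueOf(value));
            }
        }
        return flat;
    }

    // Builds the relevant part of application.yml by hand.
    // underStream = true  -> layout from the question (spring.cloud.stream.function)
    // underStream = false -> layout from the answer   (spring.cloud.function)
    static Map<String, Object> layout(boolean underStream) {
        Map<String, Object> function = new LinkedHashMap<>();
        function.put("definition", "myEvents");
        function.put("ineligible-definitions", "sendToDlqAndContinue");

        Map<String, Object> cloud = new LinkedHashMap<>();
        if (underStream) {
            Map<String, Object> stream = new LinkedHashMap<>();
            stream.put("function", function);
            cloud.put("stream", stream);
        } else {
            cloud.put("function", function);
        }

        Map<String, Object> spring = new LinkedHashMap<>();
        spring.put("cloud", cloud);
        Map<String, Object> root = new LinkedHashMap<>();
        root.put("spring", spring);
        return root;
    }

    public static void main(String[] args) {
        // Assumed value of FunctionProperties.FUNCTION_DEFINITION.
        String lookedUpKey = "spring.cloud.function.definition";
        for (boolean underStream : new boolean[] {true, false}) {
            Map<String, String> props = flatten("", layout(underStream));
            System.out.println(props.keySet());
            System.out.println("lookup -> \"" + props.getOrDefault(lookedUpKey, "") + "\"");
        }
    }
}
```

In this model, the question's layout yields only keys under spring.cloud.stream.function, so a direct lookup of spring.cloud.function.definition falls back to the empty default, while the answer's layout yields the key directly. It does not show whether or how Spring Cloud Stream itself maps the stream-prefixed property onto the native one; that is the part the quoted documentation says should work.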