java spring-cloud-stream spring-cloud-stream-binder-kafka

Configured group ID is ignored in spring-cloud-streams


The goal

I must set up a group ID for the Kafka Streams consumer that matches a strict naming convention.

After following the documentation closely, I cannot find a way that works. As I still believe I may have misunderstood something, I prefer to open a question here for peer review before opening a bug report on the spring-cloud-stream GitHub repository.

NB:

A similar question was asked a year ago, but it is not very exhaustive and has not been answered yet. I hope I can give more insight into the problem here.

What the official documentation states (and also based on WARN messages)

According to several pages of the official documentation, this should be easy to configure in my app's application.yaml.

The documentation states that I can either:

  • use a default value for all the binders, using the section spring.cloud.stream.kafka.default.group=<value>
  • or use a specific value for my channel in spring.cloud.stream.bindings.<channelName>.group

If I set the generic Kafka field group-id directly in spring.kafka.consumer.group-id, the parameter is explicitly ignored and I get the following WARN:

2022-08-10 10:18:18.376 [main] [WARN ] [o.s.c.s.b.k.s.p.KafkaStreamsBinderConfigurationProperties] - Ignoring provided value(s) for 'group.id'. Use spring.cloud.stream.default.group or spring.cloud.stream.binding.<name>.group to specify the group instead of group.id

So I also tried both spring.cloud.stream.default.group and spring.cloud.stream.binding.<name>.group (note that the message says binding, without the s, not bindings).

Edit: Based on a comment from @OlegZhurakousky, this is only a typo in the error message. I tested with and without the s without success.

Looking at the code of the library

I had a quick look at the binder code, and this property does seem to be the one that must be set. Their tests use it too, for example: --spring.cloud.stream.bindings.uppercase-in-0.group=inputGroup.

The problem after following the documentation

After testing all the aforementioned configurations, the group ID always seems to be ignored. The group is always set to the default value, groupId=process-applicationId, as the following logs show:

2022-08-10 10:30:56.644 [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] [INFO ] [o.a.k.c.c.i.SubscriptionState] - [Consumer clientId=process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1-consumer, groupId=process-applicationId] Resetting offset for partition my-custom-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka:9092 (id: 1 rack: null)], epoch=0}}.
2022-08-10 10:32:56.713 [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] [INFO ] [o.a.k.s.p.internals.StreamThread] - stream-thread [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] Processed 0 total records, ran 0 punctuators, and committed 0 total tasks since the last update
2022-08-10 10:34:56.767 [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] [INFO ] [o.a.k.s.p.internals.StreamThread] - stream-thread [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] Processed 0 total records, ran 0 punctuators, and committed 0 total tasks since the last update

It is as if the group setting in application.yaml is not used at all. On the other hand, the destination setting (spring.cloud.stream.bindings.process-in-0.destination=my-custom-topic) is picked up and the topic is consumed correctly (see the logs above).
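
To check which group a consumer actually joined, the groupId field can be pulled out of a consumer log line like the ones above. The following is a minimal sketch using only the JDK. The class name LogGroupId and the regular expression are illustrative assumptions, not part of Spring or Kafka, and they rely on the "groupId=<value>]" layout seen in these logs.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Spring or Kafka): extracts the effective
// consumer group from a Kafka client log line.
final class LogGroupId {

    // Matches "groupId=<value>", stopping at ']', ',' or whitespace.
    private static final Pattern GROUP_ID = Pattern.compile("groupId=([^\\]\\s,]+)");

    static Optional<String> extractGroupId(String logLine) {
        Matcher m = GROUP_ID.matcher(logLine);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Shortened copy of the SubscriptionState log line shown above.
        String line = "[Consumer clientId=process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1-consumer, "
                + "groupId=process-applicationId] Resetting offset for partition my-custom-topic-0";
        // Prints the group the consumer actually used, or "<none>" if the field is absent.
        System.out.println(extractGroupId(line).orElse("<none>"));
    }
}
```

If the binding-level group were applied, the extracted value would be myCustomGroupId rather than process-applicationId.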

How my application is setup

relevant dependencies in pom.xml

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.8.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
            <version>3.2.4</version>
        </dependency>

Kafka Streams consumer class (simplified to include only the relevant sections)


package my.custom.stuff;


import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;

@Component
public class myKafkaStreamConsumer {

    private static final Logger logger = LoggerFactory.getLogger(myKafkaStreamConsumer.class);

    @Bean
    public static Consumer<KStream<String, String>> process() {
        return input ->
                input.foreach((key, value) -> {
                    logger.debug("from STREAM: Key= {} , value = {}", key, value);
                    // ...
                    // my message handling business logic
                    // ...
                });
    }
}

one version of the application.yaml

Below is the version of application.yaml that, as far as I can tell, complies most closely with the documentation, and it still does not work. Note that the destination is applied correctly, so at least the correct binding is being used.

spring:
  kafka:
    bootstrap-servers: kafka:9092
    consumer:
      auto-offset-reset: earliest
  cloud:
    stream:
      bindings:
        process-in-0:
          group: myCustomGroupId
          destination: "my-custom-topic"

What I have already tested (unsuccessfully)

I have tried to inject the group ID in several ways, including:

  • all the possible combinations that I could find in any official documentation or example
  • adding it in the consumer subsection such as in spring.cloud.stream.bindings.process-in-0.consumer.group or spring.cloud.stream.bindings.process-in-0.consumer.group-id
  • injecting the official documented keys as environment variables

It simply always seems to be ignored.

Solution

  • A bit of a disclaimer: I'm rusty on Spring, but since I've been working with Kafka for the past couple of months I wanted to play with this too. I got it to work by doing two things:

    • use applicationId instead of group within the application properties

      spring:
        kafka:
          bootstrap-servers: localhost:29092
          consumer:
            auto-offset-reset: earliest
        cloud:
          stream:
            kafka:
              binder:
                functions:
                  process:
                    applicationId: MyGroupIdUsingApplicationId
            bindings:
              process-in-0:
                destination: my-custom-topic
      
      
    • explicitly declare a KafkaBinderConfigurationProperties bean

    I created a working sample here for you to clone and test with if you need to: https://github.com/T-TK-Wan/SO-Spring_Cloud_Streams_Kafka_GroupId

    Edit:

    Just to add that I focused only on checking that the group ID can be set and that it registers correctly. I haven't looked into whether using the applicationId property is the correct approach or what side effects it has.
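
    Some background that may explain the behaviour: Kafka Streams has no separate group.id setting and uses its application.id as the consumer group ID. That is consistent with the binding-level group being ignored and with the default groupId=process-applicationId in the question's logs. In the Kafka Streams binder documentation, the per-function property is spelled with a streams segment: spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId. Below is a sketch under that assumption, reusing the function name and topic from the question. It has not been verified against binder 3.2.4.

    ```yaml
    spring:
      cloud:
        stream:
          kafka:
            streams:
              binder:
                functions:
                  process:
                    # Becomes the Kafka Streams application.id, and therefore the consumer group ID
                    applicationId: myCustomGroupId
          bindings:
            process-in-0:
              destination: my-custom-topic
    ```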