I must set up a group ID for a Kafka Streams consumer that matches a strict naming convention.
I cannot find a way that works, even after following the documentation closely. Since I may still have misunderstood something, I prefer to open a question here for peer review before opening a bug issue on the spring-cloud-stream GitHub repository.
A similar question was asked a year ago, but it is not very exhaustive and has not been answered yet. I hope I can give more insight into the problem here.
From several sources in the official documentation, I see that this should be pretty easy to configure in the application.yaml of my app.
The documentation states that I can either set a default group for all bindings:
spring.cloud.stream.kafka.default.group=<value>
or set the group for a specific binding:
spring.cloud.stream.bindings.<channelName>.group
If I set the generic Kafka group-id field directly in spring.kafka.consumer.group-id, the parameter is explicitly ignored and I get the following WARN:
2022-08-10 10:18:18.376 [main] [WARN ] [o.s.c.s.b.k.s.p.KafkaStreamsBinderConfigurationProperties] - Ignoring provided value(s) for 'group.id'. Use spring.cloud.stream.default.group or spring.cloud.stream.binding.<name>.group to specify the group instead of group.id
So I have also tried both spring.cloud.stream.default.group and spring.cloud.stream.binding.<name>.group (note that the message says binding and not bindings, without the s).
Edit: Based on a comment from @OlegZhurakousky, this is only a typo in the error message. I tested with and without the s, without success.
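For reference, a minimal sketch of what the default-level variant would look like in application.yaml, assuming the property path named in the WARN message (spring.cloud.stream.default.group). The group value is only a placeholder, and the nesting is my reading of that property path:

```yaml
spring:
  cloud:
    stream:
      default:
        # Placeholder value; intended to apply to every binding
        group: myCustomGroupId
      bindings:
        process-in-0:
          destination: my-custom-topic
```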
I had a quick look at the stream code, and this property does seem to be the one that must be set. It is what they use in their tests, for example: --spring.cloud.stream.bindings.uppercase-in-0.group=inputGroup.
After testing all the aforementioned configurations, the group ID always seems to be ignored. The group is always set to the default value, groupId=process-applicationId, as the following logs show:
2022-08-10 10:30:56.644 [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] [INFO ] [o.a.k.c.c.i.SubscriptionState] - [Consumer clientId=process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1-consumer, groupId=process-applicationId] Resetting offset for partition my-custom-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka:9092 (id: 1 rack: null)], epoch=0}}.
2022-08-10 10:32:56.713 [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] [INFO ] [o.a.k.s.p.internals.StreamThread] - stream-thread [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] Processed 0 total records, ran 0 punctuators, and committed 0 total tasks since the last update
2022-08-10 10:34:56.767 [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] [INFO ] [o.a.k.s.p.internals.StreamThread] - stream-thread [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] Processed 0 total records, ran 0 punctuators, and committed 0 total tasks since the last update
It is as if the group setting in application.yaml is not used at all. On the other hand, the spring.cloud.stream.bindings.process-in-0.destination=my-custom-topic property, which sets destination: my-custom-topic, is honored and the topic is consumed correctly (see the logs above).
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.6</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.1.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
    <version>3.2.4</version>
</dependency>
package my.custom.stuff;

import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.function.Consumer;

@Component
public class myKafkaStreamConsumer {

    private static final Logger logger = LoggerFactory.getLogger(myKafkaStreamConsumer.class);

    @Bean
    public static Consumer<KStream<String, String>> process() {
        return input ->
                input.foreach((key, value) -> {
                    logger.debug("from STREAM: Key= {} , value = {}", key, value);
                    // ...
                    // my message handling business logic
                    // ...
                });
    }
}
Here is the version of application.yaml that, IMHO, should be the most compliant with the documentation and still does not work. Note that the destination is correctly used, so at least it is using the correct channel.
spring:
  kafka:
    bootstrap-servers: kafka:9092
    consumer:
      auto-offset-reset: earliest
  cloud:
    stream:
      bindings:
        process-in-0:
          group: myCustomGroupId
          destination: "my-custom-topic"
I have tried to inject the group ID in several other ways, including a consumer subsection, such as spring.cloud.stream.bindings.process-in-0.consumer.group or spring.cloud.stream.bindings.process-in-0.consumer.group-id.
It always seems to be ignored.
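To make the consumer-subsection attempts concrete, here is a sketch of how those two property paths map onto application.yaml. The nesting is derived only from the property names above, and the group value is a placeholder:

```yaml
spring:
  cloud:
    stream:
      bindings:
        process-in-0:
          destination: my-custom-topic
          consumer:
            # Variant 1: ...process-in-0.consumer.group
            group: myCustomGroupId
            # Variant 2: ...process-in-0.consumer.group-id
            # group-id: myCustomGroupId
```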
Bit of a disclaimer: I'm a bit rusty on Spring, but since I've been working with Kafka for the past couple of months I wanted to play with this too. I got it to work by doing two things:
Use applicationId instead of group within the application properties:
spring:
  kafka:
    bootstrap-servers: localhost:29092
    consumer:
      auto-offset-reset: earliest
  cloud:
    stream:
      kafka:
        binder:
          functions:
            process:
              applicationId: MyGroupIdUsingApplicationId
        bindings:
          process-in-0:
      bindings:
        process-in-0:
          destination: my-custom-topic
Explicitly declare a KafkaBinderConfigurationProperties bean.
I created a working sample here for you to clone and test with if you need to: https://github.com/T-TK-Wan/SO-Spring_Cloud_Streams_Kafka_GroupId
Edit: Just to add that I focused only on confirming that the group ID can be set and that it registers correctly. I haven't looked into whether using the applicationId property is the correct approach or what side effects it has.
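For comparison, here is a sketch that uses the Kafka Streams binder's own property namespace instead. As far as I know, Kafka Streams uses its application.id as the consumer group ID, and the binder documentation lists spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId for setting it per function. I have not verified this against the exact versions in the question, so treat the layout below as an assumption rather than a confirmed fix:

```yaml
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            functions:
              # "process" is the function bean name from the question
              process:
                # Placeholder value; expected to show up as groupId in the consumer logs
                applicationId: myCustomGroupId
      bindings:
        process-in-0:
          destination: my-custom-topic
```

If this works, the groupId in the consumer log lines should change from process-applicationId to the configured value.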