Search code examples
javaspring-bootapache-kafkaspring-cloud-stream

Spring Cloud Stream Kafka consumer app, AdminClient is called for no reason


Small question regarding Spring Cloud Stream Kafka please.

I am having a very simple and straightforward consumer. It is consuming only, does not produce messages, the topic is already there, I do not need to create topic.

@SpringBootApplication
public class StreamReactiveConsumerApplication implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(StreamReactiveConsumerApplication.class);

    public static void main(String... args) {
        new SpringApplicationBuilder(StreamReactiveConsumerApplication.class).web(WebApplicationType.NONE).run(args);
    }

    @Override
    public void run(String... args) throws Exception {
        log.warn("Remember about calling <.subscribe()> at the end of your Consumer<Flux> bean!");
        log.warn("Remember about finishing the span manually before calling subscribe!");
    }

    @Bean
    Consumer<Flux<Message<String>>> channel(Tracer tracer, ObservationRegistry observationRegistry) {
        return flux -> flux.doOnNext(msg -> log.info("<ACCEPTANCE_TEST> <TRACE:{}> Hello from consumer",
                tracer.currentSpan().context().traceId())).subscribe();
    }

}

Yet on application startup, I am seeing interactions from the admin client please see logs below:

2023-02-06 10:25:24 [stream-reactive-consumer,,] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-02-06 10:25:24 [stream-reactive-consumer,,] o.s.i.c.PublishSubscribeChannel : Channel 'stream-reactive-consumer.errorChannel' has 1 subscriber(s).
2023-02-06 10:25:24 [stream-reactive-consumer,,] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2023-02-06 10:25:24 [stream-reactive-consumer,,] o.s.c.s.binder.DefaultBinderFactory : Creating binder: kafka
2023-02-06 10:25:24 [stream-reactive-consumer,,] o.s.c.s.binder.DefaultBinderFactory : Constructing binder child context for kafka
2023-02-06 10:25:24 [stream-reactive-consumer,,] o.s.c.s.binder.DefaultBinderFactory : Caching the binder: kafka
2023-02-06 10:25:24 [stream-reactive-consumer,,] o.a.k.c.admin.AdminClientConfig : AdminClientConfig values:

[bunch of values...]

2023-02-06 10:25:24 [stream-reactive-consumer,,] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.3.1
2023-02-06 10:25:24 [stream-reactive-consumer,,] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: e23c59d00e687ff5
2023-02-06 10:25:24 [stream-reactive-consumer,,] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1675650324403

As you can see, the app is calling the admin client:

2023-02-06 10:25:24 [stream-reactive-consumer,,] o.a.k.c.admin.AdminClientConfig : AdminClientConfig values:

May I ask why this reliance on the admin for a consumer only app please? Again, I am not producing anything, not creating any topic.

Is there a way to consume without this admin client, disable this admin client please?

Thank you


Solution

  • As suggested by @Gary Russell, the bug is fixed and seeing Admin client on startup for a consumer only app is indeed not expected.

    In my case, it is due to a mix of couple of things:

    1 - How can I configure a Spring Cloud Stream (Kafka) application to autocreate the topics in Confluent Cloud?

    spring.cloud.stream.kafka.binder.autoCreateTopics property is set to true by default, this will "trigger the Admin Client"

    I did set it to false on my side, but because of this issue: https://github.com/spring-cloud/spring-cloud-stream/issues/2644 when running on native, the property was not being taken into account.

    Making sure the property is false, and setting the breakpoint as Gary suggested is the correct solution (upvoted what he said)