I am using the latest Spring Boot 3.4.1 and Spring Kafka 3.3.1.
I would like to create a new Kafka topic during startup, as described in the official documentation, but the TopicBuilder seems to use a wrong configuration with default values such as bootstrap.servers = [localhost:9092], and it fails to create the topic.
After startup, when I send a message to the topic, everything works and the KafkaTemplate uses my configuration.
Why does TopicBuilder use a different Kafka configuration than the KafkaTemplate? What is wrong with my code?
I see there is a configs method on TopicBuilder, but it expects Map<String, String>, so I am not able to reuse the config map that I use for DefaultKafkaProducerFactory, which is a Map<String, Object>.
configuration:
@Slf4j
@Configuration
@RequiredArgsConstructor
public class KafkaProducerConfiguration {
    public static final String KAFKA_TOPIC = "topic1";

    @Value("${KAFKA_SERVERS}")
    private String bootstrapAddresses;

    @Bean
    public ProducerFactory<String, Event> producerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddresses);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configs);
    }

    @Bean
    public KafkaTemplate<String, Event> kafkaTemplate() {
        var factory = producerFactory();
        log.debug("initializing a KafkaTemplate using the following setting: {{}}", configurationToString(factory));
        return new KafkaTemplate<>(factory);
    }

    @Bean
    public NewTopic topic() {
        log.debug("creating a new topic kafka topic: {}", KAFKA_TOPIC);
        return TopicBuilder.name(KAFKA_TOPIC)
                .partitions(1)
                .replicas(1)
                .build();
    }

    private String configurationToString(ProducerFactory<String, Event> producerFactory) {
        var sb = new StringBuilder();
        producerFactory
                .getConfigurationProperties()
                .forEach((key, value) -> sb.append(String.format("\"%s\": \"%s\"", key, value)));
        return sb.toString();
    }
}
sender:
@Slf4j
@RestController
@RequestMapping("/api/kafka")
public class KafkaProducerController {
    private final KafkaTemplate<String, Event> kafkaTemplate;

    public KafkaProducerController(KafkaTemplate<String, Event> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @GetMapping("/send")
    @MethodStatistics
    public String sendMessageToKafka() {
        var event = Event.builder().....build();
        kafkaTemplate.send(KafkaProducerConfiguration.KAFKA_TOPIC, event);
        return "Message has been sent to Kafka topic.";
    }
}
log:
2025-01-16T15:29:04.448Z INFO 206 --- [my-kafka-producer] [ main] c.r.g.s.message.producer.Application : Starting Application v0.1.0 using Java 21.0.5 with PID 206 (/jar-to-run/remal-my-kafka-producer-0.1.0.jar started by root in /jar-to-run)
2025-01-16T15:29:04.450Z DEBUG 206 --- [my-kafka-producer] [ main] c.r.g.s.message.producer.Application : Running with Spring Boot v3.4.1, Spring v6.2.1
2025-01-16T15:29:04.450Z INFO 206 --- [my-kafka-producer] [ main] c.r.g.s.message.producer.Application : No active profile set, falling back to 1 default profile: "default"
2025-01-16T15:29:06.204Z INFO 206 --- [my-kafka-producer] [ main] o.s.cloud.context.scope.GenericScope : BeanFactory id=bc87c8e9-04ef-3afc-981b-5d032d24e596
2025-01-16T15:29:07.358Z INFO 206 --- [my-kafka-producer] [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port 8443 (https)
2025-01-16T15:29:07.378Z INFO 206 --- [my-kafka-producer] [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2025-01-16T15:29:07.378Z INFO 206 --- [my-kafka-producer] [ main] o.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/10.1.34]
2025-01-16T15:29:07.416Z INFO 206 --- [my-kafka-producer] [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2025-01-16T15:29:07.416Z INFO 206 --- [my-kafka-producer] [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 2918 ms
>>>>> 2025-01-16T15:29:08.062Z DEBUG 206 --- [my-kafka-producer] [ main] c.r.g.s.m.p.c.KafkaProducerConfiguration : initializing a KafkaTemplate using the following setting: {"bootstrap.servers": "kafka-1.hello.com:9092, kafka-2.hello.com:9092""key.serializer": "class org.apache.kafka.common.serialization.StringSerializer""value.serializer": "class org.springframework.kafka.support.serializer.JsonSerializer"}
>>>>> 2025-01-16T15:29:08.252Z DEBUG 206 --- [my-kafka-producer] [ main] c.r.g.s.m.p.c.KafkaProducerConfiguration : creating a new topic kafka topic: topic1
2025-01-16T15:29:10.119Z WARN 206 --- [my-kafka-producer] [ main] iguration$LoadBalancerCaffeineWarnLogger : Spring Cloud LoadBalancer is currently working with the default cache. While this cache implementation is useful for development and tests, it's recommended to use Caffeine cache in production.You can switch to using Caffeine cache, by adding it and org.springframework.cache.caffeine.CaffeineCacheManager to the classpath.
2025-01-16T15:29:10.128Z INFO 206 --- [my-kafka-producer] [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 3 endpoints beneath base path '/actuator'
2025-01-16T15:29:10.225Z INFO 206 --- [my-kafka-producer] [ main] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
auto.include.jmx.reporter = true
bootstrap.controllers = []
>>>>> bootstrap.servers = [localhost:9092]
client.dns.lookup = use_all_dns_ips
client.id = my-kafka-producer-admin-0
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
enable.metrics.push = true
metadata.max.age.ms = 300000
metadata.recovery.strategy = none
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.max.ms = 1000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
2025-01-16T15:29:10.426Z INFO 206 --- [my-kafka-producer] [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.8.1
2025-01-16T15:29:10.427Z INFO 206 --- [my-kafka-producer] [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 70d6ff42debf7e17
2025-01-16T15:29:10.427Z INFO 206 --- [my-kafka-producer] [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1737041350424
2025-01-16T15:29:10.460Z INFO 206 --- [my-kafka-producer] [roducer-admin-0] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=my-kafka-producer-admin-0] Node -1 disconnected.
2025-01-16T15:29:10.463Z WARN 206 --- [my-kafka-producer] [roducer-admin-0] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=my-kafka-producer-admin-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
2025-01-16T15:29:10.566Z INFO 206 --- [my-kafka-producer] [roducer-admin-0] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=my-kafka-producer-admin-0] Node -1 disconnected.
2025-01-16T15:29:10.566Z WARN 206 --- [my-kafka-producer] [roducer-admin-0] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=my-kafka-producer-admin-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
...
2025-01-16T15:29:40.314Z INFO 206 --- [my-kafka-producer] [roducer-admin-0] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=my-kafka-producer-admin-0] Node -1 disconnected.
2025-01-16T15:29:40.315Z WARN 206 --- [my-kafka-producer] [roducer-admin-0] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=my-kafka-producer-admin-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
2025-01-16T15:29:40.432Z INFO 206 --- [my-kafka-producer] [roducer-admin-0] o.a.k.c.a.i.AdminMetadataManager : [AdminClient clientId=my-kafka-producer-admin-0] Metadata update failed
>>>>> 2025-01-16T15:29:40.448Z ERROR 206 --- [my-kafka-producer] [ main] o.springframework.kafka.core.KafkaAdmin : Could not configure topics
org.springframework.kafka.KafkaException: Timed out waiting to get existing topics
at org.springframework.kafka.core.KafkaAdmin.lambda$checkPartitions$13(KafkaAdmin.java:571) ~[spring-kafka-3.3.1.jar!/:3.3.1]
at java.base/java.util.HashMap.forEach(HashMap.java:1429) ~[na:na]
at java.base/java.util.Collections$UnmodifiableMap.forEach(Collections.java:1707) ~[na:na]
at org.springframework.kafka.core.KafkaAdmin.checkPartitions(KafkaAdmin.java:550) ~[spring-kafka-3.3.1.jar!/:3.3.1]
at org.springframework.kafka.core.KafkaAdmin.addOrModifyTopicsIfNeeded(KafkaAdmin.java:443) ~[spring-kafka-3.3.1.jar!/:3.3.1]
at org.springframework.kafka.core.KafkaAdmin.initialize(KafkaAdmin.java:276) ~[spring-kafka-3.3.1.jar!/:3.3.1]
at org.springframework.kafka.core.KafkaAdmin.afterSingletonsInstantiated(KafkaAdmin.java:245) ~[spring-kafka-3.3.1.jar!/:3.3.1]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:1057) ~[spring-beans-6.2.1.jar!/:6.2.1]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:987) ~[spring-context-6.2.1.jar!/:6.2.1]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:627) ~[spring-context-6.2.1.jar!/:6.2.1]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146) ~[spring-boot-3.4.1.jar!/:3.4.1]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:752) ~[spring-boot-3.4.1.jar!/:3.4.1]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:439) ~[spring-boot-3.4.1.jar!/:3.4.1]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:318) ~[spring-boot-3.4.1.jar!/:3.4.1]
at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:149) ~[spring-boot-3.4.1.jar!/:3.4.1]
at com.remal.my.service.message.producer.Application.main(Application.java:20) ~[!/:0.1.0]
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[na:na]
at org.springframework.boot.loader.launch.Launcher.launch(Launcher.java:102) ~[remal-my-kafka-producer-0.1.0.jar:0.1.0]
at org.springframework.boot.loader.launch.Launcher.launch(Launcher.java:64) ~[remal-my-kafka-producer-0.1.0.jar:0.1.0]
at org.springframework.boot.loader.launch.JarLauncher.main(JarLauncher.java:40) ~[remal-my-kafka-producer-0.1.0.jar:0.1.0]
Caused by: java.util.concurrent.TimeoutException: null
at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095) ~[na:na]
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180) ~[kafka-clients-3.8.1.jar!/:na]
at org.springframework.kafka.core.KafkaAdmin.lambda$checkPartitions$13(KafkaAdmin.java:553) ~[spring-kafka-3.3.1.jar!/:3.3.1]
... 20 common frames omitted
2025-01-16T15:29:41.253Z INFO 206 --- [my-kafka-producer] [roducer-admin-0] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=my-kafka-producer-admin-0] Node -1 disconnected.
2025-01-16T15:29:41.254Z WARN 206 --- [my-kafka-producer] [roducer-admin-0] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=my-kafka-producer-admin-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
...
2025-01-16T15:29:50.450Z INFO 206 --- [my-kafka-producer] [roducer-admin-0] o.a.k.clients.admin.KafkaAdminClient : [AdminClient clientId=my-kafka-producer-admin-0] Forcing a hard I/O thread shutdown. Requests in progress will be aborted.
2025-01-16T15:29:50.451Z INFO 206 --- [my-kafka-producer] [roducer-admin-0] o.a.kafka.common.utils.AppInfoParser : App info kafka.admin.client for my-kafka-producer-admin-0 unregistered
2025-01-16T15:29:50.452Z INFO 206 --- [my-kafka-producer] [roducer-admin-0] o.a.k.c.a.i.AdminMetadataManager : [AdminClient clientId=my-kafka-producer-admin-0] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: fetchMetadata
2025-01-16T15:29:50.452Z INFO 206 --- [my-kafka-producer] [roducer-admin-0] o.a.k.clients.admin.KafkaAdminClient : [AdminClient clientId=my-kafka-producer-admin-0] Timed out 2 remaining operation(s) during close.
2025-01-16T15:29:50.456Z INFO 206 --- [my-kafka-producer] [roducer-admin-0] o.apache.kafka.common.metrics.Metrics : Metrics scheduler closed
2025-01-16T15:29:50.456Z INFO 206 --- [my-kafka-producer] [roducer-admin-0] o.apache.kafka.common.metrics.Metrics : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2025-01-16T15:29:50.456Z INFO 206 --- [my-kafka-producer] [roducer-admin-0] o.apache.kafka.common.metrics.Metrics : Metrics reporters closed
2025-01-16T15:29:50.523Z INFO 206 --- [my-kafka-producer] [ main] o.a.t.util.net.NioEndpoint.certificate : Connector [https-jsse-nio-8443], TLS virtual host [_default_], certificate type [UNDEFINED] configured from keystore [/root/.keystore] using alias [kafka-producer-service-1.hello.com] with trust store [null]
2025-01-16T15:29:50.532Z INFO 206 --- [my-kafka-producer] [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port 8443 (https) with context path '/'
2025-01-16T15:29:50.539Z INFO 206 --- [my-kafka-producer] [ main] o.s.c.c.s.ConsulServiceRegistry : Registering service with consul: NewService{id='my-kafka-producer', name='my-kafka-producer', tags=[service, 0.1.0], address='kafka-producer-service-1.hello.com', meta={secure=true}, port=8443, enableTagOverride=null, check=Check{script='null', dockerContainerID='null', shell='null', interval='2s', ttl='null', http='https://kafka-producer-service-1.hello.com:8443/actuator/health', method='null', header={}, tcp='null', timeout='2s', deregisterCriticalServiceAfter='10s', tlsSkipVerify=null, status='null', grpc='null', grpcUseTLS=null}, checks=null}
2025-01-16T15:29:50.580Z INFO 206 --- [my-kafka-producer] [ main] c.r.g.s.message.producer.Application : Started Application in 51.927 seconds (process running for 52.723)
2025-01-16T15:29:50.636Z INFO 206 --- [my-kafka-producer] [nio-8443-exec-6] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
{"status":"UP"}
>>>>> 2025-01-16T15:32:14.748Z DEBUG 206 --- [my-kafka-producer] [nio-8443-exec-7] c.r.g.c.m.MethodStatisticsAspect : > calling the KafkaProducerController.sendMessageToKafka()...
2025-01-16T15:32:14.769Z INFO 206 --- [my-kafka-producer] [nio-8443-exec-7] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = -1
auto.include.jmx.reporter = true
batch.size = 16384
>>>>> bootstrap.servers = [kafka-1.hello.com:9092, kafka-2.hello.com:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = my-kafka-producer-producer-1
compression.gzip.level = -1
compression.lz4.level = 9
compression.type = none
compression.zstd.level = 3
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
enable.metrics.push = true
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metadata.recovery.strategy = none
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.adaptive.partitioning.enable = true
partitioner.availability.timeout.ms = 0
partitioner.class = null
partitioner.ignore.keys = false
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.max.ms = 1000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer
2025-01-16T15:32:14.826Z INFO 206 --- [my-kafka-producer] [nio-8443-exec-7] o.a.k.c.t.i.KafkaMetricsCollector : initializing Kafka metrics collector
2025-01-16T15:32:14.852Z INFO 206 --- [my-kafka-producer] [nio-8443-exec-7] o.a.k.clients.producer.KafkaProducer : [Producer clientId=my-kafka-producer-producer-1] Instantiated an idempotent producer.
2025-01-16T15:32:14.884Z INFO 206 --- [my-kafka-producer] [nio-8443-exec-7] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.8.1
2025-01-16T15:32:14.884Z INFO 206 --- [my-kafka-producer] [nio-8443-exec-7] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 70d6ff42debf7e17
2025-01-16T15:32:14.885Z INFO 206 --- [my-kafka-producer] [nio-8443-exec-7] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1737041534884
2025-01-16T15:32:15.448Z WARN 206 --- [my-kafka-producer] [ucer-producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=my-kafka-producer-producer-1] Error while fetching metadata with correlation id 2 : {topic1=LEADER_NOT_AVAILABLE}
2025-01-16T15:32:15.450Z INFO 206 --- [my-kafka-producer] [ucer-producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=my-kafka-producer-producer-1] Cluster ID: M_1qzbN3Q4e2C_SPkTcrhQ
2025-01-16T15:32:15.450Z INFO 206 --- [my-kafka-producer] [ucer-producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=my-kafka-producer-producer-1] ProducerId set to 0 with epoch 0
>>>>> 2025-01-16T15:32:15.598Z DEBUG 206 --- [my-kafka-producer] [nio-8443-exec-7] c.r.g.c.m.MethodStatisticsAspect : < call ended: {name: KafkaProducerController.sendMessageToKafka, arguments: , return: "Message has been sent to Kafka topic.", execution-in-ms: 851}
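TopicBuilder only builds the NewTopic definition. The topic itself is created by the KafkaAdmin bean in the application context, which picks up each NewTopic @Bean and adds it to the broker. Your configuration sets the bootstrap servers only on the producer factory and does not define a KafkaAdmin, so Spring Boot auto-configures one from spring.kafka.bootstrap-servers, which defaults to localhost:9092. That is the address shown in the AdminClientConfig values in your log. Define your own KafkaAdmin bean in the same configuration class so that topic creation uses the same bootstrap servers as the KafkaTemplate:
@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    // reuse the same address that the producer factory uses
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddresses);
    return new KafkaAdmin(configs);
}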
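To keep the admin and producer settings from drifting apart, the shared connection settings can be built in one place. The following is only a rough sketch of that idea using plain Java maps, not part of Spring Kafka: the class name KafkaClientProperties and its methods are hypothetical, and the string keys are the property names that appear in the logs above. In the application, the resulting maps would be handed to the KafkaAdmin and DefaultKafkaProducerFactory constructors.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: one source of truth for settings shared by all Kafka clients. */
final class KafkaClientProperties {

    private KafkaClientProperties() {
    }

    /** Settings every client needs; currently just the broker addresses. */
    static Map<String, Object> common(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        return props;
    }

    /** Admin settings: the map that would be passed to new KafkaAdmin(...). */
    static Map<String, Object> admin(String bootstrapServers) {
        return common(bootstrapServers);
    }

    /** Producer settings: the map that would be passed to new DefaultKafkaProducerFactory<>(...). */
    static Map<String, Object> producer(String bootstrapServers) {
        Map<String, Object> props = common(bootstrapServers);
        // Kafka accepts serializer classes as fully qualified class names.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.springframework.kafka.support.serializer.JsonSerializer");
        return props;
    }

    public static void main(String[] args) {
        String servers = "kafka-1.hello.com:9092,kafka-2.hello.com:9092";
        // Both clients resolve the same broker list.
        System.out.println(admin(servers).get("bootstrap.servers"));
        System.out.println(producer(servers).get("bootstrap.servers"));
    }
}
```

With this shape, the KAFKA_SERVERS value is read once and both beans derive their maps from it, so a topic can no longer be created against a different cluster than the one the producer writes to.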