I'm trying to get familiar with Micronaut and Kafka. Following the documentation, I created a KafkaClient in one microservice and a KafkaListener in a different microservice. The code looks like this:
Client (this works as expected; new messages are published to the vet-reviews topic):
@KafkaClient
public interface VetReviewClient {
    @Topic("vet-reviews")
    void send(VetReviewApi vetReviewApi);
}
Listener:
@KafkaListener(groupId = "pet-clinic", offsetReset = OffsetReset.EARLIEST)
@AllArgsConstructor
@Slf4j
public class VetReviewListener {
    private VetService vetService;

    @Topic("vet-reviews")
    public void receive(@MessageBody VetReviewApi vetReviewApi) {
        log.info("Received vet review: {}", vetReviewApi);
        vetService.processVetReview(vetReviewApi);
    }
}
Although the message is posted to the topic, the listener never picks it up. I've followed the official Micronaut Kafka integration documentation, but it still does not work.
I know that this question was posted a while ago, but I don't see any answer and I'm unable to "revive" the problem by commenting because I don't have sufficient reputation.
Update on the problem: switching to a different Kafka Docker image fixed it. Current working Kafka, ZooKeeper and Kafdrop config:
zookeeper:
  image: confluentinc/cp-zookeeper:7.0.1
  container_name: zookeeper
  environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000
kafka:
  image: confluentinc/cp-kafka:7.0.1
  container_name: kafka
  ports:
    # To learn about configuring Kafka for access across networks see
    # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
    - "9092:9092"
  depends_on:
    - zookeeper
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
kafdrop:
  image: obsidiandynamics/kafdrop
  ports:
    - 9100:9000
  environment:
    - KAFKA_BROKERCONNECT=kafka:29092
    - JVM_OPTS=-Xms32M -Xmx64M
  depends_on:
    - kafka
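In case it helps anyone reading the config above: KAFKA_ADVERTISED_LISTENERS declares two addresses, and as far as I understand a client has to use the one that is reachable from where it runs. Kafdrop, running inside the Docker network, connects to kafka:29092, while a service started on the host would presumably use localhost:9092. Here is a minimal sketch in plain Java that splits that value into listener name and address so the two cases are easy to see. The AdvertisedListeners class and its parse method are hypothetical helpers I made up for illustration; they are not part of Kafka or Micronaut.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Kafka or Micronaut.
final class AdvertisedListeners {

    private AdvertisedListeners() {
    }

    // Parses "NAME://host:port,NAME2://host2:port2" into a map of NAME -> "host:port".
    static Map<String, String> parse(String advertisedListeners) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : advertisedListeners.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            int separator = trimmed.indexOf("://");
            if (separator < 0) {
                throw new IllegalArgumentException("Malformed listener entry: " + trimmed);
            }
            result.put(trimmed.substring(0, separator), trimmed.substring(separator + 3));
        }
        return result;
    }

    public static void main(String[] args) {
        // Value copied from the kafka service in the compose config.
        Map<String, String> listeners =
                parse("PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092");
        System.out.println("From the host: " + listeners.get("PLAINTEXT"));
        System.out.println("From another container: " + listeners.get("PLAINTEXT_INTERNAL"));
    }
}
```

With this setup, my assumption is that microservices run from the IDE or command line on the host should point their bootstrap servers at the PLAINTEXT address, and anything added to the same compose file should use the PLAINTEXT_INTERNAL one.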