Search code examples
micronautmicronaut-kafka

Micronaut 3+ and Kafka integration


I'm currently trying to get familiar with micronaut and Kafka. I've followed the documentation and created one KafkaClient in a microservice and one KafkaListener in a different microservice. The code looks like this:

Client. This is working as it's supposed to. I'm posting new messages in the vet-reviews topic:

@KafkaClient
public interface VetReviewClient {

    @Topic("vet-reviews")
    void send(VetReviewApi vetReviewApi);

}

Listener:

@KafkaListener(groupId = "pet-clinic", offsetReset = OffsetReset.EARLIEST)
@AllArgsConstructor
@Slf4j
public class VetReviewListener {

    private VetService vetService;

    @Topic("vet-reviews")
    public void receive(@MessageBody VetReviewApi vetReviewApi) {
        log.info("Received vet review: {}", vetReviewApi);

        vetService.processVetReview(vetReviewApi);
    }

}

Although posting the message in the topic, the listener does not pull the new message. I've followed the official micronaut-kafka integration documentation, but it does not work.

I know that this question was posted a while ago, but I don't see any answer and I'm unable to "revive" the problem by commenting because I don't have sufficient reputation.

Link to old post


Solution

  • Update on the problem. I've changed the Kafka docker image and that was it. Current working kafka-zookeper-kafdrop config:

     zookeeper:
        image: confluentinc/cp-zookeeper:7.0.1
        container_name: zookeeper
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
    
      kafka:
        image: confluentinc/cp-kafka:7.0.1
        container_name: kafka
        ports:
          # To learn about configuring Kafka for access across networks see
          # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
          - "9092:9092"
        depends_on:
          - zookeeper
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    
      kafdrop:
        image: obsidiandynamics/kafdrop
        ports:
          - 9100:9000
        environment:
          - KAFKA_BROKERCONNECT=kafka:29092
          - JVM_OPTS=-Xms32M -Xmx64M
        depends_on:
          - kafka