
Kafka with NestJS error: KafkaJSProtocolError: This server does not host this topic-partition


I have a Kafka cluster that I run with Docker Compose:

  broker1:
    image: confluentinc/cp-kafka:7.4.0
    hostname: broker1
    container_name: broker1
    depends_on:
      - controller
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker1:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker1:29092,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      

  controller:
    image: confluentinc/cp-kafka:7.4.0
    hostname: controller
    container_name: controller
    ports:
      - "9093:9093"
      - "9102:9102"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9102
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
      KAFKA_LISTENERS: 'CONTROLLER://controller:9093'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-controller-logs'
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'

Note that KAFKA_AUTO_CREATE_TOPICS_ENABLE is set to 'true', which means Kafka is configured to create topics automatically.

My Nest client configuration:

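import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module'; // path assumed (default Nest project layout)

const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.KAFKA,
      options: {
        client: {
          brokers: ['localhost:9092']
        },
        // consumer settings belong inside options; at the top level they are not part of KafkaOptions
        consumer: {
          groupId: 'my-consumer-group',
          allowAutoTopicCreation: true
        }
      }
    }
)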

To consume a specific topic I use the @EventPattern decorator.

I expected the application to create the topics that my consumer subscribes to, but instead I get this error:

ERROR [Connection] Response Metadata(key: 3, version: 6) {"timestamp": "2023-08-09T10:20:17.462Z", "logger": "kafkajs", "broker": "localhost:9092", "clientId": "nestjs-consumer-server", "error": "This server does not host this topic-partition", "correlationId": 7, "size": 371}

KafkaJSProtocolError: This server does not host this topic-partition
....

It is worth noting that the same code and configuration worked with a different Kafka stack (I do not have all the details about the specific image and version of that earlier stack).

I don't understand whether the problem is with the Kafka configuration, whether Kafka is not allowing consumers to create topics automatically, or whether the problem is with Nest.

Edit:

I discovered that the application does create two topics on each run and then fails, so I think Kafka is configured correctly for automatic topic creation and the problem is in the Nest application or in kafkajs.


Solution

  • It is not a perfect solution, but what worked for me in the end was to run Kafka with ZooKeeper instead of KRaft. This resolved the error and allowed the consumer to create all the topics automatically. The better solution, though, is probably to create the topics before the consumer tries to consume them.
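
The second option, creating the topics before the consumer subscribes, could look roughly like the sketch below. It is not something I ran against the cluster from the question. The TopicAdmin interface is a local, hand-written subset of the kafkajs admin client (connect, disconnect, listTopics, createTopics), declared here so the snippet stands on its own; the helper names missingTopics and ensureTopics are made up for illustration, and one partition with replication factor 1 is an assumption that matches a single-broker setup.

```typescript
// Local subset of the kafkajs Admin interface, declared here so the sketch
// type-checks without the kafkajs package; a real kafka.admin() fits this shape.
interface TopicAdmin {
  connect(): Promise<void>;
  disconnect(): Promise<void>;
  listTopics(): Promise<string[]>;
  createTopics(options: {
    waitForLeaders?: boolean;
    topics: { topic: string; numPartitions?: number; replicationFactor?: number }[];
  }): Promise<boolean>;
}

// Hypothetical helper: returns the wanted topics that the broker does not have yet.
function missingTopics(existing: string[], wanted: string[]): string[] {
  const present = new Set(existing);
  return wanted.filter((topic) => !present.has(topic));
}

// Hypothetical helper: creates every wanted topic that is missing and
// returns the names it asked the broker to create.
async function ensureTopics(admin: TopicAdmin, wanted: string[]): Promise<string[]> {
  await admin.connect();
  try {
    const missing = missingTopics(await admin.listTopics(), wanted);
    if (missing.length > 0) {
      await admin.createTopics({
        // Wait until each new partition has a leader before returning.
        waitForLeaders: true,
        // Assumption: 1 partition, replication factor 1 (single broker).
        topics: missing.map((topic) => ({ topic, numPartitions: 1, replicationFactor: 1 })),
      });
    }
    return missing;
  } finally {
    // Always release the admin connection, even if creation fails.
    await admin.disconnect();
  }
}
```

With kafkajs installed, this would be called before NestFactory.createMicroservice, for example ensureTopics(new Kafka({ brokers: ['localhost:9092'] }).admin(), ['my-topic']), where 'my-topic' is a placeholder for whatever names are passed to @EventPattern. Since the topics then exist before the consumer joins the group, the consumer no longer depends on automatic topic creation.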