Search code examples
node.jsdockerapache-kafkanestjs

connection error: connect ECONNREFUSED 127.0.0.1:9092 , how to solve kafka error?


Well, I'm trying to upload to a Kafka server, but something is wrong and I don't understand the error logs, I want my producer to produce and consumer when it's the exact topic, consume and send data that I still just want to log, but then I'll use to search for users, my codes:

kafka-producer

export class ProducerS {
  private kafkaProducer: KafkaProducer;
  constructor() {
    // Configuração do Kafka com os endereços dos brokers
    const kafka = new Kafka({clientId: 'kafkaP', brokers: ['localhost:9092'] });
    // Criação de um produtor Kafka
    this.kafkaProducer = kafka.producer()
  }

  // Método para conectar ao broker Kafka
  async connect(): Promise<void> {
    console.log('Conectou')
    await this.kafkaProducer.connect();
  }

  // Método para desconectar do broker Kafka
  async disconnect(): Promise<void> {
    console.log('Desconectou')
    await this.kafkaProducer.disconnect();
  }

  // Método para enviar uma mensagem para um tópico específico
  async sendM(topic: string, messages: any): Promise<void> {
    // Envia a mensagem para o tópico especificado
    await this.kafkaProducer.send({
      topic,
      messages: [{ value: JSON.stringify(messages) }],
    });
  }
}

kafka-consumer:

export class PagamentoService {
    private kafka: Kafka;
    private consumer: Consumer;

    constructor(
        @InjectRepository(DetalheEntidade)
        private readonly detalheRepository: Repository<DetalheEntidade>,
    ){ 
        this.kafka = new Kafka({
            clientId: 'kafkaP',
            brokers: ['localhost:9092'],
        });
        this.consumer = this.kafka.consumer({groupId: 'pagamentos-group'});
        
    } 

    async pegarConsumer(){
        await this.consumer.connect()
        console.log('Opa amigao')
        await this.consumer.subscribe({topic: 'criar-detalhe'})
        await this.consumer.run({
            // eslint-disable-next-line @typescript-eslint/no-unused-vars
            eachMessage: async ({topic, partition, message}) => {
                const ids = await JSON.parse(message.value.toString())
                const topicS = await JSON.parse(topic.toString())
                console.log(ids)
                console.log(topicS)
            }
        })
    }

}

docker-compose(maybe here, idk):

version: '3.5'
services:
  kafka:
    build:
      context: ./
      dockerfile: ./apps/kafka/Dockerfile
    env_file:
      - .env
    depends_on:
      - postgres
    volumes:
      - .:/usr/src/app 
      - /usr/src/app/node_modules
    command: npm run start:dev kafka 
    
  kafkaprodute:
    build: 
      context: ./
      dockerfile: ./apps/kafkaprodute/Dockerfile
    ports:
      - '4000:5000'
    env_file:
      - .env
    depends_on:
      - kafka
    volumes:
      - .:/usr/src/app 
      - /usr/src/app/node_modules
    command: npm run start:dev kafkaprodute 
    
  postgres:
    image: postgres
    env_file:
      - .env
    ports:
      - '5432:5432'
    volumes:
      - ./db/data:/var/lib/postgresql/data

  postgres_admin:
    image: dpage/pgadmin4
    depends_on:
      - postgres
    env_file:
      - .env
    ports:
      - '15432:80'

  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - '2181:2181'

  kafkaServer:
    image: wurstmeister/kafka
    container_name: kafkaServer
    ports:
      - '9092:9092'
    depends_on:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

where I use producer:


    const usuarioSalvo = usuarioEntidade.id
    
    const kafkaGerent = new ProducerS()
    await kafkaGerent.connect()
    await kafkaGerent.sendM(
      'criar-detalhe', {value: JSON.stringify({usuarioSalvo})}
    )
    await kafkaGerent.disconnect()

the logs error:

ERROR [ServerKafka] ERROR [Connection] Connection error: connect ECONNREFUSED 127.0.0.1:9092 {"timestamp":"2024-04-24T15:08:35.624Z","logger":"kafkajs","broker":"localhost:9092","clientId":"nestjs-consumer-server","stack":"Error: connect ECONNREFUSED 127.0.0.1:9092\n    at TCPConnectWrap.afterConnect [as oncomplete] (node:net:1605:16)"}
kafka-1           | [Nest] 29  - 04/24/2024, 3:08:35 PM   ERROR [ServerKafka] ERROR [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection error: connect ECONNREFUSED 127.0.0.1:9092 {"timestamp":"2024-04-24T15:08:35.625Z","logger":"kafkajs","retryCount":4,"retryTime":4152}

what I did wrong?


Solution

  • As you're using docker compose to run both the NestJS microservice consumer and the kafka server itself, you should use the host kafkaServer instead of localhost, as the docker network will have the service's name set up in the DNS resolution and each container will have its own separatelocalhost