Well, I'm trying to upload to a Kafka server, but something is wrong and I don't understand the error logs, I want my producer to produce and consumer when it's the exact topic, consume and send data that I still just want to log, but then I'll use to search for users, my codes:
kafka-producer
export class ProducerS {
private kafkaProducer: KafkaProducer;
constructor() {
// Configuração do Kafka com os endereços dos brokers
const kafka = new Kafka({clientId: 'kafkaP', brokers: ['localhost:9092'] });
// Criação de um produtor Kafka
this.kafkaProducer = kafka.producer()
}
// Método para conectar ao broker Kafka
async connect(): Promise<void> {
console.log('Conectou')
await this.kafkaProducer.connect();
}
// Método para desconectar do broker Kafka
async disconnect(): Promise<void> {
console.log('Desconectou')
await this.kafkaProducer.disconnect();
}
// Método para enviar uma mensagem para um tópico específico
async sendM(topic: string, messages: any): Promise<void> {
// Envia a mensagem para o tópico especificado
await this.kafkaProducer.send({
topic,
messages: [{ value: JSON.stringify(messages) }],
});
}
}
kafka-consumer:
export class PagamentoService {
private kafka: Kafka;
private consumer: Consumer;
constructor(
@InjectRepository(DetalheEntidade)
private readonly detalheRepository: Repository<DetalheEntidade>,
){
this.kafka = new Kafka({
clientId: 'kafkaP',
brokers: ['localhost:9092'],
});
this.consumer = this.kafka.consumer({groupId: 'pagamentos-group'});
}
async pegarConsumer(){
await this.consumer.connect()
console.log('Opa amigao')
await this.consumer.subscribe({topic: 'criar-detalhe'})
await this.consumer.run({
// eslint-disable-next-line @typescript-eslint/no-unused-vars
eachMessage: async ({topic, partition, message}) => {
const ids = await JSON.parse(message.value.toString())
const topicS = await JSON.parse(topic.toString())
console.log(ids)
console.log(topicS)
}
})
}
}
docker-compose(maybe here, idk):
version: '3.5'
services:
kafka:
build:
context: ./
dockerfile: ./apps/kafka/Dockerfile
env_file:
- .env
depends_on:
- postgres
volumes:
- .:/usr/src/app
- /usr/src/app/node_modules
command: npm run start:dev kafka
kafkaprodute:
build:
context: ./
dockerfile: ./apps/kafkaprodute/Dockerfile
ports:
- '4000:5000'
env_file:
- .env
depends_on:
- kafka
volumes:
- .:/usr/src/app
- /usr/src/app/node_modules
command: npm run start:dev kafkaprodute
postgres:
image: postgres
env_file:
- .env
ports:
- '5432:5432'
volumes:
- ./db/data:/var/lib/postgresql/data
postgres_admin:
image: dpage/pgadmin4
depends_on:
- postgres
env_file:
- .env
ports:
- '15432:80'
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- '2181:2181'
kafkaServer:
image: wurstmeister/kafka
container_name: kafkaServer
ports:
- '9092:9092'
depends_on:
- zookeeper
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
where I use producer:
const usuarioSalvo = usuarioEntidade.id
const kafkaGerent = new ProducerS()
await kafkaGerent.connect()
await kafkaGerent.sendM(
'criar-detalhe', {value: JSON.stringify({usuarioSalvo})}
)
await kafkaGerent.disconnect()
the logs error:
ERROR [ServerKafka] ERROR [Connection] Connection error: connect ECONNREFUSED 127.0.0.1:9092 {"timestamp":"2024-04-24T15:08:35.624Z","logger":"kafkajs","broker":"localhost:9092","clientId":"nestjs-consumer-server","stack":"Error: connect ECONNREFUSED 127.0.0.1:9092\n at TCPConnectWrap.afterConnect [as oncomplete] (node:net:1605:16)"}
kafka-1 | [Nest] 29 - 04/24/2024, 3:08:35 PM ERROR [ServerKafka] ERROR [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection error: connect ECONNREFUSED 127.0.0.1:9092 {"timestamp":"2024-04-24T15:08:35.625Z","logger":"kafkajs","retryCount":4,"retryTime":4152}
what I did wrong?
As you're using docker compose to run both the NestJS microservice consumer and the kafka server itself, you should use the host kafkaServer
instead of localhost
, as the docker network will have the service's name set up in the DNS resolution and each container will have its own separatelocalhost