Search code examples
javascript node.js apache-kafka socket.io kafkajs

How do I connect Kafkajs with Socket.io


I've been studying KafkaJS and Socket.IO. I'm very new to both, and I can't seem to understand some things. I have created a chat application: you open a browser (the client), type messages, and they are displayed in a chat window. I found a tutorial that makes Kafka print "this is message " + i. Instead of sending that counter message to the topic and printing it, I want to send and print what people type in the chat, and I'm not sure how I'm supposed to do that.

This is my consumer.js:

const { Kafka } = require("kafkajs")
// identifier this application passes to the Kafka client
const clientId = "my-app"
// broker addresses passed to the Kafka client
const brokers = ["localhost:9092"]
// name of the topic this consumer subscribes to
const topic = "message-log"

// a single Kafka client, from which the consumer below is created
const kafka = new Kafka({
    clientId,
    brokers,
})

// the consumer reuses the client ID as its group ID; the group ID helps Kafka
// keep track of the messages that this client is yet to receive
const consumer = kafka.consumer({
    groupId: clientId,
})

/**
 * Connects the consumer, subscribes it to `topic`, and starts it with a
 * handler that prints every received message to standard output.
 *
 * @returns {Promise<void>} resolves once `consumer.run` has resolved
 */
const consume = async () => {
    // invoked by the consumer for each new message; it only logs the value
    const logMessage = ({ message }) => {
        console.log(`received message: ${message.value}`)
    }

    // the client has to be connected and subscribed before it is started
    await consumer.connect()
    await consumer.subscribe({ topic })
    await consumer.run({ eachMessage: logMessage })
}

module.exports = consume

This is my producer.js:

// import the `Kafka` instance from the kafkajs library
const { Kafka } = require("kafkajs")

// kafka uses the client ID to know who is producing the messages
const clientId = "my-app"
// list of brokers in the cluster
const brokers = ["localhost:9092"]
// topic that the messages are written to
const topic = "message-log"

// build the kafka client first, then derive the producer from it
const kafka = new Kafka({
    clientId,
    brokers,
})
const producer = kafka.producer()

/**
 * Connects the producer and then schedules a timer that writes one numbered
 * message to `topic` every second.
 *
 * @returns {Promise<void>} resolves once the producer is connected and the
 *   interval timer has been started
 */
const produce = async () => {
    await producer.connect()

    // number of the next message; it only advances after a successful write
    let sequence = 0

    // writes a single message whose key and value come from `sequence`
    const writeNext = async () => {
        try {
            const record = {
                key: String(sequence),
                value: `this is message ${sequence}`,
            }
            await producer.send({ topic, messages: [record] })

            // the write went through: report it and move on to the next number
            console.log("writes: ", sequence)
            sequence++
        } catch (err) {
            console.error("could not write message " + err)
        }
    }

    // the producer is connected, so the periodic writes can begin
    setInterval(writeNext, 1000)
}

module.exports = produce

I know I'm supposed to somehow connect the topics, brokers, and clients with Socket.IO, but I'm not sure how.

Here is my chat.js:

/* Open the connection to the server, as before.
`io` is available here because of the client library taken from the documentation. */
var socket = io.connect('http://localhost:8000');

// References to the elements declared in index.html
var message = document.getElementById('message');
var username = document.getElementById('username');
var btn = document.getElementById('send');
var output = document.getElementById('output');
var feedback = document.getElementById('feedback');

// Forward the user's actions to the server as events
btn.addEventListener('click', function(){
    socket.emit('chat', {
        message: message.value,
        username: username.value
    });
    // clear the input so the next message starts empty
    message.value = "";
});

message.addEventListener('keypress', function(){
    socket.emit('typing', username.value);
})

// Handle the events that arrive from the server
socket.on('chat', function(data){
    // a message arrived, so drop any "is typing" notice
    feedback.textContent = '';

    // The username and message are typed by other users, so they are inserted
    // as text nodes instead of being concatenated into innerHTML; any markup
    // they contain is shown literally rather than parsed and executed.
    var line = document.createElement('p');
    var author = document.createElement('strong');
    author.textContent = data.username + ': ';
    line.appendChild(author);
    line.appendChild(document.createTextNode(data.message));

    // appending a node also avoids re-parsing the whole chat log each time
    output.appendChild(line);
});

socket.on('typing', function(data){
    // `data` is the username sent by another client; insert it as text only
    var notice = document.createElement('p');
    var emphasis = document.createElement('em');
    emphasis.textContent = data + ' is typing a message...';
    notice.appendChild(emphasis);

    // replace whatever notice was shown before
    feedback.textContent = '';
    feedback.appendChild(notice);
});


Solution

  • You'll need a socket.io server.

    Example:

    // Local modules need a relative path; a bare 'consumer.js' would be
    // looked up in node_modules instead of next to this file.
    const consume = require('./consumer.js');
    const produce = require('./producer.js');
    const { Server } = require('socket.io');

    // Listen on port 8000, the port the browser client connects to
    // (http://localhost:8000); without a port the server accepts no connections.
    // NOTE(review): if the page is served from a different origin, a `cors`
    // option is probably needed here — confirm how index.html is served.
    const io = new Server(8000);

    // Kafka -> browsers: broadcast every consumed chat message to all sockets.
    consume(({ from, to, message }) => {
      io.sockets.emit('newMessage', { from, to, message });
    }).catch((err) => {
      // report a consumer failure instead of leaving the rejection unhandled
      console.error('Kafka consumer failed:', err);
    });

    io.on('connection', function(socket) {
      // greet the new client and tell it its own socket id
      socket.emit('Hi!', { message: 'Chat connected', id: socket.id });

      // Browser -> Kafka: publish what this client typed. The payload comes
      // from the client, so default it to {} in case nothing was sent.
      socket.on('sendMessage', ({ message, to } = {}) => {
        produce({ from: socket.id, to, message }).catch((err) => {
          console.error('Could not publish chat message:', err);
        });
      });
    });
    

    You also need to modify your consumer & producer to accept parameters and callbacks.

    Consumer Example:

    ...
    /**
     * Connects the consumer, subscribes it to `topic`, and hands every chat
     * message it receives to `cb`.
     *
     * KafkaJS calls `eachMessage` with `{ topic, partition, message }`, so the
     * chat fields are not on that payload. They are decoded from
     * `message.value`, which is expected to hold a JSON object of the form
     * `{ from, to, message }`. Records that cannot be decoded are logged and
     * skipped.
     *
     * @param {function({ from: *, to: *, message: * }): void} cb - called once
     *   for every decoded chat message
     * @returns {Promise<void>} resolves once `consumer.run` has resolved
     */
    const consume = async cb => {
        // first, we wait for the client to connect and subscribe to the given topic
        await consumer.connect();
        await consumer.subscribe({ topic });
        await consumer.run({
            // this function is called every time the consumer gets a new message
            eachMessage: async ({ message: record }) => {
                let chat;
                try {
                    // record.value is a Buffer (or null); a null value throws
                    // here and is skipped like any other undecodable record
                    chat = JSON.parse(record.value.toString());
                } catch (err) {
                    console.error(`skipping undecodable message: ${err}`);
                    return;
                }
                if (chat === null || typeof chat !== 'object') {
                    console.error('skipping message that is not a JSON object');
                    return;
                }
                const { from, to, message } = chat;
                cb({ from, to, message });
            },
        });
    }
    

    Producer Example:

    // Pending or completed connection attempt, shared by every call so the
    // producer is connected once and then reused.
    let producerConnection = null;

    /**
     * Publishes one chat message to `topic`.
     *
     * The producer is connected on first use. The chat fields are serialized
     * as a JSON object `{ from, to, message }` in the record value, and the
     * record is sent with the `{ topic, messages: [...] }` argument that
     * `producer.send` takes.
     *
     * @param {{ from: *, to: *, message: * }} chat - sender, recipient and text
     * @returns {Promise<void>} resolves once the record has been sent; rejects
     *   if connecting or sending fails
     */
    const produce = async ({ from, to, message }) => {
        if (producerConnection === null) {
            producerConnection = producer.connect().catch(err => {
                // forget the failed attempt so that the next call retries
                producerConnection = null;
                throw err;
            });
        }
        await producerConnection;

        await producer.send({
            topic,
            messages: [{ value: JSON.stringify({ from, to, message }) }],
        });
    }
    

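    Don't forget to modify your chat.js on the client side so that it emits `sendMessage` and listens for `newMessage`, the event names the server example above uses.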

    All of this can be optimized and is just a brief example