Tags: node.js, promise, cassandra, consumer, kafka-node

using Kafka Consumer in Node JS app to indicate computations have been made


So my question may involve some brainstorming based on the nature of the application.

I have a Node JS app that sends messages to Kafka. For example, every time a user clicks on a page, a Kafka app runs a computation based on the visit. At the same instant, I want to retrieve this computation after triggering it through my Kafka message. So far, this computation is stored in a Cassandra database. The problem is that if we try to read from Cassandra before the computation is complete, we will query nothing from the database (the key has not been inserted yet) and get an error, or possibly the computation is stale. This is my code so far.

router.get('/:slug', async (req, res) => {
  const Producer = kafka.Producer;
  const KeyedMessage = kafka.KeyedMessage;
  const client = new kafka.KafkaClient();
  const producer = new Producer(client);

  const km = new KeyedMessage('key', 'message');
  const kafka_message = JSON.stringify({ id: req.session.session_id.toString(), url: arbitrary_url });
  const payloads = [
    { topic: 'MakeComputationTopic', messages: kafka_message }
  ];
  // trigger the computation
  producer.send(payloads, (err) => {
    if (err) console.error(err);
  });

  const clientCass = new cassandra.Client({
    contactPoints: ['127.0.0.1:9042'],
    localDataCenter: 'datacenter1', // here is the change required
    keyspace: 'computation_space',
    authProvider: new auth.PlainTextAuthProvider('cassandra', 'cassandra')
  });

  const query = 'SELECT * FROM computation WHERE id = ?';

  clientCass.execute(query, [req.session.session_id], { hints: ['int'] })
    .then(result => console.log('User with computation %s', result.rows[0].computations))
    .catch(() => {
      console.log('Could not find key');
    });
});

Firstly, async and await came to mind, but that is ruled out since it does not prevent reading stale computations.

Secondly, I looked into letting my application sleep, but it seems that this would slow my application down.
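A middle ground between a fixed sleep and a consumer would be to poll Cassandra with a short, growing backoff until the row appears. This is only a sketch of that idea; the attempt count and delays are arbitrary choices, not from the question:

```javascript
// Retry an async query until it returns a row, with exponential backoff.
// `attempts` and `baseDelayMs` are arbitrary illustrative values.
async function pollForResult(queryFn, attempts = 5, baseDelayMs = 100) {
  for (let i = 0; i < attempts; i++) {
    const result = await queryFn();
    if (result !== undefined) return result; // row found: computation is done
    await new Promise((r) => setTimeout(r, baseDelayMs * 2 ** i));
  }
  throw new Error('result not ready after ' + attempts + ' attempts');
}

// Usage with the cassandra-driver client from the question (not run here):
// const row = await pollForResult(async () => {
//   const rs = await clientCass.execute(query, [req.session.session_id], { hints: ['int'] });
//   return rs.rows[0]; // undefined while the key has not been inserted yet
// });
```

This still burns a query per attempt, but unlike a flat sleep it returns as soon as the row exists.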

I am possibly deciding on using Kafka Consumer (in my node-js) to consume a message that indicates that it's safe to now look into the Cassandra table.

For e.g. (using kafka-node)

consumer.on('message', function (message) {
  clientCass.execute(query, [req.session.session_id], { hints: ['int'] })
    .then(result => console.log('User with computation %s', result.rows[0].computations))
    .catch(() => {
      console.log('Could not find key');
    });
});

This approach, while better, seems a bit off since I would have to create a consumer every time a user clicks on a page, and I only care about receiving one message.

I was wondering how I should deal with this challenge? Am I missing a scenario, or is there a way to use kafka-node to solve this problem? I was also thinking of using a while loop that waits for the promise to succeed and checks that the computations are not stale (by comparing values in the cache).


Solution

  • This approach, while better, seems a bit off since I would have to create a consumer every time a user clicks on a page, and I only care about receiving one message.

    I would come to the same conclusion. Cassandra is not designed for this kind of use case. The database is eventually consistent. Your current approach may work at the moment, if you hack something together, but will definitely result in undefined behavior once you have a Cassandra cluster, especially when you update the entry.

    The id in the computation table is your partition key. This means that once you have a cluster, Cassandra distributes the data by id. It looks like each partition contains only one row. This is a very inefficient way of modeling your Cassandra tables.

    Your use case looks like one for a session storage or cache. Redis or LevelDB are well suited for this kind of use case; any other key-value store would do the job too.
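To make the cache idea concrete, here is the read-through pattern sketched with a plain in-memory map with expiry standing in for Redis (with Redis you would use GET and SET with an expiry option instead). All names here are illustrative:

```javascript
// In-memory stand-in for a Redis-style cache with per-key TTL.
class TtlCache {
  constructor() { this.store = new Map(); }
  set(key, value, ttlMs) {
    this.store.set(key, { value, expires: Date.now() + ttlMs });
  }
  get(key) {
    const entry = this.store.get(key);
    if (!entry) return undefined;
    if (Date.now() > entry.expires) { this.store.delete(key); return undefined; }
    return entry.value;
  }
}

const cache = new TtlCache();

// The computation app would write its result here when done;
// the web app reads it back instead of querying Cassandra per click.
function storeComputation(sessionId, computation) {
  cache.set(sessionId, computation, 60000); // keep results for one minute
}

function readComputation(sessionId) {
  return cache.get(sessionId); // undefined means "not computed yet"
}
```

The key property is that a missing key is a normal, cheap answer ("not done yet"), not a query error, which matches the polling-or-waiting flow of the question.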

    Why don't you write your result into another topic and have another application read this topic and write the result into a database? That way you don't need to keep any state: the result will be in the topic when it is done. It would look like this:

    incoming data -> first kafka topic -> computational app -> second kafka topic -> another app writing it into the database <- another app regularly reading the data.

    If the result is not in the second topic yet, the computation is simply not done yet.
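The second-topic side of that pipeline could look roughly like this; the topic name, message shape, and writer function are assumptions for illustration:

```javascript
// Handler for messages on the hypothetical results topic: parse the message
// and hand the result to whatever writer the database app uses.
function handleResultMessage(raw, writeToDb) {
  const { id, computations } = JSON.parse(raw);
  writeToDb(id, computations);
  return id;
}

// Wiring sketch with kafka-node (not run here):
// const resultConsumer = new kafka.Consumer(
//   client, [{ topic: 'ComputationResultTopic' }], {});
// resultConsumer.on('message', (m) => handleResultMessage(m.value, writeRowToDb));
```

This app holds no per-request state; it just drains the results topic, which is what lets the web app stay stateless too.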