Tags: node.js, postgresql, kubernetes, node-postgres, kafkajs

Question on designing a Kafka Consumer in node.js and making it idempotent


We are trying to write a consumer that subscribes to a particular topic and does some processing when a couple of conditions in the data are met. One operation in the processing, however, can be done only once; for simplicity, consider it to be an HTTP POST request that is not idempotent.

Following are some other considerations:-

  • The consumer would need to be coded in node.js and use the kafkajs library,
  • The consumer would need to run on Kubernetes,
  • The number of replicas would be equal to the number of producer replicas
  • We will use a consumer group for our consumers

We were thinking that to make this Consumer Idempotent, we can perhaps do something like:-

For Every Message
    Check if message was processed
    try:
        If !processed
            Do Processing (http POST included here)
    catch for errors:
            Do error processing
    finally:
        Mark message as processed

"Mark message as processed" would basically be capturing some details to a Relation DB (like Postgres), stuff like offset, timestamp, and a few other details to ensure we have the keys captured that would allow us to identify a record uniquely

  1. Does the above look good for making the Consumer Idempotent?
  2. What other alternatives can you suggest that would perform better?

Added to the above, a few more questions around the Best Practices with DB processing in the above scenario:-

  1. Say I have 3 k8s nodes and each one of them has 3 consumer pods running, essentially giving 9 single-threaded Kafka consumers. Is that a correct understanding?

  2. Now since each of these threads will do DB inserts/reads, which would be better to use: a Pool or a Client (assuming the node-postgres library)?

  3. It seems that if we open a Client connection at the start of the day and keep it open until the end of the day, it should work for us. Is that a good approach or a really poor design?

  4. Do we get any benefits from using Pools, given the per-message processing from these 9 consumers?

Additional Assumptions:-

  • Traffic Timing: Start at 7:00 AM Eastern and pick up over the day and taper in the US Evening. No traffic between 2:00 AM Eastern to 6:00 AM Eastern.
  • Average:- 1 message per second during US Daytime,
  • Max:- 5 messages per second in bursts of small duration during US daytime.
  • Delay Tolerance: On a normal day, the POST in the consumer can be delayed no more than 5 mins from the publish time of the message.

Thanks for being patient and reading this through. Apologize for the length of this post.


Solution

    1. Does the above look good for making the Consumer Idempotent?

    Yes, from the POV of idempotency, your code looks good. Since you're working with Kafka consumers, you don't need an explicit for loop for message processing: the consumer callback is invoked on each message arrival. Your pseudo-code should look like this:

    Check if message was processed
    try:
        If !processed
            Do Processing (http POST included here)
    catch for errors:
            Do error processing
    finally:
        Mark message as processed
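As a concrete sketch, the handler could be factored like this (the helper functions are assumptions standing in for your own check/POST/error/mark logic, and the marking in `finally` mirrors the pseudo-code above):

```javascript
// Build a kafkajs eachMessage handler from the four steps in the pseudo-code.
// kafkajs calls the returned function once per record, so no loop is needed.
function makeHandler({ wasProcessed, doProcessing, handleError, markProcessed }) {
  return async ({ topic, partition, message }) => {
    try {
      if (!(await wasProcessed(topic, partition, message.offset))) {
        await doProcessing(message); // the non-idempotent POST lives here
      }
    } catch (err) {
      await handleError(err, message);
    } finally {
      // As in the pseudo-code, the message is marked even on error.
      await markProcessed(topic, partition, message.offset);
    }
  };
}
```

You would then plug it in with `consumer.run({ eachMessage: makeHandler({ ... }) })`.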
    
    2. What other alternatives can you suggest that would perform better?

    Your code misses an important aspect: concurrent duplicate messages. Say, for example, two copies of a message are somehow produced at the same time by the producer (which is actually an error at the producer's end) and the message should be processed only once. Two consumers start processing, and at the If !processed check both see the same not-yet-processed state, so both proceed to Do Processing. You can avoid such scenarios by acquiring a lock on an id that tells you whether a message is a duplicate. Since you're already using Postgres, you could look into pg_advisory_locks. Your pseudo-code will now look something like:

    Check if message was processed
    try:
        acquire_lock(message.uniqId)    //pg_advisory_lock
        If !processed
            Do Processing (http POST included here)
    catch for errors:
        if error is lock_already_acquired_by_other_process
            return as duplicate processor
        else
            Do error processing
    finally:
        Mark message as processed
        release lock
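A sketch of that flow with node-postgres (assumptions: `lockKeyFor` is an illustrative hash, since advisory locks take an integer key rather than a string; `pool` is a node-postgres Pool):

```javascript
// Lock-guarded processing using a Postgres transaction-scoped advisory lock.

// Fold a message id into a 32-bit signed integer usable as a lock key.
function lockKeyFor(uniqId) {
  let h = 0;
  for (const ch of String(uniqId)) {
    h = (h * 31 + ch.charCodeAt(0)) | 0; // stay within 32-bit range
  }
  return h;
}

async function processOnce(pool, uniqId, doProcessing) {
  const client = await pool.connect();
  try {
    await client.query('BEGIN');
    // pg_try_advisory_xact_lock returns false right away if another session
    // already holds the lock, i.e. a duplicate is being processed elsewhere.
    const { rows } = await client.query(
      'SELECT pg_try_advisory_xact_lock($1) AS locked',
      [lockKeyFor(uniqId)]
    );
    if (!rows[0].locked) {
      await client.query('ROLLBACK');
      return 'duplicate';
    }
    await doProcessing(); // check !processed and do the POST inside the lock
    await client.query('COMMIT'); // the xact-scoped lock is released here
    return 'processed';
  } catch (err) {
    await client.query('ROLLBACK');
    throw err;
  } finally {
    client.release();
  }
}
```

Using the transaction-scoped variant means the lock cannot leak if the pod dies mid-processing: it goes away with the transaction.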
    

    We can still make some improvements. The above code doesn't handle failure scenarios where we would like to retry. There are various ways to achieve this, and since you're already using Kafka: why not publish a message that failed processing (obviously not one that was a duplicate) back to the same Kafka topic after some delay, with a counter in the message object tracking how many times it has been processed? We certainly want to retry only a limited number of times, so on each attempt we check that counter to cap the retries. For messages that still fail after the fixed number of retries, you'd want a DLQ (dead letter queue) which holds such messages along with some error message until you have looked at them manually and fixed the errors.
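The retry/DLQ routing could be sketched as below (topic names, the `retry-count` header, and `MAX_RETRIES` are all assumptions; `producer` is a kafkajs producer):

```javascript
// Retry with a counter, falling back to a DLQ topic after MAX_RETRIES.
const MAX_RETRIES = 3; // illustrative cap

// kafkajs header values arrive as Buffers; absent header means first attempt.
function retryCountOf(message) {
  const raw = message.headers && message.headers['retry-count'];
  return raw ? parseInt(raw.toString(), 10) : 0;
}

// Decide where a failed message goes next and what its counter becomes.
function nextDestination(message, topic, dlqTopic) {
  const retries = retryCountOf(message);
  return retries >= MAX_RETRIES
    ? { topic: dlqTopic, retries }       // give up: park it in the DLQ
    : { topic, retries: retries + 1 };   // republish to the same topic
}

async function handleFailure(producer, topic, message, err) {
  const dest = nextDestination(message, topic, `${topic}.dlq`);
  await producer.send({
    topic: dest.topic,
    messages: [{
      key: message.key,
      value: message.value,
      headers: { 'retry-count': String(dest.retries), error: String(err) },
    }],
  });
}
```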

    That sounds like a lot of code to write, but there's more good news: there are libraries available which you can leverage to achieve all of this. One such library is bull.

    3. Say I have 3 k8s nodes and each one of them has 3 consumer pods running, essentially giving 9 single-threaded Kafka consumers. Is that a correct understanding?

    Yes, as far as I understand it. One caveat: for all 9 consumers in the group to actually receive messages, the topic needs at least 9 partitions; consumers beyond the partition count sit idle.

    4. Now since each of these threads will do DB inserts/reads, which would be better to use: a Pool or a Client (assuming the node-postgres library)?

    Using a pool would be advisable, since you also aim for faster processing. With connection pooling you can do things like firing multiple queries at the same time without queuing them up, or utilizing an underlying library which uses parallel execution. Of course, we shouldn't fill up memory with connections, so tune the number of connections in the pool according to the pod's memory.

    5. It seems that if we open a Client connection at the start of the day and keep it open until the end of the day, it should work for us. Is that a good approach or a really poor design?

    I can't tell exactly what you're trying to do here, but I'd go for connection pooling. A single long-lived Client is also a single point of failure: if that one connection drops (network blip, Postgres restart), your consumer stalls until you reconnect manually, whereas a pool replaces broken connections for you.

    6. Do we get any benefits from using Pools, given the per-message processing from these 9 consumers?

    Yes. Apart from the benefits already mentioned in point 4, you get better resource utilization of your k8s pods (though that depends on whether 9 consumers is optimal for the incoming message rate).