We are trying to write a consumer that subscribes to a particular topic and does some processing when a couple of conditions in the data are met. One operation in that processing, however, can be done only once; for simplicity, consider it an HTTP POST request that is not idempotent.
Following are some other considerations:-
We were thinking that to make this Consumer Idempotent, we can perhaps do something like:-
For every message:
    Check if message was processed
    try:
        If !processed:
            Do Processing (HTTP POST included here)
    catch errors:
        Do error processing
    finally:
        Mark message as processed
"Mark message as processed" would basically mean capturing some details in a relational DB (like Postgres): offset, timestamp, and a few other fields, so that we have the keys needed to identify a record uniquely.
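A minimal sketch of that check-then-mark flow. The Postgres table is swapped for an injected in-memory store so the logic is self-contained; the store interface, the message shape, and doPost are assumptions, not your actual code:

```javascript
// Sketch of the per-message idempotent flow described above.
// `store` stands in for the Postgres table keyed by (topic, partition, offset);
// here it is a plain Map so the example is self-contained.
async function handleMessage(message, store, doPost) {
  const key = `${message.topic}:${message.partition}:${message.offset}`;
  if (store.has(key)) return 'skipped';      // Check if message was processed
  try {
    await doPost(message.value);             // Do Processing (non-idempotent POST)
    return 'processed';
  } catch (err) {
    console.error('processing failed', err); // Do error processing
    return 'failed';
  } finally {
    store.set(key, { at: Date.now() });      // Mark message as processed
  }
}
```

One thing worth noticing in this shape: because the marking happens in finally, a message that fails processing is also marked as processed and will never be retried; whether that is acceptable depends on what your error-processing step does.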
Added to the above, a few more questions around the Best Practices with DB processing in the above scenario:-
Say I have 3 k8s nodes and each of them has 3 consumer pods running, essentially giving 9 single-threaded Kafka consumers. Is that a correct understanding?
Now since each of these threads will do DB inserts/reads, which would be better to use: a Pool or a Client (assuming the node-postgres library)?
It seems that if we open a Client connection at the start of the day and keep it open until the end of the day, it should work for us. Is that a good approach or really poor design?
Do we get any benefit from using Pools if we are doing per-message processing from these 9 consumers?
Additional Assumptions:-
Thanks for being patient and reading this through. Apologies for the length of this post.
Yes, from the POV of idempotency, your code looks good. Since you're working with Kafka consumers, you don't need an explicit for loop for message processing; consumers are invoked on each message arrival. Your pseudo-code should look like this:
Check if message was processed
try:
    If !processed:
        Do Processing (HTTP POST included here)
catch errors:
    Do error processing
finally:
    Mark message as processed
Your code misses an important aspect: concurrent duplicate messages. Say, for example, two copies of a message are somehow produced at the same time (which is actually an error on the producer's end) and the message should be processed only once. Two consumers start processing, one per copy. At the If !processed check, both see the same state, "not processed", and both proceed to Do Processing. You can avoid such scenarios by acquiring a lock on some id by which you can tell whether a message is a duplicate. Since you're already using Postgres, you could look into pg_advisory_lock. Your pseudo-code will now look something like:
Check if message was processed
try:
    acquire_lock(message.uniqId)    // pg_advisory_lock
    If !processed:
        Do Processing (HTTP POST included here)
catch errors:
    if error is lock_already_acquired_by_other_process:
        return as duplicate processor
    else:
        Do error processing
finally:
    Mark message as processed
    release lock
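A sketch of that lock-guarded version. In Postgres the lock calls would be SELECT pg_try_advisory_lock($1) and SELECT pg_advisory_unlock($1) on the session; here they are emulated with an in-memory Set so the control flow is self-contained (tryLock, releaseLock, and the processed set are illustrative stand-ins):

```javascript
const heldLocks = new Set();  // emulates Postgres advisory locks for this sketch

// In Postgres this would be: SELECT pg_try_advisory_lock($1)
function tryLock(key) {
  if (heldLocks.has(key)) return false;
  heldLocks.add(key);
  return true;
}

// In Postgres: SELECT pg_advisory_unlock($1)
function releaseLock(key) {
  heldLocks.delete(key);
}

async function handleWithLock(message, processed, doPost) {
  const key = message.uniqId;
  if (!tryLock(key)) return 'duplicate';  // another consumer holds the lock
  try {
    if (processed.has(key)) return 'skipped';
    await doPost(message);                // Do Processing (HTTP POST)
    processed.add(key);                   // Mark message as processed
    return 'processed';
  } finally {
    releaseLock(key);                     // release lock
  }
}
```

Note that pg advisory locks take integer keys, so in practice you'd hash message.uniqId to a bigint first, and the lock is tied to the session holding it, so the same pooled connection must be used for both the lock and the unlock.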
We can still make some improvements. The above code doesn't handle failure scenarios where we would like to have retries. There are various ways to achieve this, and since you're already using Kafka, one option stands out: publish the message that failed processing (obviously not the duplicates) back to the same Kafka topic after some delay, and keep a counter in the message object recording how many times it has been processed. We certainly want to retry only a limited number of times, so each time we process the message we check that counter to bound the retries.

So far so good, but what about the messages that keep failing even after the fixed number of retries? For such cases you'd want a DLQ (dead-letter queue) that holds these messages along with some error details until you have looked at them manually and fixed the errors.
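The retry-counter decision described above boils down to a small routing function. A sketch, where MAX_RETRIES, the message shape, and the injected republish/sendToDlq callbacks are all assumptions:

```javascript
const MAX_RETRIES = 3;  // assumed limit; tune for your workload

// Decide what to do with a message whose processing just failed:
// republish it with an incremented retry counter, or send it to the DLQ.
function routeFailure(message, republish, sendToDlq) {
  const retries = message.retryCount || 0;
  if (retries < MAX_RETRIES) {
    // back to the same topic, ideally after a delay
    republish({ ...message, retryCount: retries + 1 });
    return 'retried';
  }
  sendToDlq({ ...message, error: 'max retries exceeded' });
  return 'dead-lettered';
}
```

In the real consumer, republish would produce to the original topic and sendToDlq to a separate DLQ topic; keeping the counter inside the message object, as described above, means no extra state store is needed for retry tracking.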
That sounds like a lot of code to write, but there's more good news: there are libraries you can leverage to achieve all of this. One such library is bull.
Yes, as far as I understand it, that's correct.
Using a pool would be advisable, since you also want faster processing. With connection pooling you can do things like firing multiple queries at the same time without queuing them up behind one connection, or use an underlying library that executes queries in parallel. Of course, we shouldn't fill up memory with connections, so tune the number of connections in the pool to the pod's memory.
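With node-postgres this amounts to creating one pg.Pool per pod and calling pool.query per message, which checks a client out and returns it automatically. Since exercising pg itself needs a running database, here is a small self-contained sketch of the pooling idea: a fixed set of reusable connections handed out and returned, with extra requests queued (TinyPool and its connect factory are purely illustrative, not pg's implementation):

```javascript
// Minimal illustration of why a pool beats one long-lived client:
// up to `size` queries can be in flight at once, and connections are reused.
class TinyPool {
  constructor(size, connect) {
    this.idle = Array.from({ length: size }, () => connect()); // pre-created connections
    this.waiters = [];                                         // queued acquire() calls
  }
  acquire() {
    if (this.idle.length > 0) return Promise.resolve(this.idle.pop());
    return new Promise(resolve => this.waiters.push(resolve)); // wait for a release
  }
  release(conn) {
    const next = this.waiters.shift();
    if (next) next(conn); else this.idle.push(conn);
  }
}
```

With pg itself the equivalent is roughly new Pool({ max: 10 }) shared per pod and pool.query(...) in the message handler; pg's Pool also replaces broken clients for you, which the single start-of-day Client approach does not.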
I can't tell exactly what you're trying to do here, but I'd still go for connection pooling.
Yes. Apart from the benefits already mentioned in point 4, you get better resource utilization of your k8s pods (though that depends on whether 9 consumers is optimal for the incoming message rate).