I'm looking for the best way to perform ETL using Python.
I have a channel in RabbitMQ which sends events (possibly as often as every second). I want to process them in batches of 1000. The main problem is that the RabbitMQ interface (I'm using pika) invokes a callback for every message. I looked at the Celery framework, however the batch feature was deprecated in version 3.
What is the best way to do it? I'm thinking about saving my events in a list and, when it reaches 1000, copying it to another list and performing my processing. However, how do I make it thread-safe? I don't want to lose events, and I'm afraid of losing events while synchronising the list.
It sounds like a very simple use case, however I couldn't find an established best practice for it.
How do I make it thread-safe?
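On the thread-safety question, here is a minimal sketch of the list-swapping idea, assuming more than one thread may touch the buffer. `BatchBuffer` and `on_batch` are hypothetical names, not part of pika. With pika's `BlockingConnection`, the message callback runs on the same thread that drives the consume loop, so the lock only matters if some other thread also appends to or drains the list.

```python
import threading


class BatchBuffer:
    """Collects items and hands them off in fixed-size batches (hypothetical helper)."""

    def __init__(self, batch_size, on_batch):
        self._batch_size = batch_size
        self._on_batch = on_batch      # called with a full list of items
        self._lock = threading.Lock()
        self._items = []

    def add(self, item):
        batch = None
        with self._lock:
            self._items.append(item)
            if len(self._items) >= self._batch_size:
                # Swap in a fresh list while holding the lock, so no
                # concurrent add() can land in the batch being handed off.
                batch, self._items = self._items, []
        # Process outside the lock so other producers are not blocked.
        if batch is not None:
            self._on_batch(batch)
```

Swapping the list reference instead of copying it means an event is always in exactly one list: either the batch being processed or the new buffer.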
How about setting the consumer prefetch-count=1000? If a consumer's unacked messages reach its prefetch limit, RabbitMQ will not deliver any more messages to it.
Don't ACK received messages until you have 1000 of them, then copy them to another list and perform your processing. When your job is done, ACK the last message with the multiple flag set, and all messages before it will be ACKed by the RabbitMQ server as well.
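The prefetch-and-batch-ACK approach described above could look roughly like this with pika. It is a sketch assuming the pika 1.x API (`basic_qos`, `basic_consume` with `on_message_callback`, `basic_ack` with `multiple=True`). `BatchConsumer`, `consume`, `process_batch`, and the queue and host values are hypothetical names chosen for illustration.

```python
BATCH_SIZE = 1000  # assumed to match the prefetch count


class BatchConsumer:
    """Buffers message bodies and ACKs them in one go after processing."""

    def __init__(self, process_batch, batch_size=BATCH_SIZE):
        self.process_batch = process_batch  # user-supplied function taking a list
        self.batch_size = batch_size
        self.pending = []

    def on_message(self, ch, method, properties, body):
        self.pending.append(body)
        if len(self.pending) >= self.batch_size:
            batch, self.pending = self.pending, []
            # If this raises, nothing has been ACKed yet, so the broker
            # redelivers the unacked messages once the connection closes.
            self.process_batch(batch)
            # multiple=True ACKs every delivery up to and including this tag.
            ch.basic_ack(delivery_tag=method.delivery_tag, multiple=True)


def consume(queue_name, process_batch, host="localhost"):
    # Imported here so BatchConsumer can be exercised without pika or a broker.
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    channel = connection.channel()
    # Stop delivery once BATCH_SIZE messages are waiting for an ACK.
    channel.basic_qos(prefetch_count=BATCH_SIZE)
    consumer = BatchConsumer(process_batch)
    # auto_ack defaults to False, so messages stay unacked until basic_ack.
    channel.basic_consume(queue=queue_name, on_message_callback=consumer.on_message)
    try:
        channel.start_consuming()
    finally:
        connection.close()
```

Calling `consume("events", my_handler)` would start the loop. Note that this sketch only flushes on a full batch: if fewer than 1000 messages arrive, they stay buffered and unacked until more come in, so a time-based flush may be needed in practice.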
But I am not sure whether such a large prefetch is best practice.