I have a producer-consumer setup in which multiple producers, each on its own thread, enqueue data into a Redis queue, and a single consumer on a single thread monitors that queue. When the queue length reaches 10000 items or more, the consumer should extract the first 10000 items, delete them from the queue, and run some computations on them.
Using the redis-py client, I extract the first 10000 items and remove them with the following code:
logs = REDIS_CLIENT.lrange(task_queue, 0, 9999)
REDIS_CLIENT.ltrim(task_queue, start=10000, end=REDIS_CLIENT.llen(task_queue))
(Documentation for lrange and ltrim)
My question is, is there any opportunity for data loss here? For example, is it possible for tasks to be enqueued in the time between when the function ltrim() is called, and when the queue is actually trimmed (in which case the newest logs would be lost, since the value of end would be the older length)? Or is a lock placed on the queue till the ltrim operation completes?
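Yes, you can lose data. Each Redis command runs atomically, but llen and ltrim are two separate commands: llen is evaluated first, in its own round trip, and its result is then sent as the end offset of ltrim. Any items enqueued in the window between the llen call and the ltrim call fall past that stale end offset and can be trimmed away.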
To close that time window, pass -1 as the end offset:
REDIS_CLIENT.ltrim(task_queue, start=10000, end=-1)
An end offset of -1 refers to the last element of the list at the moment Redis executes the command, so you don't need to compute the end offset yourself.
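As a minimal sketch of how the consumer side could look with this fix, the read and the trim can also be queued on a redis-py pipeline with transaction=True, which sends them inside MULTI/EXEC so no other client's command runs between them. The function name pop_batch and its parameters are hypothetical, and the sketch assumes producers append to the tail of the list (for example with rpush) and that there is only one consumer.

```python
def pop_batch(client, key, batch_size=10000):
    """Return the first `batch_size` items of `key` and remove them.

    Returns None when fewer than `batch_size` items are queued.
    `client` is assumed to be a redis-py client such as redis.Redis().
    """
    # With a single consumer the list can only grow between this check
    # and the transaction below, so the check cannot become invalid.
    if client.llen(key) < batch_size:
        return None

    # transaction=True wraps the queued commands in MULTI/EXEC, so
    # LRANGE and LTRIM execute back to back on the server.
    pipe = client.pipeline(transaction=True)
    pipe.lrange(key, 0, batch_size - 1)  # read the first batch_size items
    pipe.ltrim(key, batch_size, -1)      # keep everything after them
    batch, _ = pipe.execute()            # one result per queued command
    return batch
```

With the names from the question, this would be called as pop_batch(REDIS_CLIENT, task_queue). The transaction is arguably optional when producers only append to the tail, since the first 10000 items do not change between the two commands, but it keeps the read and the delete consistent if that assumption ever stops holding.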