redis list, a producer keeps lpush. In another thread, consumers periodically take all out from the list, and categorize elements. Because the producer keeps pushing, so the taking-all-out must be done atomically. So is there an effective way to do this? spring-data-redis could be used.
// producer
getOpsForList.push(k, v);
// consumer
alist = range(k,0,-1); // take all out
alist.parallelStream() // during which a producer thread could push but I hope it is "blocked".
delete(k); // list is now empty and push from producer is unblocked.
multi
and exec
does not achieve my goal, because it actually submits lrange
, lpush
and delete
just in one transaction. So far, the only way I could think of, is keeping lpop
and add returned into alist
until list is empty.
EDIT, this is what I think:
when you want to be sure an operation is ran only once, using watch
:
watch key
val = get key
val = val + 1
multi
set key val
exec
when you want to be not "interrupted" (not multithreading interrupt), and don't care how many times it runs, transaction (multi
and exec
) is enough.
multi
val = lrange key 0 -1
delete key
exec
val
is still a list after it finishes, like what is said in official-doc
All the commands in a transaction are serialized and executed sequentially. It can never happen that a request issued by another client is served in the middle of the execution of a Redis transaction.
Beyond redis, I took the data operation list.stream.parallelism
out, and the function now only focuses on the data getter, which is exactly like the last code paragraph. ;)
A good example to illustrate how WATCH can be used to create new atomic operations otherwise not supported by Redis is to implement ZPOP, that is a command that pops the element with the lower score from a sorted set in an atomic way.
There is a implementation for ZPOP
in documentation as below:
WATCH zset
element = ZRANGE zset 0 0
MULTI
ZREM zset element
EXEC
What you need to do is repeat the operation above If EXEC fails (i.e. returns a Null reply). the producer operation lpush
is atomic, so it needn't to use watch command. for example:
// consumer pesudo code
do {
watch(k);
transaction = multi();
alist = transaction.range(k,0,-1);
transaction.delete(k);
status = get status of transaction.exec();
} while(status == null);
alist.parallelStream()