I have a multiprocessing queue. The end of the queue is signaled by a SENTINEL value, which is a string.
aq = Queue()
........................
The items in the queue are instances of class A:
class A:
    id: str
    desc: str
In a function, I get elements from the queue aq and process them in chunks.
The first element (if there is only one) can be the SENTINEL, in which case there is nothing to process.
....
def process():
    chunk_data = []
    all = []
    item = aq.get()
    if not isinstance(item, A):
        return
    chunk_data.append(item.id)
    while item != SENTINEL:
        # start processing in chunks
        # add elements to the chunk list until it is full
        while len(chunk_data) < CHUNK_MAX_SIZE:  # 50
            item = aq.get()
            if item == SENTINEL:
                break
            chunk_data.append(item.id)
        # the chunk list is full (or the queue ended): start processing
        chunk_process_ids = process_data(chunk_data)  # process the chunk
        all.extend(chunk_process_ids)
        # empty the chunk list and start again
        chunk_data.clear()
The function works as expected, but I consider the code convoluted. I'm looking for a simpler, clearer version.
In the interest of following the DRY principle, here's what I believe is a cleaner version of your code with no repeated logic. Note that it's generally better to handle errors by raising an exception than to simply return when the type of an input value is not as expected.
def process():
    all = []
    while True:
        chunk_data = []
        # read at most CHUNK_MAX_SIZE items, stopping early at SENTINEL
        for _ in range(CHUNK_MAX_SIZE):
            if (item := aq.get()) == SENTINEL:
                break
            assert isinstance(item, A)
            chunk_data.append(item.id)
        if chunk_data:
            all.extend(process_data(chunk_data))
        # a short (or empty) chunk means SENTINEL was reached
        if len(chunk_data) < CHUNK_MAX_SIZE:
            break
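If the type check needs to survive running Python with -O (which strips assert statements), an explicit raise is one option. The following is only a sketch of that idea: read_chunk is a hypothetical helper, queue.Queue stands in for the multiprocessing queue, A is assumed to be a dataclass, and the SENTINEL value and the small CHUNK_MAX_SIZE are made up for illustration.

```python
from dataclasses import dataclass
from queue import Queue

SENTINEL = "STOP"      # assumed sentinel value, not from the question
CHUNK_MAX_SIZE = 3     # kept small for illustration (the question uses 50)


@dataclass
class A:
    id: str
    desc: str


def read_chunk(q):
    """Hypothetical helper: read up to CHUNK_MAX_SIZE ids from q.

    Returns (ids, done), where done is True once SENTINEL has been read.
    """
    ids = []
    for _ in range(CHUNK_MAX_SIZE):
        item = q.get()
        if item == SENTINEL:
            return ids, True
        if not isinstance(item, A):
            # Unlike assert, this check is not removed under `python -O`.
            raise TypeError(f"expected A, got {type(item).__name__}")
        ids.append(item.id)
    return ids, False
```

A caller would loop on read_chunk, pass any non-empty ids list to process_data, and stop once done is True.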
You can also clean up the code a bit further with iter and itertools.islice if you don't need to validate that each item is of type A (which you shouldn't anyway if you have control over the code that enqueues items):
from itertools import islice
from operator import attrgetter

def process():
    all = []
    data = iter(aq.get, SENTINEL)
    while True:
        chunk_data = list(map(attrgetter('id'), islice(data, CHUNK_MAX_SIZE)))
        if chunk_data:
            all.extend(process_data(chunk_data))
        if len(chunk_data) < CHUNK_MAX_SIZE:
            break
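For readers who haven't met the two-argument form of iter: iter(callable, sentinel) calls the callable repeatedly and stops as soon as it returns a value equal to the sentinel. Here is a small illustration of how that combines with islice; it uses queue.Queue with plain strings and an assumed SENTINEL value so that it runs on its own.

```python
from itertools import islice
from queue import Queue

SENTINEL = "STOP"  # assumed sentinel value, for illustration only

q = Queue()
for value in ["a", "b", "c", "d", "e", SENTINEL]:
    q.put(value)

# Each next() on this iterator calls q.get(); iteration ends when
# q.get() returns a value equal to SENTINEL.
items = iter(q.get, SENTINEL)

first = list(islice(items, 2))   # ['a', 'b']
second = list(islice(items, 2))  # ['c', 'd']
third = list(islice(items, 2))   # ['e'], cut short because SENTINEL was read
```

The short third chunk is what the len(chunk_data) < CHUNK_MAX_SIZE check relies on to detect the end of the queue.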
Credit to @KellyBundy for a more concise version as commented below:
from itertools import islice
from operator import attrgetter

def process():
    all = []
    data = iter(aq.get, SENTINEL)
    ids = map(attrgetter('id'), data)
    while chunk_ids := list(islice(ids, CHUNK_MAX_SIZE)):
        all += process_data(chunk_ids)
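To try the concise version end to end, something like the following should work. It is a sketch under several assumptions: queue.Queue stands in for the multiprocessing queue, A is assumed to be a dataclass, process_data is a hypothetical stand-in that uppercases ids, the SENTINEL value and CHUNK_MAX_SIZE are made up, the queue is passed in as a parameter, and the list is renamed to results (to avoid shadowing the built-in all) and returned.

```python
from dataclasses import dataclass
from itertools import islice
from operator import attrgetter
from queue import Queue

SENTINEL = "STOP"      # assumed sentinel value
CHUNK_MAX_SIZE = 3     # kept small for illustration


@dataclass
class A:
    id: str
    desc: str


calls = []  # records each chunk passed to process_data, for inspection


def process_data(chunk_ids):
    # Hypothetical stand-in for the real chunk processor.
    calls.append(list(chunk_ids))
    return [i.upper() for i in chunk_ids]


def process(q):
    results = []
    ids = map(attrgetter("id"), iter(q.get, SENTINEL))
    # An empty chunk means SENTINEL has already been read, so the loop ends.
    while chunk_ids := list(islice(ids, CHUNK_MAX_SIZE)):
        results += process_data(chunk_ids)
    return results


q = Queue()
for n in range(7):
    q.put(A(f"id{n}", "demo"))
q.put(SENTINEL)

processed = process(q)
```

With seven items and a chunk size of three, process_data is called with chunks of three, three, and one id.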