Tags: python, python-3.x, queue, python-3.8

Process elements in chunks using multiprocessing queues


I have a multiprocessing queue; the end of the queue is signaled by a SENTINEL value, which is a string.

from multiprocessing import Queue

aq = Queue()

........................

The items in the queue are instances of class A:

class A:
    id: str
    desc: str
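To try the snippets in this thread without the real application, a minimal setup sketch might look like the following. Everything in it is an assumption for illustration: the sentinel value "STOP", the stub process_data, and the helper produce are hypothetical, and A is written as a dataclass so that instances compare by value.

```python
from dataclasses import dataclass

SENTINEL = "STOP"      # hypothetical; the question only says the sentinel is a string
CHUNK_MAX_SIZE = 50    # the chunk size mentioned in the question's code comment


@dataclass
class A:
    id: str
    desc: str


def process_data(ids):
    """Hypothetical stand-in for the real chunk processor: echo the ids back."""
    return list(ids)


def produce(q, items):
    """Enqueue every item, then the sentinel to mark the end of the stream."""
    for item in items:
        q.put(item)
    q.put(SENTINEL)
```

produce only calls q.put, so it works with multiprocessing.Queue as well as with queue.Queue, which has the same put()/get() interface and is convenient for quick single-process checks.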

In a function, I get elements from the queue aq and process them in chunks. The first element (if there is just one) can be the SENTINEL, in which case there is nothing to process. ....

def process():
    chunk_data = []
    all = []
    item = aq.get()
    if not isinstance(item, A):
        return
    chunk_data.append(item.id)
    while item != SENTINEL:
        # start processing in chunks:
        # add elements to the chunk list until it is full
        while len(chunk_data) < CHUNK_MAX_SIZE:  # 50
            item = aq.get()
            if item == SENTINEL:
                break
            chunk_data.append(item.id)
        # the chunk list is full (or the sentinel was reached): process it
        chunk_process_ids = process_data(chunk_data)  # process the chunk
        all.extend(chunk_process_ids)
        # empty the chunk list and start again
        chunk_data.clear()

The function works as expected, but I find the code convoluted. I'm looking for a simpler, clearer version.
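One behaviour worth checking in the loop above: when the number of items is an exact multiple of CHUNK_MAX_SIZE, the inner loop stops on a full chunk without reading the sentinel, so the next pass reads only the sentinel and calls process_data with an empty list. A parameterized copy of the loop makes this visible. The names question_loop and chunk_sizes and the "STOP" sentinel are hypothetical, and queue.Queue stands in for the multiprocessing queue so it runs in a single process.

```python
import queue
from dataclasses import dataclass

SENTINEL = "STOP"  # hypothetical sentinel string


@dataclass
class A:
    id: str
    desc: str


def question_loop(q, chunk_size, handler):
    """The question's loop, with the queue, chunk size and processor passed in."""
    results = []
    chunk_data = []
    item = q.get()
    if not isinstance(item, A):
        return results
    chunk_data.append(item.id)
    while item != SENTINEL:
        while len(chunk_data) < chunk_size:
            item = q.get()
            if item == SENTINEL:
                break
            chunk_data.append(item.id)
        results.extend(handler(chunk_data))
        chunk_data.clear()
    return results


def chunk_sizes(n, chunk_size=50):
    """Enqueue n items plus the sentinel; return the size of every chunk handled."""
    q = queue.Queue()
    for i in range(n):
        q.put(A(str(i), ""))
    q.put(SENTINEL)
    sizes = []

    def record(ids):
        sizes.append(len(ids))  # remember how large each chunk was
        return ids

    question_loop(q, chunk_size, record)
    return sizes
```

With these definitions, chunk_sizes(50) returns [50, 0]. Whether the trailing empty call matters depends on what process_data does with an empty list.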


Solution

  • In the interest of following the DRY principle, here's what I believe is a cleaner version of your code, with no repeated logic. Note that it's generally better to handle errors by raising an exception than to silently return when an input value is not of the expected type.

    def process():
        all = []
        while True:
            chunk_data = []
            for _ in range(CHUNK_MAX_SIZE):
                if (item := aq.get()) == SENTINEL:
                    break
                if not isinstance(item, A):
                    raise TypeError(f'expected an instance of A, got {type(item).__name__}')
                chunk_data.append(item.id)
            if chunk_data:
                all.extend(process_data(chunk_data))
            if len(chunk_data) < CHUNK_MAX_SIZE:
                break
        return all
    
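The loop needs both checks at the end of each pass: the "if chunk_data" guard skips the empty chunk that appears when the number of items is an exact multiple of the chunk size, and a chunk shorter than the maximum signals that the sentinel was reached. A sketch of the same structure with the queue, chunk size and processor passed in as parameters (process_queue, chunk_sizes and the "STOP" sentinel are hypothetical names) shows the chunk sizes it produces:

```python
import queue
from dataclasses import dataclass

SENTINEL = "STOP"  # hypothetical sentinel string


@dataclass
class A:
    id: str
    desc: str


def process_queue(q, chunk_size, handler):
    """Drain q until SENTINEL, calling handler on lists of at most chunk_size ids."""
    results = []
    while True:
        chunk_data = []
        for _ in range(chunk_size):
            if (item := q.get()) == SENTINEL:
                break
            if not isinstance(item, A):
                raise TypeError(f"expected an instance of A, got {type(item).__name__}")
            chunk_data.append(item.id)
        if chunk_data:  # skip the empty chunk after an exact multiple of chunk_size
            results.extend(handler(chunk_data))
        if len(chunk_data) < chunk_size:  # short chunk: the sentinel was reached
            return results


def chunk_sizes(n, chunk_size=50):
    """Enqueue n items plus the sentinel; return the size of every chunk handled."""
    q = queue.Queue()
    for i in range(n):
        q.put(A(str(i), ""))
    q.put(SENTINEL)
    sizes = []

    def record(ids):
        sizes.append(len(ids))  # remember how large each chunk was
        return ids

    process_queue(q, chunk_size, record)
    return sizes
```

Here chunk_sizes(0) returns [], chunk_sizes(50) returns [50], and chunk_sizes(120) returns [50, 50, 20].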

    You can also clean up the code a bit further with iter and itertools.islice if you don't need to validate that each item is of type A (which you shouldn't need to anyway if you control the code that enqueues items):

    from itertools import islice
    from operator import attrgetter
    
    def process():
        all = []
        data = iter(aq.get, SENTINEL)
        while True:
            chunk_data = list(map(attrgetter('id'), islice(data, CHUNK_MAX_SIZE)))
            if chunk_data:
                all.extend(process_data(chunk_data))
            if len(chunk_data) < CHUNK_MAX_SIZE:
                break
        return all
    
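This version relies on the two-argument form of iter(): iter(aq.get, SENTINEL) keeps calling aq.get() and stops as soon as the returned value equals the sentinel, while each islice(data, n) call pulls at most n items from that shared iterator. A small single-process demonstration, using queue.Queue (same get()/put() interface), plain strings in place of A instances, and a hypothetical "STOP" sentinel:

```python
import queue
from itertools import islice

SENTINEL = "STOP"  # hypothetical sentinel string

q = queue.Queue()
for word in ["a", "b", "c", "d", "e"]:
    q.put(word)
q.put(SENTINEL)

# Calls q.get() repeatedly; iteration ends when q.get() returns SENTINEL.
data = iter(q.get, SENTINEL)

# Each islice() call resumes where the previous one stopped on the same iterator.
first = list(islice(data, 2))
second = list(islice(data, 2))
third = list(islice(data, 2))   # only one item is left before the sentinel
# An exhausted iterator keeps raising StopIteration, so this does not block on q.get().
fourth = list(islice(data, 2))
```

Here first is ["a", "b"], second is ["c", "d"], third is ["e"], and fourth is an empty list.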

    Credit to @KellyBundy for a more concise version, suggested in the comments:

    from itertools import islice
    from operator import attrgetter
    
    def process():
        all = []
        data = iter(aq.get, SENTINEL)
        ids = map(attrgetter('id'), data)
        while chunk_ids := list(islice(ids, CHUNK_MAX_SIZE)):
            all += process_data(chunk_ids)
        return all
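Nothing in the walrus loop is specific to queues, so it can be factored into a small reusable generator. The helper name chunked is hypothetical; on Python 3.12 and later, the standard library's itertools.batched provides similar behaviour but yields tuples instead of lists.

```python
from itertools import islice


def chunked(iterable, size):
    """Yield successive lists of at most `size` items until `iterable` is exhausted."""
    it = iter(iterable)
    # list(islice(...)) is empty only once the iterator is used up, ending the loop.
    while chunk := list(islice(it, size)):
        yield chunk
```

For example, list(chunked(range(7), 3)) gives [[0, 1, 2], [3, 4, 5], [6]]. Inside process, the loop could then read for chunk_ids in chunked(ids, CHUNK_MAX_SIZE): all += process_data(chunk_ids), with ids built exactly as above.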