Tags: python-3.x, multithreading, queue, threadpool, producer-consumer

Multiple consumer threads grabbing the same item from a queue


I've built a producer thread with multiple consumer threads. The producer puts all the tasks into a queue, and the consumer threads use get to retrieve the tasks and execute them. The problem I keep running into is that some items in the queue are grabbed by multiple consumer threads.

In my Automator class I create the thread pool, then put all the tasks into the queue using the add_task method of TaskThreadPool. The TaskRunner threads then start pulling tasks out of the queue and running them. After the first cycle completes, in every following cycle each consumer thread grabs the last item at the same time, so I see the same task run several times when it should run only once. I've tried adding a lock in the consumer thread just before the _q.get call, but I still get the same result. How can I ensure that only one thread grabs a given task and that no other thread touches it?
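For context, queue.Queue already does its own internal locking: each item that is put is returned by exactly one get call, however many threads are waiting, so an extra lock around _q.get() cannot change which thread receives an item. A minimal sketch that checks this, using made-up (cycle, number) tuples in place of the real task objects and reusing the same workers across several cycles the way Automator does:

```python
from collections import Counter
from queue import Queue
from threading import Lock, Thread

NUM_WORKERS = 7
NUM_TASKS = 1000

q = Queue()
seen = Counter()       # how many times each item was consumed
seen_lock = Lock()     # protects the Counter, not the queue


def worker():
    while True:
        item = q.get()  # blocks until an item is available
        try:
            with seen_lock:
                seen[item] += 1
        finally:
            q.task_done()  # lets q.join() know this item is finished


for _ in range(NUM_WORKERS):
    Thread(target=worker, daemon=True).start()

# Several "cycles" that reuse the same long-lived worker threads
for cycle in range(3):
    for n in range(NUM_TASKS):
        q.put((cycle, n))
    q.join()  # wait until every item of this cycle has been processed

duplicates = [item for item, count in seen.items() if count > 1]
print(len(seen), duplicates)  # 3000 []
```

Because every item is consumed exactly once here, a task that still runs twice in the real program most likely reached the queue twice, i.e. the producer put it more than once.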

Console screenshot: https://i.sstatic.net/ndMSF.jpg

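import datetime as dt
import time
from queue import Queue
from threading import Lock, Thread

# Shared lock so that output from different threads does not interleave
print_lock = Lock()


class TaskThreadPool:
    """ Pool of threads consuming tasks from a queue """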

    def __init__(self, num_threads):
        self.num_threads = num_threads
        self._q = Queue()
        self.workers = []

    def create_threads(self):
        for _ in range(self.num_threads):
            self.workers.append(TaskRunner(self._q))
        with print_lock:
            print('{} tasks threads created'.format(len(self.workers)))

    def add_task(self, task):
        """ Add a task to the queue """
        self._q.put(task)

    def wait_completion(self):
        """ Wait for completion of all the tasks in the queue """
        self._q.join()


class TaskRunner(Thread):
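    """Thread executing tasks from a given task queue"""

    def __init__(self, queue):
        super().__init__(daemon=True)
        self._q = queue
        self.start()

    def run(self):
        while True:
            # Queue.get() blocks until an item is available, so there is no
            # need to poll empty() first (polling just spins the CPU).
            task = self._q.get()
            try:
                task.run_task()
            finally:
                # Mark the item done so wait_completion()/join() can return
                self._q.task_done()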


class Automator:
    def __init__(self, test_task_id=None, test_loop_count=0):
        """
        Automator settings.
        Basic start-up settings; when testing a single task, no thread pool is created.
        If module testing is enabled, only test tasks will be run.
        """
        # Specific Task testing information
        self.test_task_id = test_task_id
        self.task_test = False
        self.test_loop_count = test_loop_count
        self.print_lock = print_lock
        print('Starting Automator 3')

        ...


        # Queue information
        self.cycle_queue = []
        self.priority_queues = {}
        self.number_of_priority_queues = 0
        # Max number of threads to have running
        self.max_task_num_threads = 7
        self.threads_created = False
        self.task_pool = TaskThreadPool(self.max_task_num_threads)

        ...

        # If a test task id was provided turn task_test on
        if self.test_task_id:
            self.task_test = True

    def open_thread_pool(self):
        with self.print_lock:
            print('Creating Task Threads')
        self.task_pool.create_threads()
        self.threads_created = True

    ...

    ...

    def _run_cycle_queue(self):
        print('Running Cycle Tasks')
        for cycle_task in self.cycle_queue:
            self.task_pool.add_task(cycle_task)
        self.task_pool.wait_completion()

    def _run_standard_task_queues(self):
        """
        Add each priority queue's tasks to the pool, waiting for one
        priority level to finish before starting the next.
        """
        print('Running Standard Tasks')
        for queue_number in range(self.number_of_priority_queues):
            queue = self.priority_queues[str(queue_number)]

            if len(queue) > 0:
                for task in queue:
                    self.task_pool.add_task(task)
                self.task_pool.wait_completion()

    def _sleep(self):
        """
        Find the next 5-minute interval boundary (10:00, 10:05, 10:10)
        and sleep until it begins.
        """
        now = dt.datetime.now()
        # How long until next run interval
        minutes_to_sleep = 5 - now.minute % 5
        print('Automator 3 Restarting in {} minutes'.format(minutes_to_sleep))
        time.sleep((minutes_to_sleep * 60) - now.second)
        now = dt.datetime.now()
        print('Automator 3 Restarting {}'.format(now))

    def run_automator(self):
        # Start Program Loop
        cycles = 0
        mode_print = False

        # Open Database Connection        
        self.dw.open_connection()
        try:
            while True:
                cycles += 1
                print('Cycle {} Started'.format(cycles))
                try:
                    # Get tasks from automator table
                    self._refresh_task_data()

                    # Update meta data status
                    self._status_running()

                    if not self.task_test:
                        # Backup Local Files
                        self.backup_files()

                    # Create Task Objects
                    self._create_task_objects()

                    # Create Task Priorities
                    self._check_priorities()

                    if self.task_test:

                        # Start up requested task
                        self.test_task(self.test_task_id)
                        if not self.test_loop_count \
                                or cycles == self.test_loop_count:
                            break

                    else:
                        if not mode_print:
                            print('Running Automator 3 - MODE: Standard')
                            mode_print = True
                        # Sort Tasks into Lists
                        self._setup_queues()

                        if not self.threads_created:
                            # Create Task Threads
                            self.open_thread_pool()

                        # Run Cycle tasks
                        self._run_cycle_queue()

                        # Setup Task queues and execute all tasks
                        self._run_standard_task_queues()

                        # Update the last run in meta data
                        self._update_last_run()

                        # Update meta data status
                        self._status_sleeping()

                        print('Cycle {} Completed'.format(cycles))

                        # Sleep till next 5 minute interval 12:00, 12:05, etc    
                        self._sleep()

                except Exception as e:
                    raise e

        finally:
            self.dw.close_connection()

Solution

  • The issue was in my method that hands items to the queue, not in the consumer threads. I added an additional check to stop unneeded items from being added to the queue.
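The post does not show the changed method, so the check below is only a guess at its general shape. It is a sketch that assumes each task carries some unique identifier (the Task class, its task_id attribute and the DedupTaskQueue wrapper are all hypothetical) and skips a task whose id was already queued during the current cycle. A situation where this matters, for example, is a list such as self.cycle_queue that is appended to every cycle without being reset, so later cycles enqueue earlier entries again.

```python
from dataclasses import dataclass
from queue import Queue


@dataclass
class Task:
    task_id: int  # hypothetical unique identifier
    name: str

    def run_task(self):
        print('running', self.name)


class DedupTaskQueue:
    """Wraps a Queue and refuses to enqueue the same task id twice per cycle.

    add_task is meant to be called from the single producer thread only,
    so the set needs no lock of its own.
    """

    def __init__(self):
        self._q = Queue()
        self._queued_ids = set()

    def add_task(self, task):
        # The additional check: skip tasks already handed to the queue
        if task.task_id in self._queued_ids:
            return False
        self._queued_ids.add(task.task_id)
        self._q.put(task)
        return True

    def start_new_cycle(self):
        # Forget last cycle's ids so each task may run again next cycle
        self._queued_ids.clear()


tasks = [Task(1, 'a'), Task(2, 'b'), Task(1, 'a again')]
pool = DedupTaskQueue()
added = [pool.add_task(t) for t in tasks]
print(added, pool._q.qsize())  # [True, True, False] 2
```

Clearing the set at the start of each cycle keeps recurring tasks eligible in later cycles while still blocking repeats within one cycle.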