I've built a producer thread that feeds multiple consumer threads. The producer puts all the tasks into a queue, and the consumer threads use get() to retrieve the tasks and execute them. The problem I keep running into is that some items in the queue are grabbed by multiple consumer threads.
In my Automator class I create the thread pool, then put all the tasks into the queue using the add_task method of TaskThreadPool. The TaskRunners then start pulling the tasks out of the queue and running them. After the first cycle completes, in every following cycle all the consumer threads grab the last item at the same time, so the same task runs several times when it should run only once. I've tried adding a lock in the consumer thread just before the _q.get() call, but I still get the same result. How can I ensure that only one thread grabs a given task and that no other thread touches it?
https://i.sstatic.net/ndMSF.jpg
class TaskThreadPool:
    """Fixed-size pool of TaskRunner threads fed from one shared queue.

    Tasks are submitted with ``add_task``; ``wait_completion`` blocks the
    caller until every submitted task has been marked done by a worker.
    """

    def __init__(self, num_threads):
        """Prepare an empty pool; no threads are started until create_threads()."""
        self.num_threads = num_threads
        self._q = Queue()
        self.workers = []

    def create_threads(self):
        """Start ``num_threads`` TaskRunner workers bound to the pool's queue."""
        self.workers.extend(TaskRunner(self._q) for _ in range(self.num_threads))
        created = len(self.workers)
        with print_lock:
            print('{} tasks threads created'.format(created))

    def add_task(self, task):
        """Enqueue ``task`` for the next free worker."""
        self._q.put(task)

    def wait_completion(self):
        """Block until every enqueued task has been processed (Queue.join)."""
        self._q.join()
class TaskRunner(Thread):
    """Daemon thread that executes tasks taken from a shared queue.

    The thread starts itself on construction and loops forever, calling
    ``run_task()`` on every item it takes from the queue and then marking
    the item done so that ``Queue.join()`` callers can proceed.
    """

    def __init__(self, queue):
        """Bind the worker to ``queue`` and start it as a daemon thread."""
        super(TaskRunner, self).__init__(daemon=True)
        self._q = queue
        self.start()

    def run(self):
        """Take tasks one at a time and run them until the process exits."""
        import traceback

        while True:
            # Queue.get() blocks until an item is available and gives each item
            # to exactly one caller. No empty() pre-check (racy, and a CPU-burning
            # busy-wait when the queue is idle) or extra lock is needed.
            task = self._q.get()
            try:
                task.run_task()
            except Exception:
                # Report the failure but keep this worker alive. Letting the
                # exception escape would kill the thread, and once every worker
                # has died, Queue.join() in wait_completion() would hang forever.
                traceback.print_exc()
            finally:
                # Always account for the item, even if the task failed.
                self._q.task_done()
class Automator:
    """Loads tasks each cycle and runs them on a TaskThreadPool.

    In standard mode every cycle runs the cycle queue and then the
    priority queues, then sleeps until the next 5-minute boundary.
    In test mode a single task (``test_task_id``) is run instead.
    """

    def __init__(self, test_task_id=None, test_loop_count=0):
        """
        Automator Settings

        Basic start-up settings. When testing a single task, the thread
        pool's worker threads are never started. If module testing is
        enabled, only test tasks will be run.

        :param test_task_id: id of a single task to run; any truthy value
            turns test mode on.
        :param test_loop_count: number of cycles to run in test mode; 0 means
            stop after the first cycle.
        """
        # Specific Task testing information
        self.test_task_id = test_task_id
        self.task_test = False
        self.test_loop_count = test_loop_count
        self.print_lock = print_lock
        print('Starting Automator 3')
        ...
        # Queue information
        self.cycle_queue = []
        self.priority_queues = {}
        self.number_of_priority_queues = 0
        # Max number of threads to have running
        self.max_task_num_threads = 7
        self.threads_created = False
        self.task_pool = TaskThreadPool(self.max_task_num_threads)
        ...
        # If a test task id was provided turn task_test on
        if self.test_task_id:
            self.task_test = True

    def open_thread_pool(self):
        """Start the pool's worker threads and remember that they exist."""
        with self.print_lock:
            print('Creating Task Threads')
        self.task_pool.create_threads()
        self.threads_created = True

    ...
    ...

    def _submit_batch(self, tasks):
        """
        Put each distinct task object on the pool queue once, then block
        until every queued task has been processed.

        queue.Queue hands each item to exactly one consumer, so a task that
        runs on several threads at once must have been queued more than once.
        Skipping repeated objects (compared by identity) within a batch
        prevents that.
        """
        queued = set()
        for task in tasks:
            if id(task) in queued:
                continue
            queued.add(id(task))
            self.task_pool.add_task(task)
        self.task_pool.wait_completion()

    def _run_cycle_queue(self):
        """Run every task in ``self.cycle_queue`` and wait for them to finish."""
        with self.print_lock:
            print('Running Cycle Tasks')
        self._submit_batch(self.cycle_queue)

    def _run_standard_task_queues(self):
        """
        Run the priority queues in order ('0', '1', ...), waiting for each
        non-empty level to finish before starting the next one.
        """
        with self.print_lock:
            print('Running Standard Tasks')
        for queue_number in range(self.number_of_priority_queues):
            tasks = self.priority_queues[str(queue_number)]
            if tasks:
                self._submit_batch(tasks)

    def _sleep(self):
        """
        Find when the next 5 minute interval begins (10:00, 10:05, 10:10)
        and sleep until then.
        """
        now = dt.datetime.now()
        # How long until next run interval
        minutes_to_sleep = 5 - now.minute % 5
        print('Automator 3 Restarting in {} minutes'.format(minutes_to_sleep))
        time.sleep((minutes_to_sleep * 60) - now.second)
        now = dt.datetime.now()
        print('Automator 3 Restarting {}'.format(now))

    def run_automator(self):
        """
        Main loop: refresh tasks, run them, sleep, repeat.

        The database connection opened here is always closed on exit,
        including when an exception propagates out of a cycle.
        """
        # Start Program Loop
        cycles = 0
        mode_print = False
        # Open Database Connection
        self.dw.open_connection()
        try:
            while True:
                cycles += 1
                print('Cycle {} Started'.format(cycles))
                # Get tasks from automator table
                self._refresh_task_data()
                # Update meta data status
                self._status_running()
                if not self.task_test:
                    # Backup Local Files
                    self.backup_files()
                # Create Task Objects
                self._create_task_objects()
                # Create Task Priorities
                self._check_priorities()
                if self.task_test:
                    # Start up requested task
                    self.test_task(self.test_task_id)
                    if not self.test_loop_count \
                            or cycles == self.test_loop_count:
                        break
                else:
                    if not mode_print:
                        print('Running Automator 3 - MODE: Standard')
                        mode_print = True
                    # Sort Tasks into Lists
                    self._setup_queues()
                    if not self.threads_created:
                        # Create Task Threads
                        self.open_thread_pool()
                    # Run Cycle tasks
                    self._run_cycle_queue()
                    # Setup Task queues and execute all tasks
                    self._run_standard_task_queues()
                # Update the last run in meta data
                self._update_last_run()
                # Update meta data status
                self._status_sleeping()
                print('Cycle {} Completed'.format(cycles))
                # Sleep till next 5 minute interval 12:00, 12:05, etc
                self._sleep()
        finally:
            self.dw.close_connection()
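The issue turned out to be in my method that hands items to the queue. A Queue gives each item to exactly one consumer, so when several threads ran the same task, it was because that task had been put on the queue more than once. I added an extra check that stops such duplicate (unneeded) items from being added to the queue.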