python, multithreading, google-app-engine, task-queue

GAE - What is the fastest way to add tasks to queue? Why does this appear to be so slow?


I am using Google App Engine (Python) to process event messages in real time. In short, I have 100+ tasks that need to run quickly whenever a message comes in. I have tried a few approaches (the deferred library, threads), and I think the best solution is to use the task queue and add the tasks asynchronously to the queue I want. Here's an example of what I am doing.

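from google.appengine.api.taskqueue import Task

rpcs = []
for id in id_list:
    # One add_async call (and so one RPC) per task.
    task = Task(url=url_for('main.endpoints_worker'), params={'id': id})
    rpcs.append(task.add_async(queue_name='event-message'))

# Block until every add has completed.
for rpc in rpcs:
    rpc.get_result()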

When I do this most of my time is spent adding these tasks to the queue. Is there a way to speed this up? Is there a better approach?

[Image: example of task queue bulk add time]

To be honest, I get vastly different times each time I run this. Sometimes it takes around 100 ms (which would be fine), but other times it is up around 1 s.

I would have thought spreading out the work would be faster, but bulk adding to the task queue outperforms it. With the approach suggested below, here is what I am seeing:

tasks = [Task(url=url_for('main.endpoints_worker'),params={'id': id}) for id in id_list]
rpc = Queue('event-message').add_async(tasks)
rpc.get_result()

UPDATE: I need to look at this issue again because of the 100-task limit when adding to a queue. I have greatly improved the throughput of my code by creating my tasks in batches (groups of 100), but I still don't understand why adding multiple groups of tasks to a queue slows down so quickly. A single queue.add_async call completes in < 40 ms with no issue. When I make 2 or more queue.add_async calls, they slow down. Why does this happen, and how do I get around it?

When I add batches of tasks without async they each take < 40ms. Why do they take so much longer when async is used?

ANOTHER UPDATE: I thought the issue might be contention-related, but even when I add each of these batches to a different queue I get the same results.


Solution

  • You can save a ton of time by queuing your tasks in batches. Something like the following should work for you:

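    from google.appengine.api.taskqueue import Queue, Task

    # Build all the tasks first, then enqueue them with a single RPC.
    tasks = [Task(url=url_for('main.endpoints_worker'), params={'id': id}) for id in id_list]
    rpc = Queue('event-message').add_async(tasks)
    rpc.wait()  # block until the add RPC completes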
    

    Note that you can't submit tasks in batches with the deferred library.
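
Since a single add call accepts at most 100 tasks (the limit mentioned in the question's update), a longer task list has to be split into groups first. Below is a minimal sketch of that chunking. The helper names `chunked` and `add_in_batches` are hypothetical, and `queue` is only assumed to behave like `taskqueue.Queue`, that is, `add_async(list_of_tasks)` returns an RPC object with `get_result()`. It illustrates the batching only and does not explain or fix the slowdown seen with several concurrent `add_async` calls.

```python
def chunked(items, size=100):
    """Yield consecutive slices of `items`, each at most `size` long."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def add_in_batches(queue, tasks, batch_size=100):
    """Start one add_async RPC per batch, then wait for all of them.

    Assumption: `queue` exposes add_async(list_of_tasks) returning an
    object with get_result(), as taskqueue.Queue does.
    """
    # Kick off every RPC before waiting on any of them.
    rpcs = [queue.add_async(batch) for batch in chunked(tasks, batch_size)]
    # Collect the results in the order the batches were submitted.
    return [rpc.get_result() for rpc in rpcs]
```

With the code from the answer, this would be called as `add_in_batches(Queue('event-message'), tasks)`.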