python, google-app-engine, app-engine-ndb, task-queue

Google App Engine: Task queue performance


I currently have an application running on App Engine, and I execute a few jobs using the deferred library. Some of these tasks run daily, while others run once a month. Most of them query Datastore to retrieve entities and then store them as documents in an index (Search API). Some of the underlying tables are replaced monthly, so I have to run these tasks over all entities (4-5M).

One example of such a task is:

import logging

from google.appengine.api import search
from google.appengine.ext import deferred


def addCompaniesToIndex(cursor=None, n_entities=0, mindate=None):
    # Company (an ndb model) and getCompanyDocument are defined elsewhere.
    BATCH_SIZE = 200

    # Fetch the next page of entities, resuming from the previous cursor.
    cps, next_cursor, more = Company.query().fetch_page(
        BATCH_SIZE, start_cursor=cursor)

    # Create one index document per Datastore entity.
    # Each document has only about 5 text fields and one date field.
    doc_list = [getCompanyDocument(cp) for cp in cps]

    index = search.Index(name='Company')
    index.put(doc_list)

    n_entities += len(doc_list)

    if more:
        logging.debug('Company: %d added to index', n_entities)
        # Chain the next batch as a new deferred task.
        deferred.defer(addCompaniesToIndex,
                       cursor=next_cursor,
                       n_entities=n_entities,
                       mindate=mindate)
    else:
        logging.debug('Finished Company index creation (%d processed)', n_entities)

When I run only this task, each deferred call takes around 4-5 s. Because every call processes 200 entities and then chains the next one, indexing my 5M entities means 25,000 sequential calls, which would take about 35 hours.
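To make that estimate explicit, here is a small back-of-the-envelope calculation. It only restates the numbers above (5M entities, batches of 200, 4-5 s per call) and assumes the calls run strictly one after another; the function name is just for illustration.

```python
TOTAL_ENTITIES = 5000000   # upper end of the 4-5M range
BATCH_SIZE = 200           # entities handled per deferred call


def sequential_hours(seconds_per_task):
    """Hours needed when each deferred call chains the next one."""
    tasks = TOTAL_ENTITIES // BATCH_SIZE   # 25,000 chained calls
    return tasks * seconds_per_task / 3600.0


low = sequential_hours(4)    # optimistic: 4 s per call
high = sequential_hours(5)   # pessimistic: 5 s per call
print('%.1f to %.1f hours' % (low, high))
```

Since the chain is sequential, the total time scales linearly with the per-call latency, which is why the slowdown described next hurts so much.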

Another problem is that when I run an update on another index (e.g., one of the daily updates) using a different deferred task on the same queue, both run a lot slower. Each deferred call then takes about 10-15 seconds, which is just unbearable.

My question is: is there a way to do this faster and scale the push queue so that more than one job runs at a time? Or should I use a different approach for this problem?

Thanks in advance,


Solution

  • I think I finally managed to get around this issue by using two queues and the idea proposed in the previous answer.

    • Tasks on the first queue only query the main entities (with keys_only) and launch another task on a second queue for those keys. The first task then relaunches itself on queue 1 with the next_cursor.
    • Tasks on the second queue receive the entity keys and do all the queries and inserts into Full Text Search/BigQuery/Pub/Sub. (This is slow: about 15 s per group of 100 keys.)

    I also tried using only one queue, but the processing throughput was not as good. I believe this is because slow and fast tasks were running on the same queue, and the scheduler might not work as well in that case.
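A minimal sketch of the two-queue control flow described above, written so it runs locally. Everything here is a stand-in rather than my production code: the queue names are hypothetical, `defer` imitates `deferred.defer(func, *args, _queue=queue_name, **kwargs)`, `fetch_keys_page` imitates `Company.query().fetch_page(page_size, start_cursor=cursor, keys_only=True)`, and an integer offset plays the role of the Datastore cursor.

```python
from collections import deque

# Hypothetical queue names; on App Engine they would be declared in queue.yaml.
SCAN_QUEUE = 'key-scan'
WORK_QUEUE = 'index-write'
BATCH_SIZE = 100

# In-memory stand-ins so the sketch runs outside App Engine.
QUEUES = {SCAN_QUEUE: deque(), WORK_QUEUE: deque()}
FAKE_KEYS = list(range(250))   # pretend Datastore keys
INDEXED = []                   # pretend search index


def defer(queue_name, func, *args, **kwargs):
    """Stand-in for deferred.defer(func, *args, _queue=queue_name, **kwargs)."""
    QUEUES[queue_name].append((func, args, kwargs))


def fetch_keys_page(page_size, cursor=None):
    """Stand-in for a keys_only fetch_page(); the cursor is a list offset."""
    start = cursor or 0
    keys = FAKE_KEYS[start:start + page_size]
    next_cursor = start + len(keys)
    more = next_cursor < len(FAKE_KEYS)
    return keys, next_cursor, more


def scan_keys(cursor=None):
    """Queue 1 (fast): fetch one page of keys, fan out, then relaunch itself."""
    keys, next_cursor, more = fetch_keys_page(BATCH_SIZE, cursor)
    if keys:
        # Hand the slow work for this page to the second queue.
        defer(WORK_QUEUE, process_keys, keys)
    if more:
        # Continue scanning on the first queue from the next cursor.
        defer(SCAN_QUEUE, scan_keys, cursor=next_cursor)


def process_keys(keys):
    """Queue 2 (slow): load the entities for these keys and write them out."""
    # On App Engine, ndb.get_multi(keys) and index.put(...) would go here.
    INDEXED.extend(keys)


def drain():
    """Run queued tasks until both queues are empty (local simulation only)."""
    while any(QUEUES.values()):
        for queue in QUEUES.values():
            if queue:
                func, args, kwargs = queue.popleft()
                func(*args, **kwargs)


defer(SCAN_QUEUE, scan_keys)
drain()
print(len(INDEXED))
```

The point of the split is that the scan task stays cheap (keys only, no index writes), so it can hand out work quickly, while the slow batches pile up on the second queue, where they no longer wait on one another the way the original chained tasks did.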