python google-app-engine app-engine-ndb

NDB Async API and get_or_insert_async


I'm trying to wrap my head around the async api, without much success.

I have a fairly simple setup in my lab project. I have a model that looks like this:

class SearchIndex(model.Model):
    name = model.StringProperty(required=True)
    reference_list = model.KeyProperty(repeated=True)

And a method that uses get_or_insert and then checks whether reference_list contains a key, adding it if not. Below, entity is a model entity and list is a list of strings, e.g. ["abc","def","ghi"]:

@classmethod
def store_list_in_index(cls, list, entity):
    put_queue = []

    for verb in list:
        index_entity = cls._SEARCH_INDEX_DB_MODEL.get_or_insert(verb, name=verb)
        if not entity.key in index_entity.reference_list:
            index_entity.reference_list.append(entity.key)
            put_queue.append(index_entity)

    if put_queue:
        ndb.put_multi_async(put_queue)

This worked as I wanted, but took a long time. With a list of about 20-30 items, it took about 15-20 seconds.

So I started looking at the async API, but I didn't get very far. Now it doesn't store anything in the db:

@classmethod
def store_list_in_index(cls, list, entity):
    put_queue = []
    async_queue = []

    @tasklets.tasklet
    def txn(verb, entity):
        ent = yield cls._SEARCH_INDEX_DB_MODEL.get_or_insert_async(verb, name=verb)
        if not entity.key in ent.reference_list:
            ent.reference_list.append(entity.key)
            put_queue.append(ent)
        raise tasklets.Return(ent)

    for verb in list:
        en = txn(verb, entity)

    if put_queue:
        ndb.put_multi_async(put_queue)

I don't really understand where it goes wrong, mostly because I don't understand the concept of tasklets and yield. Does anyone have an idea, or can you point me in a direction?

EDIT:

I ended up with this solution:

@classmethod
@ndb.tasklet
def get_or_insert_index_entity(cls, verb):
    ent = yield cls._SEARCH_INDEX_DB_MODEL.get_by_id_async(verb)
    if not ent:
        key = ndb.Key(cls._SEARCH_INDEX_DB_MODEL, verb)
        ent = cls._SEARCH_INDEX_DB_MODEL(key=key, name=verb)
        yield ent.put_async()

    raise ndb.Return(ent)

@classmethod
@ndb.tasklet
def txn(cls, verb, entity):
    ent = yield cls.get_or_insert_index_entity(verb)
    if not entity.key in ent.reference_list:
        ent.reference_list.append(entity.key)
        yield ent.put_async()
    raise ndb.Return(ent)

@classmethod
def store_list_in_index(cls, list, entity):
    put_queue = []
    for verb in list:
        put_queue.append(cls.txn(verb, entity))

I also added @ndb.toplevel to my get-request handler. And it's way faster!

I have also posted this question at https://groups.google.com/forum/?fromgroups#!topic/appengine-ndb-discuss/L4DEsYdEwTE, with some follow-up questions.


Solution

  • If you don't wait for the result to come back from your "ndb.put_multi_async(put_queue)", then your web handler might finish before it actually gets around to making the request. Check that put_multi_async function's return value. It's a list of Futures.

    To wait for one Future to finish, you might say fut.get_result() (or fut.wait() if you don't care about the return value). If you have a bunch of futures, you probably want Future.wait_all or Future.wait_any, described at http://code.google.com/appengine/docs/python/ndb/futureclass.html
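To make the "start the work, then wait on the futures" idea concrete, here is a rough sketch. NDB only runs inside the App Engine runtime, so this uses the standard library's concurrent.futures as a stand-in, not NDB itself. The names fake_put and store_all are hypothetical, and the sleep only mimics datastore latency. The comments note where the NDB calls would roughly go.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def fake_put(name, store):
    """Hypothetical stand-in for a datastore write; sleeps to mimic latency."""
    time.sleep(0.05)
    store[name] = True
    return name


def store_all(names):
    store = {}
    pool = ThreadPoolExecutor(max_workers=8)
    try:
        # Starting the work only hands back futures; nothing is guaranteed
        # to be finished yet (compare: futures = ndb.put_multi_async(put_queue)).
        futures = [pool.submit(fake_put, n, store) for n in names]

        # Block until every future is done before returning
        # (compare: ndb.Future.wait_all(futures)).
        wait(futures)

        # result() on a finished future returns its value or re-raises its
        # exception (compare: fut.get_result()).
        keys = [f.result() for f in futures]
    finally:
        pool.shutdown()
    return store, keys


store, keys = store_all(["abc", "def", "ghi"])
print(sorted(store))
```

In your handler the shape would be the same: keep the list that put_multi_async returns and wait on it before the request ends, or let @ndb.toplevel do that waiting for you.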