The documentation does not make it clear whether Search API operations can be part of a Datastore transaction. Can we expect the same ACID properties that we expect from Datastore operations in a transaction? And is a Document similar to an Entity in that regard?
From this video it seems that they are part of the transaction: http://www.youtube.com/watch?v=7B7FyU9wW8Y&list=FLcBSmKKUXoPd5yFneNFDv4A#t=1952
If not, how do we preserve consistency in large-scale applications?
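Indexing a search document is not transactional, so a Document does not get the same guarantees as an Entity. What is transactional is deferring a task to run later, so the indexing can be tied to the commit that way.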
You can check if you are in a transaction with ndb.in_transaction(), and easily defer the indexing like this:
    class UserModel(ndb.Model):
        ...

        def _post_put_hook(self, future):
            deferred.defer(UserModel.put_search_document,
                           self.username,
                           self.version,
                           _transactional=ndb.in_transaction())
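To make the point of the _transactional flag concrete, here is a toy model in plain Python. It is not App Engine code: ToyTransaction, run_in_transaction, and the two save functions are hypothetical names made up for illustration. It only sketches the behaviour described above, namely that a task enqueued inside a transaction is dispatched only if the transaction commits.

```python
class ToyTransaction(object):
    """Toy stand-in for a datastore transaction. Not App Engine code."""

    def __init__(self):
        self.writes = {}   # buffered entity writes
        self.tasks = []    # buffered task enqueues

    def put(self, key, value):
        self.writes[key] = value

    def defer(self, task):
        self.tasks.append(task)


def run_in_transaction(store, queue, work):
    """Apply buffered writes and tasks together, or neither if work raises."""
    txn = ToyTransaction()
    try:
        work(txn)
    except Exception:
        return False          # rollback: the buffers are simply discarded
    store.update(txn.writes)  # commit the write ...
    queue.extend(txn.tasks)   # ... and the indexing task along with it
    return True


store, queue = {}, []


def save_alice(txn):
    txn.put('alice', {'version': 1})
    txn.defer(('index', 'alice', 1))


def save_bob_then_fail(txn):
    txn.put('bob', {'version': 1})
    txn.defer(('index', 'bob', 1))
    raise RuntimeError('simulated failure before commit')


committed = run_in_transaction(store, queue, save_alice)
rolled_back = run_in_transaction(store, queue, save_bob_then_fail)
```

In this sketch the failed save leaves no entity and no indexing task behind, which is the property you are relying on when you pass _transactional=True inside a real transaction.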
You also need to handle retries and failures. This excellent article has a complete walkthrough and explanation, including simple versioning to protect against failures, retries and dirty reads.
Here is the complete example code from that article:
    import logging

    from google.appengine.api import search
    from google.appengine.ext import deferred
    from google.appengine.ext import ndb


    class UserModel(ndb.Model):
        username = ndb.StringProperty(required=True)
        email = ndb.StringProperty(required=True)
        version = ndb.IntegerProperty(default=0)

        @classmethod
        def put_search_document(cls, username, version):
            # Assumes the entity's key id is its username.
            model = ndb.Key(cls, username).get()
            if model:
                if version < model.version:
                    # A newer put already happened; its own task will index it.
                    logging.warning('Attempting to write stale data. Ignore')
                    return
                if version > model.version:
                    # The read lags behind the write; raising makes the task retry.
                    msg = 'Attempting to write future data. Retry to await consistency.'
                    logging.warning(msg)
                    raise Exception(msg)
                # Versions match. Update the search document
                document = search.Document(
                    doc_id=username,
                    fields=[
                        search.TextField(name='username', value=model.username),
                        search.TextField(name='email', value=model.email),
                        # version is an integer, so it goes in a NumberField
                        search.NumberField(name='version', value=model.version),
                    ])
                index = search.Index(name="UserIndex")
                index.put(document)

        def _pre_put_hook(self):
            # Bump the version on every put so each write can be told apart.
            self.version = self.version + 1

        def _post_put_hook(self, future):
            # Enqueue the indexing task; transactional only inside a transaction.
            deferred.defer(UserModel.put_search_document,
                           self.username,
                           self.version,
                           _transactional=ndb.in_transaction())
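The version comparison is the part that protects against stale tasks and dirty reads, so it may help to look at it in isolation. The following is a minimal sketch in plain Python with no App Engine dependencies. The reconcile function and its return values are hypothetical and only mirror the three comparisons in put_search_document, with None standing in for an entity that could not be found.

```python
def reconcile(task_version, stored_version):
    """Decide what an indexing task should do (hypothetical helper).

    task_version   -- version captured when the task was deferred
    stored_version -- version read back from the datastore, or None
    """
    if stored_version is None:
        return 'skip'    # entity not found: nothing to index
    if task_version < stored_version:
        return 'skip'    # stale task: a newer put will index the entity
    if task_version > stored_version:
        return 'retry'   # read is behind the write: fail so the task retries
    return 'index'       # versions match: safe to write the document


# One case per branch: stale, future, matching, missing entity.
cases = [(1, 2), (3, 2), (2, 2), (1, None)]
decisions = [reconcile(task_v, stored_v) for task_v, stored_v in cases]
```

In the real task, 'retry' corresponds to raising an exception so that the task queue runs the task again later, and 'skip' corresponds to returning without touching the index.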