google-app-engine google-cloud-platform google-search-api google-app-engine-python app-engine-flexible

How do I query GCP Search APIs asynchronously from an App Engine instance?


I need to search my App Engine indexed documents using the Search API. From what I can tell, the Search API can only be referenced with the google.appengine APIs for the standard environment.

My problem is that some hydrating, flushing, and querying takes longer than 60 seconds. I need to return a response from App Engine, continue processing the request in the background, then publish the results to Pub/Sub. However, I cannot spawn threads or use background_thread in the standard environment. I'd switch to the flexible environment, but then I cannot use the Python Search API library.

Is my only option to switch to the flex environment, and use REST APIs?


Solution

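  • You probably want the App Engine Task Queue. It queues pending work so that another App Engine service (or the same one) can process it in the background, because a request handler in the standard environment cannot keep working after it has returned its response. Task handlers also get a longer deadline than user requests (10 minutes on automatically scaled services).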

    For example,

    1. Set up a new service to handle the task (optional)

    If you want a service other than the original one to run the task, create a YAML file called newtaskworker.yaml, similar to your app.yaml.

    The only difference is that it needs a service name: service: newtaskworker

    Deploy it with gcloud app deploy newtaskworker.yaml

    2. Set up a queue

    See "How to create a new queue" in the documentation. Generally speaking, you need a queue.yaml that defines the queue. Deploy it with gcloud app deploy queue.yaml

    queue:
    - name: queue_name
      rate: 20/s  # Caps how fast new tasks are *started*
      bucket_size: 40
      max_concurrent_requests: 10  # Also bounded by the max_instances allowed in newtaskworker.yaml; you may simply omit it
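
How rate and bucket_size interact is easier to see in a small model. App Engine describes its queue throttling as a token bucket; the sketch below is a simplified illustration of that idea, not App Engine's actual scheduler. TokenBucket and count_started are hypothetical names, the bucket is assumed to start full, and max_concurrent_requests is not modelled.

```python
class TokenBucket(object):
    """Simplified model of a queue's rate / bucket_size settings (an assumption,
    not App Engine's real implementation)."""

    def __init__(self, rate_per_second, bucket_size):
        self.rate = float(rate_per_second)
        self.capacity = float(bucket_size)
        self.tokens = float(bucket_size)  # assumed: the bucket starts full
        self.last = 0.0                   # time of the last refill, in seconds

    def _refill(self, now):
        # Add tokens for the time that has passed, never exceeding capacity.
        elapsed = max(0.0, now - self.last)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last = max(self.last, now)

    def try_start_task(self, now):
        """Return True if one task may start at time `now` (seconds)."""
        self._refill(now)
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


def count_started(bucket, now, waiting):
    """Start as many of the `waiting` tasks as the bucket allows at `now`."""
    started = 0
    while started < waiting and bucket.try_start_task(now):
        started += 1
    return started
```

With rate 20/s and bucket_size 40, this model lets a burst of up to 40 waiting tasks start at once, then refills at 20 tokens per second, so a backlog drains at roughly 20 task starts per second.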
    

    3. Finally, your code

    import webapp2
    from google.appengine.api import taskqueue

    # Handler for /deleteTask: this is the URL the queued task calls
    class DeleteTask(webapp2.RequestHandler):
        def get(self):
            paramA = self.request.get('paramA')
            paramB = self.request.get('paramB')
            # Your Search delete goes here

    # Handler that enqueues the task and can then return right away
    class GetPostDataTask(webapp2.RequestHandler):
        def get(self):
            # Your Pub/Sub work goes here.
            # If you don't want to use a new service, simply use 'default' as the target.
            taskqueue.add(queue_name='queue_name', url='/deleteTask', method='GET',
                          params={'paramA': 1, 'paramB': 2}, target='newtaskworker')
    

    If everything works, you can find your task in the Cloud Console under Tools -> Cloud Tasks.
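
To tie this back to the question, the overall flow (enqueue, return at once, do the slow work inside the task, publish when done) can be sketched without any App Engine imports. This is only an illustration under assumptions: start_background_search, handle_search_task, run_search, publish, and the 202 status are hypothetical names and choices, not part of the answer above. On App Engine, add_task would be taskqueue.add. The sketch relies on the Task Queue rule that a handler response in the 200-299 range marks the task as done, while any other status causes a retry.

```python
import logging


def start_background_search(params, add_task):
    """Front-end side: enqueue the slow work and return immediately.

    `add_task` is injected so the sketch runs anywhere; on App Engine it
    would be taskqueue.add. Returning 202 is an illustrative choice.
    """
    add_task(queue_name='queue_name', url='/deleteTask', method='GET',
             params=params, target='newtaskworker')
    return 202


def handle_search_task(params, run_search, publish):
    """Worker side: run the slow search, then publish the result.

    `run_search` and `publish` are hypothetical callables standing in for
    the Search API query and the Pub/Sub publish call.
    Returns the HTTP status the task handler should send back.
    """
    try:
        results = run_search(params)
        publish({'params': params, 'results': results})
    except Exception:
        # A non-2xx status tells the queue the task failed, so it is retried.
        logging.exception('search task failed')
        return 500
    return 200
```

Because a failed task is retried, the real search and publish steps should be safe to run more than once.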