Search code examples
tornadoetcd

Tornado and aio_etcd to establish a lock


I'm attempting to get Tornado working with aio_etcd aio_etcd documentation here

I want to acquire a lock within a Tornado Coroutine. I have written the following code. The examples from the documentation use 'await' however i substituted this with yield from since i decorate using the @tornado.gen.coroutine I'm not sure if this is correct or not. I get the following crash with the code below:

    raise RuntimeError('Timeout context manager should be used '
RuntimeError: Timeout context manager should be used inside a task
ERROR:tornado.access:500 GET / (::1) 6.33ms

...

import tornado.web
import tornado.httpserver
import tornado.httpclient
import tornado.ioloop
import tornado.options
import tornado.gen
import tornado.auth
from multiprocessing import Process
import aio_etcd as etcd

def run_process(port):

   app=Application()
   server=tornado.httpserver.HTTPServer(app)
   server.listen(port)
   tornado.ioloop.IOLoop.current().start()

class  MainHandler(tornado.web.RequestHandler):
   @tornado.gen.coroutine
   def get(self):
      print("in function")

      l = etcd.Lock(self.application.etcdClient, "L")

      # Use the lock object:
      yield from l.acquire(blocking=True, lock_ttl=None)
      print("got lock")
      yield tornado.gen.sleep(30)
      yield from l.release()
      print("releasing lock")



class Application(tornado.web.Application):
   def __init__(self):
      handlers = [
         (r"/",MainHandler),
      ]

      self.etcdClient=etcd.Client()

      # Settings dict for Application
      settings = {
      }
      tornado.web.Application.__init__(self,handlers,debug=True,**settings)      

if __name__ =='__main__':
   Process(target=run_process,args=(8000,)).start()
   Process(target=run_process,args=(8001,)).start()

Also in my code i don't use the new async keyboard instead all my Tornado co-routines are decorated with the @tornado.gen.coroutine Can someone please explain also why the yield keyword is used instead of yield from in Tornado co-routine and why these are different. Does anyone know how i can get this code working in Tornado?

Updated Code using asyncio:

import tornado.web
import tornado.httpserver
import tornado.httpclient
import tornado.ioloop
import tornado.options
import tornado.gen
import tornado.auth
import asyncio 
from multiprocessing import Process
import aio_etcd as etcd
import tornado.platform.asyncio 
def run_process(port):
   tornado.platform.asyncio.AsyncIOMainLoop().install()
   app=Application()
   server=tornado.httpserver.HTTPServer(app)
   server.listen(port)
   asyncio.get_event_loop().run_forever()

class  MainHandler(tornado.web.RequestHandler):
   async def get(self):

      print("here")
      lock = etcd.Lock(self.application.etcdClient, "hello")
      # Use the lock object:
      await asyncio.ensure_future(lock.acquire())
      state = await asyncio.ensure_future(lock.is_locked())
      print("lock state")
      print(state)

class Application(tornado.web.Application):
   def __init__(self):
      handlers = [
         (r"/",MainHandler),
      ]

      self.etcdClient=etcd.Client()

      # Settings dict for Application
      settings = {
      }
      tornado.web.Application.__init__(self,handlers,debug=True,**settings)      
if __name__ =='__main__':
   Process(target=run_process,args=(8000,)).start()
   Process(target=run_process,args=(8001,)).start()

Crash Trace:

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/tornado/web.py", line 1469, in _execute
    result = yield result
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "<string>", line 6, in _wrap_awaitable
  File "check3.py", line 27, in get
    await asyncio.ensure_future(lock.acquire())
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 361, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aio_etcd-0.4.3.1-py3.5.egg/aio_etcd/lock.py", line 67, in acquire
    res = await self.client.write(self.path, self.uuid, ttl=lock_ttl, append=True)
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aio_etcd-0.4.3.1-py3.5.egg/aio_etcd/client.py", line 449, in write
    response = await self.api_execute(path, method, params=params)
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aio_etcd-0.4.3.1-py3.5.egg/aio_etcd/client.py", line 780, in wrapper
    response = await payload(self, path, method, params=params)
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp-1.1.1-py3.5-macosx-10.6-intel.egg/aiohttp/client.py", line 553, in __await__
    resp = yield from self._coro
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp-1.1.1-py3.5-macosx-10.6-intel.egg/aiohttp/client.py", line 198, in _request
    proxy=proxy, proxy_auth=proxy_auth, timeout=timeout)
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp-1.1.1-py3.5-macosx-10.6-intel.egg/aiohttp/client_reqrep.py", line 79, in __init__
    url2 = url.with_query(params)
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/yarl-0.5.3-py3.5-macosx-10.6-intel.egg/yarl/__init__.py", line 607, in with_query
    for k, v in query.items())
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/yarl-0.5.3-py3.5-macosx-10.6-intel.egg/yarl/__init__.py", line 607, in <genexpr>
    for k, v in query.items())
  File "yarl/_quoting.pyx", line 46, in yarl._quoting._quote (yarl/_quoting.c:1384)
TypeError: Argument should be str

Solution

  • Many libraries written for asyncio are interoperable with Tornado. Unfortunately some libraries, like aio_etcd, are not, because they implement timeouts in a way that depends on asyncio specifically. See aiohttp issue 877 - this issue describes a problem in aiohttp.Timeout. That code was spun off from aiohttp to the separate async_timeout package, and this separate package is still incompatible with Tornado. This is what aio_etcd uses to implement timeouts, so aio_etcd is incompatible with Tornado, too.

    The long term fix is to open a bug in aio_etcd's tracker asking for Tornado compatibility. aio_etcd could provide Tornado compatibility by not using async_timeout if timeout=None. It's the same suggestion as in this comment. Please link to this StackOverflow question when you open the ticket.

    The short term fix is to use asyncio and its web framework, aiohttp, instead of Tornado:

    from multiprocessing import Process
    
    import asyncio
    from aiohttp import web
    import aio_etcd as etcd
    
    # Don't create this in the parent process, wait for Process() to spawn child.
    etcd_client = None
    
    async def handle(request):
        print("in function")
    
        l = etcd.Lock(etcd_client, "L")
    
        # Use the lock object:
        await l.acquire(blocking=True, lock_ttl=None)
        print("got lock")
        await asyncio.sleep(30)
        await l.release()
        print("releasing lock")
        return web.Response(text='ok')
    
    
    def run_process(port):
        global etcd_client
        app = web.Application(debug=True)
        app.router.add_get('/', handle)
        # Create AFTER multiprocess starts this child process.
        etcd_client = etcd.Client()
        web.run_app(app, port=port)
    
    
    if __name__ == '__main__':
        # run_process(8000)
        Process(target=run_process, args=(8000,)).start()
        Process(target=run_process, args=(8001,)).start()
    

    aiohttp docs to get you started are here.

    To answer your other questions: Tornado has used "yield" for many years in gen.coroutine. Its design influenced asyncio's coroutines, but they used "yield from" in Python 3.4. Guido's explanation of the difference between "yield" and "yield from" is here, and it's certainly interesting, but less relevant now. In Python 3.5 "yield from" was superseded by PEP 492 -- Coroutines with async and await syntax.

    If your code never needs to run on Python older than 3.5, just use "async" and "await" instead of a "coroutine" decorator and "yield" (in Tornado) or "yield from" (in asyncio). For you, I know you're running Python 3.5 because aio_etcd requires it, so just use async and await as I demonstrate in my code above.