Search code examples
pythonasynchronoustornadojupytercoroutine

How to use tornado coroutines in this case?


I have created tornado server which accepts python and matlab code and executes it. Here is server code.

from zmq.eventloop.zmqstream import ZMQStream
from zmq.eventloop import ioloop
ioloop.install()

from functools import partial
from tornado import web, gen, escape
from tornado import options, httpserver
from tornado.concurrent import Future

settings = dict()
settings['autoreload'] = True 
settings['debug'] = True

from jupyter_client import MultiKernelManager

reply_futures = {}
kids = []

class MainHandler(web.RequestHandler):
    def get(self):
        self.write("Hello")

class ExecuteHandler(web.RequestHandler):
    def get(self):
        self.write("approaching execute")

    def post(self):
        data = escape.json_decode(self.request.body)
        print data
        self.application.execute_code(data)

class Application(web.Application):

    def __init__(self):

        handlers = []
        handlers.append((r"/", MainHandler))
        handlers.append((r"/execute", ExecuteHandler))
        web.Application.__init__(self, handlers, **settings)
        self.km = MultiKernelManager()
        self.setup_kernels()

    def setup_kernels(self):

        matlab_kernel_id = self.km.start_kernel(kernel_name="matlab")
        python_kernel_id = self.km.start_kernel(kernel_name="python")

        self.matlab_kernel_id = matlab_kernel_id
        self.python_kernel_id = python_kernel_id

        matkab_kernel_client = self.km.get_kernel(matlab_kernel_id).client()
        matkab_kernel_client.start_channels()

        python_kernel_client = self.km.get_kernel(python_kernel_id).client()
        python_kernel_client.start_channels()

        self.matkab_kernel_client = matkab_kernel_client
        self.python_kernel_client = python_kernel_client

        matlab_iopub_stream = ZMQStream(matkab_kernel_client.iopub_channel.socket)
        matlab_shell_stream = ZMQStream(matkab_kernel_client.shell_channel.socket)

        python_iopub_stream = ZMQStream(python_kernel_client.iopub_channel.socket)
        python_shell_stream = ZMQStream(python_kernel_client.shell_channel.socket)

        matlab_iopub_stream.on_recv_stream(partial(self.reply_callback, matkab_kernel_client.session))
        matlab_shell_stream.on_recv_stream(partial(self.reply_callback, matkab_kernel_client.session))

        python_iopub_stream.on_recv_stream(partial(self.reply_callback, python_kernel_client.session))
        python_shell_stream.on_recv_stream(partial(self.reply_callback, python_kernel_client.session))

    def reply_callback(self, session, stream, msg_list):
        idents, msg_parts = session.feed_identities(msg_list)
        reply = session.deserialize(msg_parts)

        if "stream" == reply["msg_type"]:
            print reply["content"]["text"]
        parent_id = reply['parent_header'].get('msg_id')
        reply_future = reply_futures.get(parent_id)
        if reply_future:
            reply_future.set_result(reply)

    def execute_code(self, data):

        matlab_code = data['matlab_code']
        python_code = data['python_code']

        self.execute_matlab_then_python(matlab_code, python_code)

    @gen.coroutine
    def execute_matlab_then_python(self, matlab_code, python_code):

        print "executing matlab code"
        parent_id1 = self.matkab_kernel_client.execute(matlab_code)
        f1 = reply_futures[parent_id1] = Future()
        yield f1    

        print "executing python code"
        parent_id2 = self.python_kernel_client.execute(python_code)
        f2 = reply_futures[parent_id2] = Future()
        yield f2

    def shutdown_kernels(self):
        self.km.get_kernel(self.matlab_kernel_id).cleanup_connection_file()
        self.km.shutdown_kernel(self.matlab_kernel_id)

        self.km.get_kernel(self.python_kernel_id).cleanup_connection_file()
        self.km.shutdown_kernel(self.python_kernel_id)

if __name__ == '__main__':

    options.parse_command_line()
    app = Application()
    server = httpserver.HTTPServer(app)
    server.listen(8888)

    try:
        ioloop.IOLoop.current().start()
    except KeyboardInterrupt:
        print 'going down'
    finally:    
        app.shutdown_kernels()

The client code I use to access is, here

import json
import requests

matlab_code = "a=magic(3);disp(a)"
python_code = "print 'Hello World!!'"

data = {}
data['matlab_code'] = matlab_code
data['python_code'] = python_code

r = requests.post('http://0.0.0.0:8888/execute', json.dumps(data))

My concern is to maintain the order of execution, such that python code get executed only after matlab completes. I am using jupyter_client to execute the matlab/python code. I am using python27 here. The problem is that when I submit the code it throws TypeError: 'NoneType' object is not iterable. Here is a stack-trace out it.

[E 160809 15:17:51 ioloop:633] Exception in callback None
    Traceback (most recent call last):
      File "/usr/local/lib/python2.7/dist-packages/tornado/ioloop.py", line 887, in start
        handler_func(fd_obj, events)
      File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 275, in null_wrapper
        return fn(*args, **kwargs)
      File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/zmqstream.py", line 440, in _handle_events
        self._handle_recv()
      File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/zmqstream.py", line 472, in _handle_recv
        self._run_callback(callback, msg)
      File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/zmqstream.py", line 414, in _run_callback
        callback(*args, **kwargs)
      File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 275, in null_wrapper
        return fn(*args, **kwargs)
      File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/zmqstream.py", line 191, in 
        self.on_recv(lambda msg: callback(self, msg), copy=copy)
      File "tornado_test.py", line 86, in reply_callback
        reply_future.set_result(reply)
      File "/usr/local/lib/python2.7/dist-packages/tornado/concurrent.py", line 276, in set_result
        self._set_done()
      File "/usr/local/lib/python2.7/dist-packages/tornado/concurrent.py", line 320, in _set_done
        for cb in self._callbacks:
    TypeError: 'NoneType' object is not iterable

I don't understand what is problem here?


Solution

  • The unfortunately cryptic error message "'NoneType' object is not iterable" means that you are calling set_result more than once on the same Future object. I don't know much about zmq or the jupyter kernel interfaces, but my guess is that either the IDs returned by the execute methods of the two different kernels are overlapping, or you're getting more than one response per execute call (from the two different streams?).