Search code examples
python tornado future jupyter pyzmq

Why does my callback function in the following script never get called?


Why does my callback function never get called in the following script? I am using a pre-created kernel to run the code, and I am trying to get its output by attaching a callback to the respective sockets.

from zmq.eventloop import ioloop
ioloop.install()
from zmq.eventloop.zmqstream import ZMQStream
from functools import partial
from tornado import gen
from tornado.concurrent import Future
from jupyter_client import BlockingKernelClient
from pprint import pprint
import logging, os, zmq

# Pending execute requests: maps a request's msg_id to the Future that
# reply_callback resolves once the matching "execute_reply" arrives.
reply_futures = {}

# PUSH socket through which reply_callback sends each reply it handles.
context = zmq.Context()
publisher = context.socket(zmq.PUSH)
publisher.connect("tcp://127.0.0.1:5253")

def reply_callback(session, stream, msg_list):
    """Handle one multipart message received on a kernel channel stream.

    Deserializes the message, prints it, resolves the pending future of the
    originating request when the message is its ``execute_reply``, and
    sends the deserialized message over the module-level ``publisher``.

    Args:
        session: Session object used to split off the routing identities
            and deserialize the remaining frames.
        stream: The stream the message arrived on (unused here; it is the
            extra argument passed by ``on_recv_stream``).
        msg_list: Raw list of message frames as delivered by the stream.
    """
    _idents, msg_parts = session.feed_identities(msg_list)
    reply = session.deserialize(msg_parts)
    parent_id = reply['parent_header'].get('msg_id')
    reply_future = reply_futures.get(parent_id)
    print("{} \n".format(reply))
    # Only the execute_reply completes the request; other messages that share
    # the same parent (status, stream output, ...) are just printed/forwarded.
    if reply_future and "execute_reply" == reply["msg_type"]:
        reply_future.set_result(reply)
    # ``reply`` is a dict, which ``Socket.send`` cannot transmit (it accepts
    # only bytes-like data). ``send_pyobj`` pickles it first, so the receiving
    # side has to read it with ``recv_pyobj``.
    publisher.send_pyobj(reply)

def fv_execute():
    """Submit a fixed sample snippet through ``execute`` and return its msg id."""
    return execute('print ("hello")')

def get_connection_file(kernel_id):
    """Return the path of the connection file for ``kernel_id`` under /tmp.

    The file name has the form ``kernel-<kernel_id>.json``.
    """
    return os.path.join('/tmp', 'kernel-{}.json'.format(kernel_id))

def execute(code):
    """Run ``code`` through a client built for a hard-coded kernel id.

    Creates a ``BlockingKernelClient`` from that kernel's connection file,
    attaches the reply listeners to it, and runs ``execute_`` on the current
    IOLoop via ``run_sync``.

    Returns:
        The message id produced by ``execute_``.
    """
    kernel_id = '46459cb4-fa34-497a-8e3d-dfb3ab4476fd'
    kernel_client = BlockingKernelClient(
        connection_file=get_connection_file(kernel_id))
    setup_listener(kernel_client)

    def submit():
        # run_sync needs a zero-argument callable returning the coroutine's future.
        return execute_(kernel_client, code)

    return ioloop.IOLoop.current().run_sync(submit)

def setup_listener(kernel_client):
    """Register ``reply_callback`` on the client's shell and iopub sockets.

    Each socket is wrapped in a ``ZMQStream``; the callback is bound to the
    client's session so it can deserialize incoming frames.
    """
    streams = [
        ZMQStream(getattr(kernel_client, channel_name).socket)
        for channel_name in ("shell_channel", "iopub_channel")
    ]
    for zmq_stream in streams:
        zmq_stream.on_recv_stream(partial(reply_callback, kernel_client.session))

@gen.coroutine
def execute_(kernel_client, code):
    """Send ``code`` for execution and wait for its reply future.

    Registers a ``Future`` in ``reply_futures`` under the request's msg id,
    prints the client's liveness and the msg id, then yields until that
    future is resolved.

    Returns (via ``gen.Return``):
        The msg id of the execute request.
    """
    request_id = kernel_client.execute(code)
    pending = Future()
    reply_futures[request_id] = pending
    alive = kernel_client.is_alive()
    print("Is kernel alive: {}".format(alive))
    print(request_id)
    yield pending
    raise gen.Return(request_id)

# Script entry point: submit the sample snippet when run directly.
if __name__ == '__main__':
    fv_execute()

Here is the output; the script runs forever:

jupyter@albus:~/lab$ python2 iolooptest2.py
Is kernel alive: True
de3eae2e-48d3-451a-b6bc-421674bb2a35
^X^CTraceback (most recent call last):
  File "iolooptest2.py", line 61, in <module>
    fv_execute()
  File "iolooptest2.py", line 30, in fv_execute
    msg_id = execute(code)
  File "iolooptest2.py", line 42, in execute
    msg_id = ioloop.IOLoop.current().run_sync(lambda:     execute_(kernel_client,code))
  File "/usr/local/lib/python2.7/dist-packages/tornado/ioloop.py",  line 452, in run_sync
    self.start()
  File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/ioloop.py", line 177, in start
    super(ZMQIOLoop, self).start()
 File "/usr/local/lib/python2.7/dist-packages/tornado/ioloop.py", line  862, in start
    event_pairs = self._impl.poll(poll_timeout)
  File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/ioloop.py", line 122, in poll
    z_events = self._poller.poll(1000*timeout)
  File "/usr/local/lib/python2.7/dist-packages/zmq/sugar/poll.py", line 99, in poll
    return zmq_poll(self.sockets, timeout=timeout)
  File "zmq/backend/cython/_poll.pyx", line 116, in  zmq.backend.cython._poll.zmq_poll (zmq/backend/cython/_poll.c:2036)
  File "zmq/backend/cython/checkrc.pxd", line 12, in  zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/_poll.c:2418)
 KeyboardInterrupt

A slightly modified version of the code is here: https://gist.github.com/jayendra13/76a4f5726428882013ea62d94974da5c — there I pass the ioloop as an argument to ZMQStream when attaching the callback, and it shows the same behaviour.

Here is an almost identical script that works: https://gist.github.com/jayendra13/e553fafba5398e287107e947c16988df


Solution

  • Adding the following two lines after the creation of kernel_client solved my issue.

        kernel_client.load_connection_file()
        kernel_client.start_channels()
    

    So the new execute looks like this:

    def execute(code):
        """Run ``code`` through a started client for a hard-coded kernel id.

        Creates a ``BlockingKernelClient`` from that kernel's connection
        file, loads the connection file and starts the channels, attaches the
        reply listeners, and runs ``execute_`` on the current IOLoop via
        ``run_sync``.

        Returns:
            The message id produced by ``execute_``.
        """
        kernel_id = '46459cb4-fa34-497a-8e3d-dfb3ab4476fd'
        kernel_client = BlockingKernelClient(
            connection_file=get_connection_file(kernel_id))
        # These two calls are the fix: without them the reply callbacks
        # were never invoked.
        kernel_client.load_connection_file()
        kernel_client.start_channels()
        setup_listener(kernel_client)

        def submit():
            # run_sync needs a zero-argument callable returning the coroutine's future.
            return execute_(kernel_client, code)

        return ioloop.IOLoop.current().run_sync(submit)