Issue properly passing *args and **kwargs through RPyC to underlying modules


Background

I'm trying to implement the RPyC proof-of-concept example from the APScheduler GitHub repository so that I can deploy my application with gunicorn using more than one worker (an issue covered in APScheduler's FAQ section). I'm also using Flask-APScheduler so that the task functions can easily work within an app context.

Problem

I can't get the parameters through correctly when calling the scheduler service that's exposed via RPyC. The trouble starts when I pass parameters that are literally named args and kwargs alongside values that end up in the *args and **kwargs of the exposed RPyC function.

Basically, the parameters that I would normally pass directly to scheduler.add_job() do not work when routed through RPyC. Instead, they cause errors like the one below when the RPyC-exposed method forwards them to the underlying scheduler instance. How can I fix this?
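
To make the parameter routing concrete, here is a minimal sketch in plain Python with no RPyC involved. The function is a hypothetical stand-in that only mirrors the signature shape of the exposed method (self omitted) and reports where each argument lands. It shows that keyword arguments literally named args and kwargs end up as entries inside **kwargs, while *args stays empty.

```python
def exposed_add_job(func, *args, **kwargs):
    """Hypothetical stand-in: same signature shape as the service method."""
    # Report how Python bound the call's arguments.
    return {"func": func, "positional": args, "keyword": kwargs}


received = exposed_add_job(
    func="app.tasks:run_report",
    args=("2",),                      # a keyword named "args", not *args
    kwargs={"email_results": True},   # a keyword named "kwargs", not **kwargs
    trigger="cron",
    id="reconcile_accounts",
)

# Nothing was passed positionally, so *args is empty.
print(received["positional"])
# "args" and "kwargs" are ordinary keys inside **kwargs.
print(sorted(received["keyword"]))
```

The inner kwargs value is therefore a nested dict carried inside **kwargs, and it is that nested object the scheduler later tries to convert with dict(kwargs).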

Minimal, Working Example

Run python app.py in one terminal and python scheduler.py in another terminal

# app.py
from flask import Flask, current_app, jsonify
from flask_apscheduler import APScheduler
import rpyc

scheduler = APScheduler()

def create_app():
    app = Flask(__name__)
    app.scheduler = rpyc.connect("localhost", 12345)
    app.scheduler = app.scheduler.root  # just so current_app.scheduler can be used like normal

    @app.route("/add_job/<report_id>", methods=["GET"])
    def add_job(report_id):
        """
        This works as expected when using 
        from app import scheduler
        scheduler.add_job(...)
        """
        current_app.scheduler.add_job(
            func="app.tasks:run_report",
            args=(report_id,),
            kwargs={"email_results": True},
            executor="threadpool",
            trigger="cron",
            day="*/1",
            id="reconcile_accounts"
        )
        return jsonify({"status": "scheduled"})

    return app

if __name__ == "__main__":
    app = create_app()
    app.run(debug=True)

# scheduler.py
from rpyc.utils.server import ThreadedServer
import rpyc

from app import create_app, scheduler

class SchedulerService(rpyc.Service):
    def __init__(self):
        self._app = None
        self._scheduler = None

    def on_connect(self, conn):
        # code that runs when a connection is created
        # (to init the service, if needed)
        self._app = create_app()
        self._scheduler = scheduler

    def exposed_add_job(self, func, *args, **kwargs):
        # Problem occurs below when sending *args and **kwargs to Flask-APScheduler, which sends them to APScheduler
        job_id = kwargs.pop("id", None)
        return self._scheduler.add_job(job_id, func, *args, **kwargs)

if __name__ == "__main__":
    server = ThreadedServer(SchedulerService, port=12345, protocol_config={"allow_public_attrs": True})
    try:
        server.start()
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        scheduler.shutdown()

Traceback from self._scheduler.add_job(job_id, func, *args, **kwargs)

127.0.0.1 - - [08/Jul/2021 10:29:43] "GET /reports/2/run HTTP/1.1" 500 -
Traceback (most recent call last):
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py", line 2088, in __call__
    return self.wsgi_app(environ, start_response)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py", line 2073, in wsgi_app
    response = self.handle_exception(e)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py", line 2070, in wsgi_app
    response = self.full_dispatch_request()
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py", line 1515, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py", line 1513, in full_dispatch_request
    rv = self.dispatch_request()
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py", line 1499, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**req.view_args)
  File "C:\Users\mhill\PycharmProjects\reporting\app\views.py", line 189, in run_report
    kwargs={"config": json.dumps(report.serialize())}
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\netref.py", line 240, in __call__
    return syncreq(_self, consts.HANDLE_CALL, args, kwargs)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\netref.py", line 63, in syncreq
    return conn.sync_request(handler, proxy, *args)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\protocol.py", line 473, in sync_request
    return self.async_request(handler, *args, timeout=timeout).value
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\async_.py", line 102, in value
    raise self._obj
_get_exception_class.<locals>.Derived: dictionary update sequence element #0 has length 6; 2 is required

========= Remote Traceback (1) =========
Traceback (most recent call last):
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\protocol.py", line 324, in _dispatch_request
    res = self._HANDLERS[handler](self, *args)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\protocol.py", line 592, in _handle_call
    return obj(*args, **dict(kwargs))
  File "C:/Users/mhill/PycharmProjects/reporting/scheduler.py", line 20, in exposed_add_job
    return self._scheduler.add_job(func, *args, **kwargs)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask_apscheduler\scheduler.py", line 168, in add_job
    return self._scheduler.add_job(**job_def)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\apscheduler\schedulers\base.py", line 429, in add_job
    'kwargs': dict(kwargs) if kwargs is not None else {},
ValueError: dictionary update sequence element #0 has length 6; 2 is required

Solution

  • According to this rpyc issue on GitHub, the dict-mapping problem can be solved by enabling allow_public_attrs on both the server and the client side. By default, rpyc does not expose a dict's public methods (such as keys) to the other side of the connection, so the server cannot treat the kwargs object it receives as a mapping, and dict(kwargs) fails.

    In your case, you only have to change the client instance like this:

    app.scheduler = rpyc.connect("localhost", 12345, config={ 'allow_public_attrs': True })
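
The same ValueError can be reproduced without RPyC, which may help explain the message in the traceback. The sketch below is only an illustration, not RPyC's actual proxy code: HiddenKeysProxy and ExposedKeysProxy are hypothetical classes that mimic a remote dict whose keys method is inaccessible or accessible, respectively. When dict() finds no keys method, it falls back to treating its argument as an iterable of key/value pairs, so the first key, the 6-character string "config", is rejected as a "pair" of length 6.

```python
class HiddenKeysProxy:
    """Hypothetical stand-in for a proxied dict whose public methods are hidden."""

    def __init__(self, data):
        self._data = data

    def __iter__(self):
        # Iterating a dict yields its keys.
        return iter(self._data)

    def __getitem__(self, key):
        return self._data[key]


class ExposedKeysProxy(HiddenKeysProxy):
    """Same proxy, but with keys() reachable (the allow_public_attrs case)."""

    def keys(self):
        return self._data.keys()


payload = {"config": "{}"}

try:
    dict(HiddenKeysProxy(payload))
    message = None
except ValueError as exc:
    # dict() tried to unpack the string "config" as a (key, value) pair.
    message = str(exc)

print(message)

# With keys() available, dict() treats the proxy as a mapping and copies it.
copied = dict(ExposedKeysProxy(payload))
print(copied)
```

This matches the traceback above, where the kwargs dict sent by the client has the single key "config".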