Search code examples
python-3.xceleryceleryd

Creating detached processes from celery worker/alternative solution?


I'm developing a web service that will be used as a "database as a service" provider. The goal is to have a flask based small web service, running on some host and "worker" processes running on different hosts owned by different teams. Whenever a team member comes and requests a new database I should create one on their host. Now the problem... The service I start must be running. The worker however might be restarted. Could happen 5 minutes could happen 5 days. A simple Popen won't do the trick because it'd create a child process and if the worker stops later on the Popen process is destroyed (I tried this).

I have an implementation that's using multiprocessing which works like a champ, sadly I cannot use this with celery. so out of luck there. I tried to get away from the multiprocessing library with double forking and named pipes. The most minimal sample I could produce:

def launcher2(working_directory, cmd, *args):
    command = [cmd]
    command.extend(list(args))

    process = subprocess.Popen(command, cwd=working_directory, shell=False, start_new_session=True,
                               stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    with open(f'{working_directory}/ipc.fifo', 'wb') as wpid:
        wpid.write(process.pid)


@shared_task(bind=True, name="Test")
def run(self, cmd, *args):
    working_directory = '/var/tmp/workdir'
    if not os.path.exists(working_directory):
        os.makedirs(working_directory, mode=0o700)

    ipc = f'{working_directory}/ipc.fifo'
    if os.path.exists(ipc):
        os.remove(ipc)
    os.mkfifo(ipc)
    pid1 = os.fork()
    if pid1 == 0:
        os.setsid()
        os.umask(0)

        pid2 = os.fork()
        if pid2 > 0:
            sys.exit(0)

        os.setsid()
        os.umask(0)

        launcher2(working_directory, cmd, *args)
    else:
        with os.fdopen(os.open(ipc, flags=os.O_NONBLOCK | os.O_RDONLY), 'rb') as ripc:
            readers, _, _ = select.select([ripc], [], [], 15)
            if not readers:
                raise TimeoutError(60, 'Timed out', ipc)
            reader = readers.pop()
            pid = struct.unpack('I', reader.read())[0]
        pid, status = os.waitpid(pid, 0)
        print(status)


if __name__ == '__main__':
    async_result = run.apply_async(('/usr/bin/sleep', '15'), queue='q2')
    print(async_result.get())

My usecase is more complex but I don't think anyone would want to read 200+ lines of bootstrapping, but this fails exactly on the same way. On the other hand I don't wait for the pid unless that's required so it's like start the process on request and let it do it's job. Bootstrapping a database takes roughly a minute with the full setup, and I don't want the clients standing by for a minute. Request comes in, I spawn the process and send back an id for the database instance, and the client can query the status based on the received instance id. However with the above forking solution I get:

[2020-01-20 18:03:17,760: INFO/MainProcess] Received task: Test[dbebc31c-7929-4b75-ae28-62d3f9810fd9]  
[2020-01-20 18:03:20,859: ERROR/MainProcess] Process 'ForkPoolWorker-2' pid:16634 exited with 'signal 15 (SIGTERM)'
[2020-01-20 18:03:20,877: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 15 (SIGTERM).')
Traceback (most recent call last):
  File "/home/pupsz/PycharmProjects/provider/venv37/lib/python3.7/site-packages/billiard/pool.py", line 1267, in mark_as_worker_lost
    human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 15 (SIGTERM).

Which leaves me wondering, what might be going on. I tried an even more simple task:

@shared_task(bind=True, name="Test")
def run(self, cmd, *args):
    working_directory = '/var/tmp/workdir'
    if not os.path.exists(working_directory):
        os.makedirs(working_directory, mode=0o700)

    command = [cmd]
    command.extend(list(args))

    process = subprocess.Popen(command, cwd=working_directory, shell=False, start_new_session=True,
                               stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    return process.wait()


if __name__ == '__main__':
    async_result = run.apply_async(('/usr/bin/sleep', '15'), queue='q2')
    print(async_result.get())

Which again fails with the very same error. Now I like Celery but from this it feels like it's not suited for my needs. Did I mess something up? Can it be achieved, what I need to do from a worker? Do I have any alternatives, or should I just write my own task queue?


Solution

  • Celery is not multiprocessing-friendly, so try to use billiard instead of multiprocessing (from billiard import Process etc...) I hope one day Celery guys do a heavy refactoring of that code, remove billiard, and start using multiprocessing instead...

    So, until they move to multiprocessing we are stuck with billiard. My advice is to remove any usage of multiprocessing in your Celery tasks, and start using billiard.context.Process and similar, depending on your use-case.