Tags: python, multiprocessing, selector, python-asyncio, epoll

Could not use os.fork() bind several process to one socket server when using asyncio


Using asyncio substantially improves the throughput of a socket server, and things get even better if we can take advantage of all CPU cores (e.g. via the multiprocessing module, os.fork(), etc.).

I'm now trying to build a multi-core socket server demo, with an asynchronous socket server running on each core, all bound to one port. The idea is simply to create an async server, then os.fork(), and let the processes accept connections competitively.

However, the code that works fine on a single core runs into trouble when I try to fork. There seems to be a problem with registering the same file descriptor from different processes in the epoll selector module.

I'm showing some code below, can anyone help me out?


Here's a simple, logically clear echo server using asyncio:

import os
import asyncio  # or uvloop
from socket import *

# handler sends the incoming message straight back
async def handler(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

# create the tcp server socket
async def create_server(loop):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(('', 25000))
    sock.listen()
    sock.setblocking(False)
    return sock

# whenever a connection is accepted, create a handler task in the event loop
async def serving(loop, sock):
    while True:
        client, addr = await loop.sock_accept(sock)
        loop.create_task(handler(loop, client))

loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))
loop.create_task(serving(loop, sock))
loop.run_forever()
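For anyone who wants to poke at the server above, a quick blocking client is enough to check the echo behaviour. This is only a sketch: the server half below is a trimmed copy of the code above, started on an OS-assigned port in a background thread so the snippet is self-contained, and `start_echo_server`/`echo_once` are names I made up here. Against the real server, only the client half is needed, pointed at port 25000.

```python
import asyncio
import socket
import threading

async def _echo(loop, client):
    # same echo logic as handler() above
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

async def _serve(loop, sock):
    while True:
        client, _ = await loop.sock_accept(sock)
        loop.create_task(_echo(loop, client))

def start_echo_server():
    # bind to port 0 so the OS picks a free port for the demo
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('127.0.0.1', 0))
    sock.listen()
    sock.setblocking(False)
    port = sock.getsockname()[1]

    loop = asyncio.new_event_loop()

    def run():
        loop.create_task(_serve(loop, sock))
        loop.run_forever()

    threading.Thread(target=run, daemon=True).start()
    return port

def echo_once(port, payload):
    # plain blocking client: send one payload, read the echo back
    with socket.create_connection(('127.0.0.1', port)) as s:
        s.sendall(payload)
        return s.recv(64)

if __name__ == '__main__':
    port = start_echo_server()
    print(echo_once(port, b'hello'))  # prints b'hello'
```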

It works fine until I try to fork after the socket is bound and before the server starts serving. (The same logic works fine in synchronous, threading-based code.)


When I'm trying this:

loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))

from multiprocessing import cpu_count
for num in range(cpu_count() - 1):
    pid = os.fork()
    if pid <= 0:            # fork as many processes as there
        break               # are CPU cores

loop.create_task(serving(loop, sock))
loop.run_forever()

In theory, the forked processes are bound to the same socket and run in the same event loop, so shouldn't this just work?

However I'm getting these error messages:

Task exception was never retrieved
future: <Task finished coro=<serving() done, defined at /home/new/LinuxDemo/temp1.py:21> exception=FileExistsError(17, 'File exists')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 262, in _add_reader
    key = self._selector.get_key(fd)
  File "/usr/local/lib/python3.7/selectors.py", line 192, in get_key
    raise KeyError("{!r} is not registered".format(fileobj)) from None
KeyError: '6 is not registered'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/test/temp1.py", line 23, in serving
    client ,addr = await loop.sock_accept(sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 525, in sock_accept
    self._sock_accept(fut, False, sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 538, in _sock_accept
    self.add_reader(fd, self._sock_accept, fut, True, sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 335, in add_reader
    return self._add_reader(fd, callback, *args)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 265, in _add_reader
    (handle, None))
  File "/usr/local/lib/python3.7/selectors.py", line 359, in register
    self._selector.register(key.fd, poller_events)
FileExistsError: [Errno 17] File exists

Python version: 3.7.3.

I'm totally confused about what's going on.

Could anybody help? Thanks!


Solution

  • According to the tracker issue, it is not supported to fork an existing asyncio event loop and attempt to use it from multiple processes. However, according to Yury's comment on the same issue, multi-processing can be implemented by forking before starting a loop, therefore running fully independent asyncio loops in each child.

    Your code actually confirms this possibility: while create_server is async def, it doesn't await anything, nor does it use the loop argument. So we can implement Yury's approach by making create_server a regular function, removing the loop argument, calling it before os.fork(), and only running the event loops after forking:

    import os, asyncio, socket, multiprocessing
    
    async def handler(loop, client):
        with client:
            while True:
                data = await loop.sock_recv(client, 64)
                if not data:
                    break
                await loop.sock_sendall(client, data)
    
    # create tcp server
    def create_server():
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(('', 25000))
        sock.listen()
        sock.setblocking(False)
        return sock
    
    # whenever a connection is accepted, create a handler task in the event loop
    async def serving(loop, sock):
        while True:
            client, addr = await loop.sock_accept(sock)
            loop.create_task(handler(loop, client))
    
    sock = create_server()
    
    for num in range(multiprocessing.cpu_count() - 1):
        pid = os.fork()
        if pid <= 0:            # fork as many processes as there
            break               # are CPU cores
    
    loop = asyncio.get_event_loop()
    loop.create_task(serving(loop, sock))
    loop.run_forever()
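The same fork-before-loop pattern can also be expressed with multiprocessing.Process instead of a bare os.fork(), which keeps the per-child setup in one place. This is only a sketch of the idea, assuming the default "fork" start method on Linux, where the listening socket is inherited by the children (under the "spawn" start method the socket cannot be passed to workers this way); the `worker` and `main` names are my own:

```python
import asyncio
import multiprocessing
import socket

async def handler(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

async def serving(loop, sock):
    while True:
        client, addr = await loop.sock_accept(sock)
        loop.create_task(handler(loop, client))

def worker(sock):
    # each process builds its own, fully independent event loop
    loop = asyncio.new_event_loop()
    loop.create_task(serving(loop, sock))
    loop.run_forever()

def main(port=25000):
    # bind once in the parent, before any forking
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('', port))
    sock.listen()
    sock.setblocking(False)
    # one worker per extra core; the parent process serves as well
    for _ in range(multiprocessing.cpu_count() - 1):
        multiprocessing.Process(target=worker, args=(sock,), daemon=True).start()
    worker(sock)

if __name__ == '__main__':
    main()
```

Because each child only ever touches the loop it created itself, no file descriptor is registered twice in the same epoll selector, which is exactly the condition the traceback above complained about.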