首页 > 解决方案 > 使用 asyncio 时无法使用 os.fork() 将多个进程绑定到一个套接字服务器

问题描述

我们都知道使用 asyncio 可以显着提高套接字服务器的性能,显然如果我们可以利用 cpu 中的所有内核(可能通过多处理模块os.fork()等) ,事情会变得更加出色。

我现在正在尝试构建一个多核套接字服务器演示,其中一个异步套接字服务器在每个内核上侦听并全部绑定到一个端口。只需创建一个异步服务器然后使用os.fork(), 让进程有竞争力地工作。

但是,当我尝试分叉时,单核精细代码会遇到一些麻烦。似乎在 epoll 选择器模块中从不同进程注册相同的文件描述符存在一些问题。

我在下面显示一些代码,有人可以帮助我吗?


这是使用 asyncio 的 echo 服务器的简单、逻辑清晰的代码:

import os
import asyncio #,uvloop
from socket import *

# hendler sends back incoming message directly
async def handler(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

# create tcp server
async def create_server(loop):
    sock = socket(AF_INET ,SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET , SO_REUSEADDR ,1)
    sock.bind(('',25000))
    sock.listen()
    sock.setblocking(False)
    return sock

# whenever accept a request, create a handler task in eventloop
async def serving(loop, sock):
    while True:
        client ,addr = await loop.sock_accept(sock)
        loop.create_task(handler(loop ,client))

loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))
loop.create_task(serving(loop, sock))
loop.run_forever()

在我尝试分叉之前,它工作正常,在套接字被绑定之后和服务器开始服务之前。(这个逻辑在同步——基于线程的代码中运行良好。)


当我尝试这个时:

loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))

from multiprocessing import cpu_count
for num in range(cpu_count() - 1):
    pid = os.fork()
    if pid <= 0:            # fork process as the same number as 
        break               # my cpu cores

loop.create_task(serving(loop, sock))
loop.run_forever()

理论上分叉的进程绑定到同一个套接字?并在同一个事件循环中运行?那么工作就好了吗?

但是我收到这些错误消息:

Task exception was never retrieved
future: <Task finished coro=<serving() done, defined at /home/new/LinuxDemo/temp1.py:21> exception=FileExistsError(17, 'File exists')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 262, in _add_reader
    key = self._selector.get_key(fd)
  File "/usr/local/lib/python3.7/selectors.py", line 192, in get_key
    raise KeyError("{!r} is not registered".format(fileobj)) from None
KeyError: '6 is not registered'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/test/temp1.py", line 23, in serving
    client ,addr = await loop.sock_accept(sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 525, in sock_accept
    self._sock_accept(fut, False, sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 538, in _sock_accept
    self.add_reader(fd, self._sock_accept, fut, True, sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 335, in add_reader
    return self._add_reader(fd, callback, *args)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 265, in _add_reader
    (handle, None))
  File "/usr/local/lib/python3.7/selectors.py", line 359, in register
    self._selector.register(key.fd, poller_events)
FileExistsError: [Errno 17] File exists

Python 3.7.3 版,

我完全不知道发生了什么。

有人可以帮忙吗?谢谢

标签: pythonmultiprocessingselectorpython-asyncioepoll

解决方案


根据跟踪器问题,不支持分叉现有的异步事件循环并尝试从多个进程中使用它。但是,根据Yury对同一问题的评论,可以在开始循环之前通过分叉来实现多处理,因此在每个子进程中运行完全独立的异步循环。

您的代码实际上证实了这种可能性: while create_serveris async def,它不等待任何东西,也不使用loop参数。因此,我们可以通过创建create_server一个常规函数、删除loop参数并在 before 调用它os.fork(),并且只在 fork 之后运行事件循环来实现 Yury 的方法:

import os, asyncio, socket, multiprocessing

async def handler(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

# create tcp server
def create_server():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('', 25000))
    sock.listen()
    sock.setblocking(False)
    return sock

# whenever accept a request ,create a handler task in eventloop
async def serving(loop, sock):
    while True:
        client, addr = await loop.sock_accept(sock)
        loop.create_task(handler(loop, client))

sock = create_server()

for num in range(multiprocessing.cpu_count() - 1):
    pid = os.fork()
    if pid <= 0:            # fork process as the same number as 
        break               # my cpu cores

loop = asyncio.get_event_loop()
loop.create_task(serving(loop, sock))
loop.run_forever()

推荐阅读