python - 使用 asyncio 时无法使用 os.fork() 将多个进程绑定到一个套接字服务器
问题描述
我们都知道使用 asyncio 可以显着提高套接字服务器的性能,显然如果我们可以利用 cpu 中的所有内核(可能通过多处理模块os.fork()
等) ,事情会变得更加出色。
我现在正在尝试构建一个多核套接字服务器演示,其中一个异步套接字服务器在每个内核上侦听并全部绑定到一个端口。只需创建一个异步服务器然后使用os.fork()
, 让进程有竞争力地工作。
但是,当我尝试分叉时,单核精细代码会遇到一些麻烦。似乎在 epoll 选择器模块中从不同进程注册相同的文件描述符存在一些问题。
我在下面显示一些代码,有人可以帮助我吗?
这是使用 asyncio 的 echo 服务器的简单、逻辑清晰的代码:
import os
import asyncio #,uvloop
from socket import *
# hendler sends back incoming message directly
async def handler(loop, client):
with client:
while True:
data = await loop.sock_recv(client, 64)
if not data:
break
await loop.sock_sendall(client, data)
# create tcp server
async def create_server(loop):
sock = socket(AF_INET ,SOCK_STREAM)
sock.setsockopt(SOL_SOCKET , SO_REUSEADDR ,1)
sock.bind(('',25000))
sock.listen()
sock.setblocking(False)
return sock
# whenever accept a request, create a handler task in eventloop
async def serving(loop, sock):
while True:
client ,addr = await loop.sock_accept(sock)
loop.create_task(handler(loop ,client))
loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))
loop.create_task(serving(loop, sock))
loop.run_forever()
在我尝试分叉之前,它工作正常,在套接字被绑定之后和服务器开始服务之前。(这个逻辑在同步——基于线程的代码中运行良好。)
当我尝试这个时:
loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))
from multiprocessing import cpu_count
for num in range(cpu_count() - 1):
pid = os.fork()
if pid <= 0: # fork process as the same number as
break # my cpu cores
loop.create_task(serving(loop, sock))
loop.run_forever()
理论上分叉的进程绑定到同一个套接字?并在同一个事件循环中运行?那么工作就好了吗?
但是我收到这些错误消息:
Task exception was never retrieved
future: <Task finished coro=<serving() done, defined at /home/new/LinuxDemo/temp1.py:21> exception=FileExistsError(17, 'File exists')>
Traceback (most recent call last):
File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 262, in _add_reader
key = self._selector.get_key(fd)
File "/usr/local/lib/python3.7/selectors.py", line 192, in get_key
raise KeyError("{!r} is not registered".format(fileobj)) from None
KeyError: '6 is not registered'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/test/temp1.py", line 23, in serving
client ,addr = await loop.sock_accept(sock)
File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 525, in sock_accept
self._sock_accept(fut, False, sock)
File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 538, in _sock_accept
self.add_reader(fd, self._sock_accept, fut, True, sock)
File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 335, in add_reader
return self._add_reader(fd, callback, *args)
File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 265, in _add_reader
(handle, None))
File "/usr/local/lib/python3.7/selectors.py", line 359, in register
self._selector.register(key.fd, poller_events)
FileExistsError: [Errno 17] File exists
Python 3.7.3 版,
我完全不知道发生了什么。
有人可以帮忙吗?谢谢
解决方案
根据跟踪器问题,不支持分叉现有的异步事件循环并尝试从多个进程中使用它。但是,根据Yury对同一问题的评论,可以在开始循环之前通过分叉来实现多处理,因此在每个子进程中运行完全独立的异步循环。
您的代码实际上证实了这种可能性: while create_server
is async def
,它不等待任何东西,也不使用loop
参数。因此,我们可以通过创建create_server
一个常规函数、删除loop
参数并在 before 调用它os.fork()
,并且只在 fork 之后运行事件循环来实现 Yury 的方法:
import os, asyncio, socket, multiprocessing
async def handler(loop, client):
with client:
while True:
data = await loop.sock_recv(client, 64)
if not data:
break
await loop.sock_sendall(client, data)
# create tcp server
def create_server():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', 25000))
sock.listen()
sock.setblocking(False)
return sock
# whenever accept a request ,create a handler task in eventloop
async def serving(loop, sock):
while True:
client, addr = await loop.sock_accept(sock)
loop.create_task(handler(loop, client))
sock = create_server()
for num in range(multiprocessing.cpu_count() - 1):
pid = os.fork()
if pid <= 0: # fork process as the same number as
break # my cpu cores
loop = asyncio.get_event_loop()
loop.create_task(serving(loop, sock))
loop.run_forever()
推荐阅读
- java - 如何在每个活动 android 中添加抽屉?
- c - 在C中旋转矩阵
- javascript - JavaScript ES6 Promise 是否与 Promise/A+ 完全相同?
- c# - 执行 EF Core 迁移 (SmartEnum) 时“找不到适合实体类型的构造函数”
- php - 为 PHP 7.3 启用 ZeroMQ 扩展的具体步骤是什么?
- c# - 来自 Azure Active Directory 的 JWT 中的角色声明的 Azure HTTP 触发器函数授权
- excel - 使用 excel vba 将音频文件添加到 PowerPoint 演示文稿
- fortran - 尝试使用 do 循环和打开文件在 Fortran 中制作 gif
- db2 - DB2Audit 工具不能正常工作
- html - HTML / CSS 在此示例中是否有更好/更清洁的水平和垂直居中方式?