python-3.x - 在 Python 3 asyncio 中处理大量并发连接
问题描述
我正在尝试提高我的应用程序的性能。它是一个基于 Python3.6 asyncio.Protocol 的 TCP 服务器(SSL 包装)处理大量请求。
当只有一个连接处于活动状态时,它工作正常并且性能可以接受,但是一旦打开另一个连接,应用程序的客户端部分就会变慢。一旦有 10-15 个客户端连接,这真的很明显。
有没有办法正确处理并行请求,或者我应该求助于运行多个服务器实例?
/edit 添加的代码
主要的.py
if __name__ == '__main__':
import package.server
server = package.server.TCPServer()
server.join()
包.服务器
import multiprocessing, asyncio, uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
from package.connection import Connection
class TCPServer(multiprocessing.Process):
name = 'tcpserver'
def __init__(self, discord_queue=None):
multiprocessing.Process.__init__(self)
self.daemon = True
# some setup in here
self.start()
def run(self):
loop = uvloop.new_event_loop()
self.loop = loop
# db setup, etc
server = loop.create_server(Connection, HOST, PORT, ssl=SSL_CONTEXT)
loop.run_until_complete(server)
loop.run_forever()
包.连接
import asyncio, hashlib, os
from time import sleep, time as timestamp
class Connection(asyncio.Protocol):
connections = {}
def setup(self, peer):
self.peer = peer
self.ip, self.port = self.peer[0], self.peer[1]
self.buffer = []
@property
def connection_id(self):
if not hasattr(self, '_connection_id'):
self._connection_id = hashlib.md5('{}{}{}'.format(self.ip, self.port, timestamp()).encode('utf-8')).hexdigest()
return self._connection_id
def connection_lost(self, exception):
del Connection.connections[self.connection_id]
def connection_made(self, transport):
self.transport = transport
self.setup(transport.get_extra_info('peername'))
Connection.connections[self.connection_id] = self
def data_received(self, data):
# processing, average server side execution time is around 30ms
sleep(0.030)
self.transport.write(os.urandom(64))
该应用程序在 Debian 9.9 上运行,并通过 systemd 启动
为了“基准测试”,我使用这个脚本:
import os, socket
from multiprocessing import Pool
from time import time as timestamp
def foobar(i):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('127.0.0.1', 60000))
while True:
ms = timestamp()*1000
s.send(os.urandom(128))
s.recv(1024*2)
print(i, timestamp()*1000-ms)
if __name__ == '__main__':
instances = 4
with Pool(instances) as p:
print(p.map(foobar, range(0, instances)))
解决方案
在这里回答我自己的问题。我采用了一个解决方案,该解决方案生成了多个正在监听 base_port + x 的实例,并在其前面放置了一个 nginx TCP 负载均衡器。
各个TCPServer
实例仍然作为自己的进程生成,并通过单独的 UDP 连接在它们之间进行通信,并通过multiprocessing.Queue
.
虽然这并不能“解决”问题,但它为我非常具体的问题提供了一种可扩展的解决方案。
推荐阅读
- node.js - 如何构建多个 npm 包进行共享?
- r - 将显着性星添加到相关图,哪个包?
- javascript - 无法在 BigQuery UDF 中使用 DOMParser
- javascript - Javascript 对象和 getAttribute 显示不同的值
- javascript - 如何在点击java脚本的地方画圆
- python - 如何将数据加载到 Conv3d 进行视频分类?
- django - Django 频道使用 WS:// 但不使用 WSS://
- c++ - 具有函数、输入和输出类型的可变参数模板
- apache-nifi - Nifi JOLT: flat JSON object to a list of JSON object
- angular - 以角度迭代对象打字稿