首页 > 解决方案 > 在 Python 3 asyncio 中处理大量并发连接

问题描述

我正在尝试提高我的应用程序的性能。它是一个基于 Python3.6 asyncio.Protocol 的 TCP 服务器(SSL 包装)处理大量请求。

当只有一个连接处于活动状态时,它工作正常并且性能可以接受,但是一旦打开另一个连接,应用程序的客户端部分就会变慢。一旦有 10-15 个客户端连接,这真的很明显。

有没有办法正确处理并行请求,或者我应该求助于运行多个服务器实例?

/edit 添加的代码

主要的.py

if __name__ == '__main__':
    import package.server
    server = package.server.TCPServer()
    server.join()

包.服务器

import multiprocessing, asyncio, uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

from package.connection import Connection

class TCPServer(multiprocessing.Process):
    name = 'tcpserver'
    def __init__(self, discord_queue=None):
        multiprocessing.Process.__init__(self)
        self.daemon = True

        # some setup in here

        self.start()

    def run(self):
        loop = uvloop.new_event_loop()
        self.loop = loop

        # db setup, etc

        server = loop.create_server(Connection, HOST, PORT, ssl=SSL_CONTEXT)
        loop.run_until_complete(server)
        loop.run_forever()

包.连接

import asyncio, hashlib, os

from time import sleep, time as timestamp

class Connection(asyncio.Protocol):
    connections = {}

    def setup(self, peer):
        self.peer = peer

        self.ip, self.port = self.peer[0], self.peer[1]
        self.buffer = []

    @property
    def connection_id(self):
        if not hasattr(self, '_connection_id'):
            self._connection_id = hashlib.md5('{}{}{}'.format(self.ip, self.port, timestamp()).encode('utf-8')).hexdigest()
        return self._connection_id

    def connection_lost(self, exception):
        del Connection.connections[self.connection_id]

    def connection_made(self, transport):
        self.transport = transport
        self.setup(transport.get_extra_info('peername'))
        Connection.connections[self.connection_id] = self

    def data_received(self, data):
        # processing, average server side execution time is around 30ms
        sleep(0.030)
        self.transport.write(os.urandom(64))

该应用程序在 Debian 9.9 上运行,并通过 systemd 启动

为了“基准测试”,我使用这个脚本:

import os, socket

from multiprocessing import Pool
from time import time as timestamp

def foobar(i):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    s.connect(('127.0.0.1', 60000))

    while True:
        ms = timestamp()*1000
        s.send(os.urandom(128))
        s.recv(1024*2)
        print(i, timestamp()*1000-ms)

if __name__ == '__main__':
    instances = 4
    with Pool(instances) as p:
        print(p.map(foobar, range(0, instances)))

标签: python-3.xpython-asyncio

解决方案


在这里回答我自己的问题。我采用了一个解决方案,该解决方案生成了多个正在监听 base_port + x 的实例,并在其前面放置了一个 nginx TCP 负载均衡器。

各个TCPServer实例仍然作为自己的进程生成,并通过单独的 UDP 连接在它们之间进行通信,并通过multiprocessing.Queue.

虽然这并不能“解决”问题,但它为我非常具体的问题提供了一种可扩展的解决方案。


推荐阅读