首页 > 解决方案 > 是否可以在 python 中同时运行多个 asyncio?

问题描述

基于我得到的解决方案:Running multiple sockets using asyncio in python

我尝试使用 asyncio 添加计算部分

设置:Python 3.7.4

import msgpack
import threading
import os
import asyncio
import concurrent.futures
import functools
import nest_asyncio
nest_asyncio.apply()

class ThreadSafeElem(bytes):
  def __init__(self, * p_arg, ** n_arg):
     self._lock = threading.Lock()
  def __enter__(self):
     self._lock.acquire()
     return self
  def __exit__(self, type, value, traceback):
     self._lock.release()

elem = ThreadSafeElem()

async def serialize(data):
   return msgpack.packb(data, use_bin_type=True)
async def serialize1(data1):
   return msgpack.packb(data1, use_bin_type=True)

async def process_data(data,data1):
   loop = asyncio.get_event_loop()
   future = await loop.run_in_executor(None, functools.partial(serialize, data))
   future1 = await loop.run_in_executor(None, functools.partial(serialize1, data1))
   return   await asyncio.gather(future,future1)

 ################ Calculation#############################
def calculate_data():
  global elem
  while True:
      try:
          ... data is calculated (some dictionary))...
          elem, elem1= asyncio.run(process_data(data, data1))
      except:
          pass
#####################################################################
def get_data():
  return elem
def get_data1():
  return elem1
########### START SERVER AND get data contionusly ################
async def client_thread(reader, writer):
  while True:
    try:
        bytes_received = await reader.read(100) 
        package_type = np.frombuffer(bytes_received, dtype=np.int8)
        if package_type ==1 :
           nn_output = get_data1()
        if package_type ==2 :
           nn_output = get_data()               
        writer.write(nn_output)
        await writer.drain()
    except:
        pass

async def start_servers(host, port):
  server = await asyncio.start_server(client_thread, host, port)
  await server.serve_forever()

async def start_calculate():
  await asyncio.run(calculate_data())

def enable_sockets():
 try:
    host = '127.0.0.1'
    port = 60000
    sockets_number = 6
    loop = asyncio.get_event_loop()
    for i in range(sockets_number):
        loop.create_task(start_servers(host,port+i))
    loop.create_task(start_calculate())
    loop.run_forever()
except:
    print("weird exceptions")
##############################################################################

enable_sockets()   

问题是当我从客户端拨打电话时,服务器没有给我任何东西。

我用虚拟数据测试了程序,计算部分没有异步,所以没有这个loop.create_task(start_calculate())并且服务器正确响应。

我还运行计算数据而不将其添加到启用套接字中并且它有效。它也适用于这个实现,但问题是服务器没有返回任何东西。

我这样做是因为我需要计算部分连续运行,并且当其中一个客户调用以返回该点的数据时。

标签: pythonpython-3.xserverpython-asyncio

解决方案


asyncio事件循环不能嵌套在另一个事件循环中,这样做没有意义:(和asyncio.run类似的)阻塞当前线程直到完成。这不会增加并行性,只会禁用任何外部事件循环。

如果要嵌套另一个asyncio任务,直接在当前事件循环中运行。如果要运行非合作的阻塞任务,请在事件循环执行器中运行它

async def start_calculate():
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, calculate_data)

默认执行器使用线程——这允许运行阻塞任务,但不会增加并行度。使用自定义ProcessPoolExecutor来使用额外的核心:

import concurrent.futures

async def start_calculate():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        await loop.run_in_executor(pool, calculate_data)

推荐阅读