python - 是否可以在 python 中同时运行多个 asyncio?
问题描述
基于我得到的解决方案:Running multiple sockets using asyncio in python
我尝试使用 asyncio 添加计算部分
设置:Python 3.7.4
import msgpack
import threading
import os
import asyncio
import concurrent.futures
import functools
import nest_asyncio
nest_asyncio.apply()
class ThreadSafeElem(bytes):
def __init__(self, * p_arg, ** n_arg):
self._lock = threading.Lock()
def __enter__(self):
self._lock.acquire()
return self
def __exit__(self, type, value, traceback):
self._lock.release()
elem = ThreadSafeElem()
async def serialize(data):
return msgpack.packb(data, use_bin_type=True)
async def serialize1(data1):
return msgpack.packb(data1, use_bin_type=True)
async def process_data(data,data1):
loop = asyncio.get_event_loop()
future = await loop.run_in_executor(None, functools.partial(serialize, data))
future1 = await loop.run_in_executor(None, functools.partial(serialize1, data1))
return await asyncio.gather(future,future1)
################ Calculation#############################
def calculate_data():
global elem
while True:
try:
... data is calculated (some dictionary))...
elem, elem1= asyncio.run(process_data(data, data1))
except:
pass
#####################################################################
def get_data():
return elem
def get_data1():
return elem1
########### START SERVER AND get data contionusly ################
async def client_thread(reader, writer):
while True:
try:
bytes_received = await reader.read(100)
package_type = np.frombuffer(bytes_received, dtype=np.int8)
if package_type ==1 :
nn_output = get_data1()
if package_type ==2 :
nn_output = get_data()
writer.write(nn_output)
await writer.drain()
except:
pass
async def start_servers(host, port):
server = await asyncio.start_server(client_thread, host, port)
await server.serve_forever()
async def start_calculate():
await asyncio.run(calculate_data())
def enable_sockets():
try:
host = '127.0.0.1'
port = 60000
sockets_number = 6
loop = asyncio.get_event_loop()
for i in range(sockets_number):
loop.create_task(start_servers(host,port+i))
loop.create_task(start_calculate())
loop.run_forever()
except:
print("weird exceptions")
##############################################################################
enable_sockets()
问题是当我从客户端拨打电话时,服务器没有给我任何东西。
我用虚拟数据测试了程序,计算部分没有异步,所以没有这个loop.create_task(start_calculate())并且服务器正确响应。
我还运行计算数据而不将其添加到启用套接字中并且它有效。它也适用于这个实现,但问题是服务器没有返回任何东西。
我这样做是因为我需要计算部分连续运行,并且当其中一个客户调用以返回该点的数据时。
解决方案
asyncio
事件循环不能嵌套在另一个事件循环中,这样做没有意义:(和asyncio.run
类似的)阻塞当前线程直到完成。这不会增加并行性,只会禁用任何外部事件循环。
如果要嵌套另一个asyncio
任务,直接在当前事件循环中运行。如果要运行非合作的阻塞任务,请在事件循环执行器中运行它。
async def start_calculate():
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, calculate_data)
默认执行器使用线程——这允许运行阻塞任务,但不会增加并行度。使用自定义ProcessPoolExecutor
来使用额外的核心:
import concurrent.futures
async def start_calculate():
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
await loop.run_in_executor(pool, calculate_data)
推荐阅读
- scala - Apache Livy doesn't work with local jar file
- mockito - Mockito with void method
- wordpress - Woocommerce:在存档页面上显示具有特定描述的产品变体
- angular - Can't fix ExpressionChangedAfterItHasBeenCheckedError
- json - Swift: Parse data Codable protocol not working
- vmware - 打包器:从源 ova 构建时出错
- python - Django - 重定向到子类管理页面
- oracle - Oracle中字段的自动调整
- php - Laravel Get Column Name error
- vb.net - 在 VB.net 屏幕上的任意位置使用鼠标滚动