python - 在 Python 中混合同步和异步代码
问题描述
我正在尝试将基于回调的 Python 代码中的同步流转换为使用 asyncio 的 A-syncronious 流。基本上,代码与 TCP/UNIX 套接字进行了很多交互。它从套接字读取数据,对其进行操作以做出决策并将内容写回另一端。这一次在多个套接字上进行,并且有时在上下文之间共享数据以做出决策。
编辑 :: 目前的代码主要基于为特定套接字注册一个中央实体的回调,并在相关套接字可读时让该实体运行回调(类似于“当该套接字有数据要读取时调用此函数”)。一旦调用了回调 - 就会发生一堆事情,最终会注册一个新的回调以用于新数据可用时。中央实体在所有注册的套接字上运行一个选择,以确定应该调用哪些回调。
我试图在不重构我的整个代码并使这对程序员尽可能无缝的情况下做到这一点 - 所以我试图这样考虑 - 所有代码都应该像今天一样运行 - 但只要当前代码执行 socket.recv() 来获取新数据 - 该过程将执行其他任务。当读取返回时,它应该返回使用它获得的新数据从同一点处理数据。
为此,我编写了一个名为 AsyncSocket 的新类——它与 asyncIO 的 IO 流进行交互,并将 Async/await 语句几乎完全放在那里——我认为我会在我的类中实现 recv 方法以使其看起来像“常规 IO 套接字”连接到我的其余代码。到目前为止 - 这是我对异步编程应该允许的理解。
现在的问题:
我的代码等待客户端连接 - 当它连接时,每个客户端的上下文都可以从它自己的连接中读取和写入。我已经简化为以下内容以澄清问题:
class AsyncSocket():
def __init__(self,reader,writer):
self.reader = reader
self.writer = writer
def recv(self,numBytes):
print("called recv!")
data = self.read_mitigator(numBytes)
return data
async def read_mitigator(self,numBytes):
print("Awaiting of AsyncSocket.reader.read")
data = await self.reader.read(numBytes)
print("Done Awaiting of AsyncSocket.reader.read data is %s " % data)
return data
def mit2(aSock):
return mit3(aSock)
def mit3(aSock):
return aSock.recv(100)
async def echo_server(reader, writer):
print ("New Connection!")
aSock = AsyncSocket(reader,writer) # create a new A-sync socket class and pass it on the to regular code
while True:
data = await some_func(aSock) # this would eventually read from the socket
print ("Data read is %s" % (data))
if not data:
break
writer.write(data) # echo everything back
async def main(host, port):
server = await asyncio.start_server(echo_server, host, port)
await server.serve_forever()
asyncio.run(main('127.0.0.1', 5000))
mit2() 和 mit3() 是同步函数,它们在返回主客户端循环之前对返回的数据进行处理 - 但在这里我只是将它们用作空函数。当我使用 some_func() 的实现时,问题就开始了。
通过实施(编辑:工作种类)-但仍然存在问题:
def some_func(aSock):
try:
return (mit2(aSock)) # works
except:
print("Error!!!!")
虽然读取数据并对其进行处理的实现 - 例如在返回之前添加后缀,但会引发错误:
def some_func(aSock):
try:
return (mit2(aSock) + "something") # doesn't work
except:
print("Error!!!!")
该错误(据我所知)意味着它并没有真正做它应该做的事情:
New Connection!
called recv!
/Users/user/scripts/asyncServer.py:36: RuntimeWarning: coroutine 'AsyncSocket.read_mitigator' was never awaited
return (mit2(aSock) + "something") # doesn't work
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Error!!!!
Data read is None
并且回显服务器显然不起作用。显然,我的代码看起来更像选项 #2,在 some_func()、mit2() 和 mit3() 中有更多的东西——但我无法让它工作。我在使用 asyncio/async/await 方面相当新 - 那么我错过了什么(我猜是基本概念)?
解决方案
此代码无法按预期工作:
def recv(self,numBytes):
print("called recv!")
data = self.read_mitigator(numBytes)
return data
async def read_mitigator(self,numBytes):
...
您不能从同步函数调用异步函数并获取结果,您必须等待它,这样可以确保在数据尚未准备好时返回事件循环。异步和同步代码之间的这种不匹配有时被称为函数颜色问题。
由于您的代码已经在使用非阻塞套接字和事件循环,因此将其移植到 asyncio 的一个好方法可能是首先切换到 asyncio 事件循环。您可以使用事件循环方法,例如sock_recv
请求数据:
def start():
loop = asyncio.get_event_loop()
sock = make_socket() # make sure it's non-blocking
future_data = loop.sock_recv(sock, 1024)
future_data.add_done_callback(continue_read)
# return to the event loop - when some data is ready
# continue_read will be invoked
def continue_read(future):
data = future.result()
print('got', data)
# ... do something with data, e.g. process it
# and call sock_sendall with the response
asyncio.get_event_loop().call_soon(start())
asyncio.get_event_loop().run_forever()
一旦程序在该模式下运行,您就可以开始使用coroutines,它允许代码看起来像同步代码,但工作方式完全相同:
async def start():
loop = asyncio.get_event_loop()
sock = make_socket() # make sure it's non-blocking
data = await loop.sock_recv(sock, 1024)
# data is available "immediately", meaning the coroutine gets
# automatically suspended when awaiting data that is not yet
# ready, and automatically re-scheduled when the data is ready
print('got', data)
asyncio.run(start())
下一步可以消除make_socket
并切换到asyncio 流。
推荐阅读
- visual-studio - 如何将颤振预构建包重置为默认值?
- scroll - FFMPEG:如何在不拉伸的情况下移动视频?
- powershell - 在目录名称格式为 YYYY-MM-DD 的目录中获取扩展名为 .JPG 的文件
- javascript - 暂停/恢复后过渡速度加快
- c# - JWT 的默认实现
- php - 我无法在 WordPress 模板中的字符串中添加空格
- sql - 我如何将值与键作为行和列进行比较
- html - 在 CSS 中创建具有右对齐标签和左对齐值的两列布局
- java - 集成测试运行两次。一次是由于 Jacoco 的 FailSafe 插件,另一次是由于 Jbehave maven 插件
- xaml - 后退按钮后换行 - Xamarin Forms