首页 > 解决方案 > 在 Python 中混合同步和异步代码

问题描述

我正在尝试将基于回调的 Python 代码中的同步流转换为使用 asyncio 的 A-syncronious 流。基本上,代码与 TCP/UNIX 套接字进行了很多交互。它从套接字读取数据,对其进行操作以做出决策并将内容写回另一端。这一次在多个套接字上进行,并且有时在上下文之间共享数据以做出决策。

编辑 :: 目前的代码主要基于为特定套接字注册一个中央实体的回调,并在相关套接字可读时让该实体运行回调(类似于“当该套接字有数据要读取时调用此函数”)。一旦调用了回调 - 就会发生一堆事情,最终会注册一个新的回调以用于新数据可用时。中央实体在所有注册的套接字上运行一个选择,以确定应该调用哪些回调。

我试图在不重构我的整个代码并使这对程序员尽可能无缝的情况下做到这一点 - 所以我试图这样考虑 - 所有代码都应该像今天一样运行 - 但只要当前代码执行 socket.recv() 来获取新数据 - 该过程将执行其他任务。当读取返回时,它应该返回使用它获得的新数据从同一点处理数据。

为此,我编写了一个名为 AsyncSocket 的新类——它与 asyncIO 的 IO 流进行交互,并将 Async/await 语句几乎完全放在那里——我认为我会在我的类中实现 recv 方法以使其看起来像“常规 IO 套接字”连接到我的其余代码。到目前为止 - 这是我对异步编程应该允许的理解。

现在的问题:

我的代码等待客户端连接 - 当它连接时,每个客户端的上下文都可以从它自己的连接中读取和写入。我已经简化为以下内容以澄清问题:

class AsyncSocket():
    def __init__(self,reader,writer):
        self.reader = reader
        self.writer = writer
    def recv(self,numBytes):
        print("called recv!")
        data = self.read_mitigator(numBytes)
        return data
    async def read_mitigator(self,numBytes):
        print("Awaiting of AsyncSocket.reader.read")
        data = await self.reader.read(numBytes)
        print("Done Awaiting of AsyncSocket.reader.read data is %s " % data)
        return data 

def mit2(aSock):
    return mit3(aSock)

def mit3(aSock):
    return aSock.recv(100)

async def echo_server(reader, writer):
    print ("New Connection!")
    aSock = AsyncSocket(reader,writer) # create a new A-sync socket class and pass it on the to regular code

    while True:
        data = await some_func(aSock) # this would eventually read from the socket
        print ("Data read is %s" % (data))
        if not data:
            break
        writer.write(data) # echo everything back

async def main(host, port):
    server = await asyncio.start_server(echo_server, host, port)
    await server.serve_forever()
asyncio.run(main('127.0.0.1', 5000))

mit2() 和 mit3() 是同步函数,它们在返回主客户端循环之前对返回的数据进行处理 - 但在这里我只是将它们用作空函数。当我使用 some_func() 的实现时,问题就开始了。

通过实施(编辑:工作种类)-但仍然存在问题:

def some_func(aSock):
    try:
        return (mit2(aSock)) # works
    except:
        print("Error!!!!")

虽然读取数据并对其进行处理的实现 - 例如在返回之前添加后缀,但会引发错误:

def some_func(aSock):
    try:
        return (mit2(aSock) + "something") # doesn't work
    except:
        print("Error!!!!")

该错误(据我所知)意味着它并没有真正做它应该做的事情:

New Connection!
called recv!
/Users/user/scripts/asyncServer.py:36: RuntimeWarning: coroutine 'AsyncSocket.read_mitigator' was never awaited
  return (mit2(aSock) + "something") # doesn't work
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Error!!!!
Data read is None

并且回显服务器显然不起作用。显然,我的代码看起来更像选项 #2,在 some_func()、mit2() 和 mit3() 中有更多的东西——但我无法让它工作。我在使用 asyncio/async/await 方面相当新 - 那么我错过了什么(我猜是基本概念)?

标签: pythonpython-3.xasync-awaitpython-asyncio

解决方案


此代码无法按预期工作:

def recv(self,numBytes):
    print("called recv!")
    data = self.read_mitigator(numBytes)
    return data

async def read_mitigator(self,numBytes):
    ...

您不能从同步函数调用异步函数并获取结果,您必须等待它,这样可以确保在数据尚未准备好时返回事件循环。异步和同步代码之间的这种不匹配有时被称为函数颜色问题。

由于您的代码已经在使用非阻塞套接字和事件循环,因此将其移植到 asyncio 的一个好方法可能是首先切换到 asyncio 事件循环。您可以使用事件循环方法,例如sock_recv请求数据:

def start():
    loop = asyncio.get_event_loop()
    sock = make_socket()  # make sure it's non-blocking
    future_data = loop.sock_recv(sock, 1024)
    future_data.add_done_callback(continue_read)
    # return to the event loop - when some data is ready
    # continue_read will be invoked

def continue_read(future):
    data = future.result()
    print('got', data)
    # ... do something with data, e.g. process it
    # and call sock_sendall with the response

asyncio.get_event_loop().call_soon(start())
asyncio.get_event_loop().run_forever()

一旦程序在该模式下运行,您就可以开始使用coroutines,它允许代码看起来像同步代码,但工作方式完全相同:

async def start():
    loop = asyncio.get_event_loop()
    sock = make_socket()  # make sure it's non-blocking
    data = await loop.sock_recv(sock, 1024)
    # data is available "immediately", meaning the coroutine gets
    # automatically suspended when awaiting data that is not yet
    # ready, and automatically re-scheduled when the data is ready
    print('got', data)

asyncio.run(start())

下一步可以消除make_socket并切换到asyncio 流


推荐阅读