首页 > 解决方案 > Websocket订阅和协同程序:在不退出循环的情况下提取消息?

问题描述

我正在尝试组织我对 API 的实时订阅,这就是我目前所拥有的:

async def real_time_info():
    channels = ['channel1']
    msg_subscription = msg.public_subscribe(channels)
    async with websockets.connect(URL) as websocket:
        await websocket.send(json.dumps(msg_subscription))
        while websocket.open:
            subscription = json.loads(await websocket.recv())
            return subscription #HERE IS THE PROBLEM I'M TRYING TO SOLVE


async def real_time_data():
    channels = ['channel2']
    msg_subscription = msg.public_subscribe(channels)
    async with websockets.connect(URL) as websocket:
        await websocket.send(json.dumps(msg_subscription))
        while websocket.open:
            subscription = json.loads(await websocket.recv())
            return subscription #HERE IS THE PROBLEM I'M TRYING TO SOLVE


async def main():
    info = asyncio.create_task(real_time_info())
    data = asyncio.create_task(real_time_data())

    while True:
        print(await info)
        print(await data)

asyncio.run(main())

所以我想做的是每次有来自real_time_info()or的消息时real_time_data(),打印它。问题是因为我使用 return 它会自动退出该函数,当然也会停止从 websocket 获取消息。

从技术上讲,我可以通过将所有内容放在一个函数中并在其中完成所有内容而无需退出 for 循环来实现这一点,但是这样一来一团糟,而且很难管理我从套接字获取的消息。

有没有办法将这些套接字消息从我的real_time函数中获取到 main,而它们仍然在循环中运行?

谢谢!

编辑:我还会问另一个关于 websockets 的问题,这些问题可能会使这一切变得微不足道。现在,我通过 2 个不同的函数连接到同一 API 的 2 个通道(通道 1 和通道 2)。但是因为在他们两个中我都尝试通过 读取来自服务器的消息await websocket.recv(),这些消息是否仍然混合在同一个“接收器”中?如果是这样,有没有办法通过 websockets 管理来自服务器的响应?再次感谢。

标签: pythonwebsocketpython-asyncio

解决方案


将 的实例传递asyncio.Queue()给您的异步函数并替换return subscriptionawait queue.put(('channel1', subscription)). 然后你main可以看起来像这样:

async def main():
    queue = asyncio.Queue()
    info = asyncio.create_task(quote_info(queue))
    data = asyncio.create_task(trades_info(queue))

    while True:
        channel, subscription = await queue.get()
        print(channel, subscription)

推荐阅读