首页 > 解决方案 > 在 Python FastAPI 中使用 websocket 并行发送/接收

问题描述

我将尝试用一个例子来解释我在做什么,比如我正在构建一个天气客户端。浏览器通过 websocket 发送消息,例如:

{
  "city": "Chicago",
  "country": "US"
}

服务器每 5 分钟查询一次天气,并使用最新数据更新浏览器。

现在浏览器可以发送另一条消息,例如:

{
  "city": "Bangalore",
  "country": "IN"
}

现在我的服务器应该停止更新芝加哥的天气详细信息并开始更新有关班加罗尔的详细信息,即同时通过 websocket 发送/接收消息。我应该如何实施呢?

目前我有这个,但这只会在接收到事件时更新浏览器:

@app.websocket("/ws")
async def read_webscoket(websocket: WebSocket):
    await websocket.accept()
    weather_client = WeatherClient(client)
    while True:
        data = await websocket.receive_json()
        weather = await weather_client.weather(data)
        await websocket.send_json(weather.dict())

如果我移出websocket.receive_json()循环,我将无法连续收听来自浏览器的消息。我想我需要启动两个 asyncio 任务,但由于我是异步编程方式的新手,所以我不太能够确定实现。

标签: pythonwebsocketpython-asynciofastapi

解决方案


执行此操作的最简单方法就像您提到在单独的任务中将阅读移到循环之外。在这个范例中,您需要使用最新数据更新局部变量,使您的代码看起来像这样:

@app.websocket("/ws")
async def read_webscoket(websocket: WebSocket):
    await websocket.accept()
    json_data = await websocket.receive_json()

    async def read_from_socket(websocket: WebSocket):
        nonlocal json_data
        async for data in websocket.iter_json():
            json_data = data

    asyncio.create_task(read_from_socket(websocket))
    while True:
        print(f"getting weather data for {json_data}")
        await asyncio.sleep(1)  # simulate a slow call to the weather service

注意我使用了iter_json异步生成器,这相当于一个无限循环receive_json

这将起作用,但根据您的要求可能会有错误。想象一下,天气服务需要 10 秒才能完成,在这段时间内,用户通过套接字向不同城市发送了三个请求。在上面的代码中,您只会获得用户发送的最新城市。这对您的应用程序可能没问题,但如果您需要跟踪用户发送的所有内容,则需要使用队列。在此范例中,您将有一项任务读取数据并将其放入队列,一项任务从队列中获取数据并查询天气服务。然后,您将与gather.

@app.websocket("/wsqueue")
async def read_webscoket(websocket: WebSocket):
    await websocket.accept()
    queue = asyncio.queues.Queue()

    async def read_from_socket(websocket: WebSocket):
        async for data in websocket.iter_json():
            print(f"putting {data} in the queue")
            queue.put_nowait(data)

    async def get_data_and_send():
        data = await queue.get()
        while True:
            if queue.empty():
                print(f"getting weather data for {data}")
                await asyncio.sleep(1)
            else:
                data = queue.get_nowait()
                print(f"Setting data to {data}")

    await asyncio.gather(read_from_socket(websocket), get_data_and_send())

这样,您就不会丢失用户发送的数据。在上面的示例中,我只获取用户请求的最新天气数据,但您仍然可以访问所有发送的数据。

编辑:要在评论中回答您的问题,队列方法可能最好在新请求进入时取消任务。基本上将您希望能够取消的长时间运行的任务转移到它自己的协程函数中(在本例中read_and_send_to_client)和将其作为任务运行。当新数据进来时,如果该任务没有完成,取消它然后创建一个新的。

async def read_and_send_to_client(data):
    print(f'reading {data} from client')
    await asyncio.sleep(10) # simulate a slow call
    print(f'finished reading {data}, sending to websocket client')


@app.websocket("/wsqueue")
async def read_webscoket(websocket: WebSocket):
    await websocket.accept()
    queue = asyncio.queues.Queue()

    async def read_from_socket(websocket: WebSocket):
        async for data in websocket.iter_json():
            print(f"putting {data} in the queue")
            queue.put_nowait(data)

    async def get_data_and_send():
        data = await queue.get()
        fetch_task = asyncio.create_task(read_and_send_to_client(data))
        while True:
            data = await queue.get()
            if not fetch_task.done():
                print(f'Got new data while task not complete, canceling.')
                fetch_task.cancel()
            fetch_task = asyncio.create_task(read_and_send_to_client(data))

    await asyncio.gather(read_from_socket(websocket), get_data_and_send())

推荐阅读