首页 > 解决方案 > 为 await 函数创建另一个线程

问题描述

我是第一次使用Webserver,我之前使用过套接字和并行,但它非常不同和简单,它没有使用异步作为并行。

我的目标很简单,我有我的服务器和我的客户端。在我的客户端中,我想创建一个单独的线程来接收服务器将发送的消息,并在前一个线程中执行一些其他操作,如代码示例 (client.py) 中所示:

from typing import Dict
import websockets
import asyncio
import json

URL = "my localhost webserver"
connection = None

async def listen() -> None:
    global connection

    input("Press enter to connect.")
    
    async with websockets.connect(URL) as ws:
        connection = ws

        msg_initial: Dict[str,str] = get_dict()
        await ws.send(json.dumps(msg_initial))
        

        ## This i want to be in another thread
await receive_msg()

print("I`m at listener`s thread")

# do some stuffs

async def recieve_msg() -> None:
    while True:
        msg = await connection.recv()
        print(f"Server: {msg}")

asyncio.get_event_loop().run_until_complete(listen())

让我收到一条我需要使用的消息awaitrecv()但我不知道如何为此创建一个单独的线程。我已经尝试过使用threading来创建一个单独的线程,但它没有用。

有谁知道如何做到这一点,如果有可能做到这一点?

标签: pythonpython-3.xmultithreadingasync-awaitwebserver

解决方案


目前尚不清楚您想要做什么可以按照您建议的确切方式完成。在以下示例中,我将连接到回显服务器。直接实现您所建议的最直接的方法是创建一个新线程来传递连接。但这并不完全奏效:

import websockets
import asyncio
from threading import Thread

URL = "ws://localhost:4000"

async def listen() -> None:
    async with websockets.connect(URL) as ws:
        # pass connection:
        t = Thread(target=receiver_thread, args=(ws,))
        t.start()
        # Generate some messages to be echoed back:
        await ws.send('msg1')
        await ws.send('msg2')
        await ws.send('msg3')
        await ws.send('msg4')
        await ws.send('msg5')

def receiver_thread(connection):
    print("I`m at listener`s thread")
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(receive_msg(connection))

async def receive_msg(connection) -> None:
    while True:
        msg = await connection.recv()
        print(f"Server: {msg}")

asyncio.get_event_loop().run_until_complete(listen())

印刷:

I`m at listener`s thread
Server: msg1
Server: msg2
Server: msg3
Server: msg4
Server: msg5
Exception in thread Thread-1:
Traceback (most recent call last):
  File "C:\Program Files\Python38\lib\threading.py", line 932, in _bootstrap_inner
    self.run()
  File "C:\Program Files\Python38\lib\threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Ron\test\test.py", line 22, in receiver_thread
    loop.run_until_complete(receive_msg(connection))
  File "C:\Program Files\Python38\lib\asyncio\base_events.py", line 616, in run_until_complete
    return future.result()
  File "C:\Ron\test\test.py", line 29, in receive_msg
    msg = await connection.recv()
  File "C:\Program Files\Python38\lib\site-packages\websockets\legacy\protocol.py", line 404, in recv
    await asyncio.wait(
  File "C:\Program Files\Python38\lib\asyncio\tasks.py", line 424, in wait
    fs = {ensure_future(f, loop=loop) for f in set(fs)}
  File "C:\Program Files\Python38\lib\asyncio\tasks.py", line 424, in <setcomp>
    fs = {ensure_future(f, loop=loop) for f in set(fs)}
  File "C:\Program Files\Python38\lib\asyncio\tasks.py", line 667, in ensure_future
    raise ValueError('The future belongs to a different loop than '
ValueError: The future belongs to a different loop than the one specified as the loop argument

消息接收正常,但问题出现在receiver_thread语句上的函数中:

loop.run_until_complete(receive_msg(connection))

必须启动的线程没有正在运行的事件循环,并且不能使用函数正在使用的事件循环listen,因此必须创建一个新的事件循环。如果这个线程/事件循环没有使用来自不同事件循环的任何资源(即连接),那会很好:

import websockets
import asyncio
from threading import Thread

URL = "ws://localhost:4000"

async def listen() -> None:
    async with websockets.connect(URL) as ws:
        t = Thread(target=receiver_thread)
        t.start()

def receiver_thread():
    print("I`m at listener`s thread")
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(receive_msg())

async def receive_msg() -> None:
    await asyncio.sleep(2)
    print('I just slept for 2 seconds')

asyncio.get_event_loop().run_until_complete(listen())

印刷:

I`m at listener`s thread
I just slept for 2 seconds

根据您显示的最少代码,我可以看到没有真正需要在线程中运行任何东西,但假设您省略了显示对接收到的消息的一些处理,asyncio仅此是不够的,那么您可能需要做的就是接收消息当前正在运行的循环(在函数中listen)并使用线程仅用于处理消息:

from typing import Dict
import websockets
import asyncio
import json
from threading import Thread

URL = "my localhost webserver"

async def listen() -> None:

    input("Press enter to connect.")

    async with websockets.connect(URL) as ws:

        msg_initial: Dict[str,str] = get_dict()
        await ws.send(json.dumps(msg_initial))

        while True:
            msg = await ws.recv()
            print(f"Server: {msg}")
            # Non-daemon threads so program will not end until these threads terminate:
            t = Thread(target=process_msg, args=(msg,))
            t.start()
            

asyncio.get_event_loop().run_until_complete(listen())

更新

根据您对我关于创建聊天程序的回答的最后评论,您应该使用纯多线程或纯 asyncio 来实现它。这是使用 asyncio 的粗略大纲:

import websockets
import asyncio
import aioconsole

URL = "my localhost webserver"

async def receiver(connection):
    while True:
        msg = await connection.recv()
        print(f"\nServer: {msg}")

async def sender(connection):
    while True:
        msg = await aioconsole.ainput('\nEnter msg: ')
        await connection.send(msg)

async def chat() -> None:
    async with websockets.connect(URL) as ws:
        await asyncio.gather(
            receiver(ws),
            sender(ws)
        )

asyncio.get_event_loop().run_until_complete(chat())

但是,您可以使用 asyncio 执行的用户输入类型可能会受到限制。因此,我认为多线程可能是一种更好的方法。


推荐阅读