python-3.x - python Websockets (asyncio ver) 强制关闭连接
问题描述
我正在为python> 3.5编码。我正在使用这里的 Websockets 6.0 库:
https ://github.com/aaugustin/websockets
我一直称它们为asyncio
Websockets,因为它们基于 asyncio。在我的搜索中有很多“丢失的连接”,但我正在研究如何取消当前的ws.recv()
. 调用.start()
创建一个辅助线程来启动异步事件循环。然后接收函数启动并调用连接函数并ws
实例化websocket。然后接收功能工作下降消息。当我准备停止时,会调用 .stop() 。我期待停止功能停止等待ws.recv()
。然后将 keep_running 标志设置为 false 并运行 a ws.close()
,我希望ws.recv()
结束和当 keep_running 循环结束。这不是正在发生的事情。我看到所有三个站点,但从来没有receive stop
。
command is: stop
Do command is stopped
Stop 1
Stop 2
Stop 3
^CException ignored in: <module 'threading' from '/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py'>
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1294, in _shutdown
t.join()
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1056, in join
self._wait_for_tstate_lock()
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
(pyalmondplus) Pauls-MBP:pyalmondplus paulenright$
参考代码:
import threading
import asyncio
import websockets
import json
class PyAlmondPlus:
def __init__(self, api_url, event_callback=None):
self.api_url = api_url
self.ws = None
self.loop = asyncio.get_event_loop()
self.receive_task = None
self.event_callback = event_callback
self.keep_running = False
async def connect(self):
print("connecting")
if self.ws is None:
print("opening socket")
self.ws = await websockets.connect(self.api_url)
print(self.ws)
async def disconnect(self):
pass
async def send(self, message):
pass
async def receive(self):
print("receive started")
while self.keep_running:
if self.ws is None:
await self.connect()
recv_data = await self.ws.recv()
print(recv_data)
print("receive ended")
def start(self):
self.keep_running = True
print("Start 1")
print("Start 2")
t = threading.Thread(target=self.start_loop, args=())
print("Start 3")
t.start()
print("Receiver running")
def start_loop(self):
print("Loop helper 1")
policy = asyncio.get_event_loop_policy()
policy.set_event_loop(policy.new_event_loop())
self.loop = asyncio.get_event_loop()
self.loop.set_debug(True)
asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(self.receive())
print("Loop helper 2")
def stop(self):
print("Stop 1")
self.keep_running = False
print("Stop 2")
self.ws.close()
print("Stop 3")
解决方案
我正在研究如何取消当前的 ws.recv() [...] 我看到所有三个停止,但从来没有接收停止。
您的receive
协程可能会暂停等待一些数据到达,因此它无法检查keep_running
标志。
停止正在运行的协程的简单而可靠的方法是使用驱动它cancel
的 asyncio Task
。这将立即取消挂起协程并使其等待的任何内容引发CancelledError
. 使用取消时,您根本不需要keep_running
标志,异常将自动终止循环。
对 .start() 的调用会创建一个辅助线程来启动异步事件循环。
这行得通,但您并不需要为每个PyAlmondPlus
. Asyncio 设计为在单个线程内运行,因此一个事件循环实例可以托管任意数量的协程。
这是实现这两种想法的可能设计(未使用实际的 Web 套接字进行测试):
# pre-start a single thread that runs the asyncio event loop
bgloop = asyncio.new_event_loop()
_thread = threading.Thread(target=bgloop.run_forever)
_thread.daemon = True
_thread.start()
class PyAlmondPlus:
def __init__(self, api_url):
self.api_url = api_url
self.ws = None
async def connect(self):
if self.ws is None:
self.ws = await websockets.connect(self.api_url)
async def receive(self):
# keep_running is not needed - cancel the task instead
while True:
if self.ws is None:
await self.connect()
recv_data = await self.ws.recv()
async def init_receive_task(self):
self.receive_task = bgloop.create_task(self.receive())
def start(self):
# use run_coroutine_threadsafe to safely submit a coroutine
# to the event loop running in a different thread
init_done = asyncio.run_coroutine_threadsafe(
self.init_receive_task(), bgloop)
# wait for the init coroutine to actually finish
init_done.result()
def stop(self):
# Cancel the running task. Since the event loop is in a
# background thread, request cancellation with
# call_soon_threadsafe.
bgloop.call_soon_threadsafe(self.receive_task.cancel)
推荐阅读
- r - 按单列和多因素过滤数据集
- java - 在java中使用正则表达式替换映射字符
- angular - onChange 事件不会与 FormControl 和 ControlValueAccessor 一起触发(角度 6)
- apache-spark - monotonically_increasing_id 正在为 spark 2.3.1 中的同一记录生成 2 个不同的唯一 ID?
- c# - 如何读取多个表并根据用户的角色将用户重定向到不同的页面
- javascript - jQuery中拖放示例的几个放置区域
- apache - 提取 Apache 环境变量“REMOTE_USER”仅适用于 SSL
- angular - 样式化角度 2 工具提示
- highcharts - Chrome 不会在 Highcharts 中显示 x 轴标签,Safari 和 Firefox 会这样做
- testing - UFT/QTP System.InvalidCastException 错误