python - Python asyncio:RuntimeError 在当前事件循环以外的事件循环上调用的非线程安全操作
问题描述
我已经设置了一个 Python 程序,该程序依赖于asyncio
并socketio
使用两个不同的聊天系统在聊天参与者之间传输消息(一个接口是 CRM 后端,另一个是网站上的 javascript 聊天小部件)。
代码的相关部分如下。我需要为每个独立聊天打开一个单独的线程。因此,为了打开一个到后端的通道,我创建了一个单独的线程并在其中运行一个“long_polling”函数,该函数不断检查来自后端的新消息,pull_messages()
并通过异步将它们发送到小部件socketio AsyncServer
(从小部件转发消息后端工作正常,我把它放在这里):
thread = threading.Thread(target=long_poll, args=())
thread.daemon = True
thread.start()
long_poll()
函数定义为
self.server = AsyncServer(async_mode="sanic")
async def send_aio(self, msg):
await self.server.emit(msg)
def long_poll(self):
while chat:
response = self.pull_messages(...)
if response == "/end":
chat = False
asyncio.set_event_loop(asyncio.new_event_loop())
loop = asyncio.get_event_loop()
loop.run_until_complete(self.send_aio(response))
loop.close()
执行 long_poll 函数时,在获取人工响应之后,我收到以下错误:
ERROR asyncio - Task exception was never retrieved
future: <Task finished coro=<AsyncServer._emit_internal() done, defined at /usr/local/lib/python3.6/site-packages/socketio/asyncio_server.py:344> exception=RuntimeError('Non-thread-safe operation invoked on an event loop other than the current one',)>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/socketio/asyncio_server.py", line 354, in _emit_internal
binary=None))
File "/usr/local/lib/python3.6/site-packages/socketio/asyncio_server.py", line 365, in _send_packet
await self.eio.send(sid, encoded_packet, binary=False)
File "/usr/local/lib/python3.6/site-packages/engineio/asyncio_server.py", line 89, in send
binary=binary))
File "/usr/local/lib/python3.6/site-packages/engineio/asyncio_socket.py", line 74, in send
await self.queue.put(pkt)
File "/usr/local/lib/python3.6/asyncio/queues.py", line 141, in put
return self.put_nowait(item)
File "/usr/local/lib/python3.6/asyncio/queues.py", line 153, in put_nowait
self._wakeup_next(self._getters)
File "/usr/local/lib/python3.6/asyncio/queues.py", line 74, in _wakeup_next
waiter.set_result(None)
File "uvloop/loop.pyx", line 1251, in uvloop.loop.Loop.call_soon
File "uvloop/loop.pyx", line 644, in uvloop.loop.Loop._check_thread
RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one
奇怪的是,当我通过 docker 在本地测试时,这个错误不会发生,只有在我迁移到 Kubernetes 时才会发生(图像和所有其他设置都相同)。
到目前为止我尝试分析的情况:在错误发生之前放入函数中logger.info("Using thread: {}".format(threading.current_thread().name))
。long_poll()
它告诉我代码在其中运行Thread-n
(n 在本地始终为 1,通常在 Kubernetes Pod 中为更高的整数),这MainThread
与我的代码的其他部分使用 asyncio 运行的地方不同,因此我认为我是安全的(我知道上述asyncio 代码不是线程安全的)。如果您有任何建议,请告诉我。
更新:
正如 user4815162342 所建议的那样,我没有在 while 循环中创建和销毁事件循环,而是在专用线程中创建了一个事件循环,然后将我需要运行的协程通过asyncio.run_coroutine_threadsafe()
. 不确定我是否在这里做的一切都正确,但现在这部分代码阻塞了程序的其余部分(例如,不再有从用户到后端的消息......)
self.server = AsyncServer(async_mode="sanic")
async def send_aio(self, msg):
await self.server.emit(msg)
def long_poll(self, loop):
while chat:
response = self.pull_messages(...)
if response == "/end":
chat = False
future = asyncio.run_coroutine_threadsafe(self.send_io(response), loop)
_ = future.result()
In the main program:
...
loop = asyncio.new_event_loop()
lpt = LongPollingObject()
threading.Thread(target=lpt.long_poll, args=(loop,), daemon=True).start()
...
解决方案
推荐阅读
- vue.js - 无法在 VueJS 中使用 axios 在标头中设置授权和令牌
- r - 集成 R 数据框并根据列值执行操作
- javascript - 使用 async / await 调用挂在 vue 组件中
- amazon-ec2 - 将 SSM 参数设置为 EC2 的环境变量 - 不起作用
- swift - 强制 NSStackView 拥抱内容
- node.js - 我在我的 nextJS 项目中使用“next-connect”时遇到了一个奇怪的错误,该项目已在 dev 中修复,但现在在 prod 中更糟
- dictionary - 为什么我会得到一个错误类型,其参数为同一目的使用了两次
- curl - 来自 NAT 网关后面的 Google Analytics API 调用被机器人过滤器过滤
- asp.net - 从 aspx 文件中运行 .NET 程序
- html - 以一个 SVG 图层为目标时,使用 css 动画折叠 SVG 信封的行为很奇怪