首页 > 解决方案 > Python asyncio:RuntimeError 在当前事件循环以外的事件循环上调用的非线程安全操作

问题描述

我已经设置了一个 Python 程序,该程序依赖于asynciosocketio使用两个不同的聊天系统在聊天参与者之间传输消息(一个接口是 CRM 后端,另一个是网站上的 javascript 聊天小部件)。

代码的相关部分如下。我需要为每个独立聊天打开一个单独的线程。因此,为了打开一个到后端的通道,我创建了一个单独的线程并在其中运行一个“long_polling”函数,该函数不断检查来自后端的新消息,pull_messages()并通过异步将它们发送到小部件socketio AsyncServer(从小部件转发消息后端工作正常,我把它放在这里):

thread = threading.Thread(target=long_poll, args=())
thread.daemon = True
thread.start()

long_poll()函数定义为

self.server = AsyncServer(async_mode="sanic")

async def send_aio(self, msg):
    await self.server.emit(msg)

def long_poll(self):
    while chat:
        response = self.pull_messages(...)
        if response == "/end":
            chat = False

        asyncio.set_event_loop(asyncio.new_event_loop())
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.send_aio(response))
        loop.close()

执行 long_poll 函数时,在获取人工响应之后,我收到以下错误:

ERROR    asyncio  - Task exception was never retrieved
future: <Task finished coro=<AsyncServer._emit_internal() done, defined at /usr/local/lib/python3.6/site-packages/socketio/asyncio_server.py:344> exception=RuntimeError('Non-thread-safe operation invoked on an event loop other than the current one',)>
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/socketio/asyncio_server.py", line 354, in _emit_internal
    binary=None))
  File "/usr/local/lib/python3.6/site-packages/socketio/asyncio_server.py", line 365, in _send_packet
    await self.eio.send(sid, encoded_packet, binary=False)
  File "/usr/local/lib/python3.6/site-packages/engineio/asyncio_server.py", line 89, in send
    binary=binary))
  File "/usr/local/lib/python3.6/site-packages/engineio/asyncio_socket.py", line 74, in send
    await self.queue.put(pkt)
  File "/usr/local/lib/python3.6/asyncio/queues.py", line 141, in put
    return self.put_nowait(item)
  File "/usr/local/lib/python3.6/asyncio/queues.py", line 153, in put_nowait
    self._wakeup_next(self._getters)
  File "/usr/local/lib/python3.6/asyncio/queues.py", line 74, in _wakeup_next
    waiter.set_result(None)
  File "uvloop/loop.pyx", line 1251, in uvloop.loop.Loop.call_soon
  File "uvloop/loop.pyx", line 644, in uvloop.loop.Loop._check_thread
RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one

奇怪的是,当我通过 docker 在本地测试时,这个错误不会发生,只有在我迁移到 Kubernetes 时才会发生(图像和所有其他设置都相同)。

到目前为止我尝试分析的情况:在错误发生之前放入函数中logger.info("Using thread: {}".format(threading.current_thread().name))long_poll()它告诉我代码在其中运行Thread-n(n 在本地始终为 1,通常在 Kubernetes Pod 中为更高的整数),这MainThread与我的代码的其他部分使用 asyncio 运行的地方不同,因此我认为我是安全的(我知道上述asyncio 代码不是线程安全的)。如果您有任何建议,请告诉我。


更新:

正如 user4815162342 所建议的那样,我没有在 while 循环中创建和销毁事件循环,而是在专用线程中创建了一个事件循环,然后将我需要运行的协程通过asyncio.run_coroutine_threadsafe(). 不确定我是否在这里做的一切都正确,但现在这部分代码阻塞了程序的其余部分(例如,不再有从用户到后端的消息......)

self.server = AsyncServer(async_mode="sanic")

async def send_aio(self, msg):
    await self.server.emit(msg)

def long_poll(self, loop):
    while chat:
        response = self.pull_messages(...)
        if response == "/end":
            chat = False

        future = asyncio.run_coroutine_threadsafe(self.send_io(response), loop)
        _ = future.result()

In the main program:
...
                loop = asyncio.new_event_loop()
                lpt = LongPollingObject()
                threading.Thread(target=lpt.long_poll, args=(loop,), daemon=True).start()
...

标签: pythondockerkubernetespython-asyncio

解决方案


推荐阅读