Celery error: result.get(timeout=5) times out even though result.status is SUCCESS

Problem description

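I have an API that starts a Celery task with result = task.delay() and then waits for its result with result.get(timeout=5). I am currently writing a performance test that runs this task very frequently. It works fine on my local machine, but it behaves strangely on our development VM: after about 90-92 executions, result.get(timeout=5) times out, even though the task itself succeeds within a few milliseconds.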

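The results seem to be missing from the result backend. I am using RabbitMQ in both directions, as the message broker and as the result backend: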

celery_broker_url = pyamqp://guest@localhost//
celery_result_backend = rpc://

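Can anyone give me a hint on how to investigate this further? Is it possible to check whether the result is actually delivered to the result backend? The RabbitMQ logs show no entries: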

-- Logs begin at Wed 2019-01-30 16:49:24 UTC, end at Thu 2019-01-31 14:01:46 UTC. --
-- No entries --
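One way to narrow this down from the client side is to poll the result instead of blocking in result.get(timeout=5), and record how long it took to become ready and which state the client saw. The sketch below assumes a Celery AsyncResult and only uses its ready(), state and get(propagate=False) members; the helper name wait_and_diagnose and its return tuple are hypothetical. If the worker log says a task succeeded but the client still sees PENDING here, the result is not reaching the client; if the worker never logs the task as finished, the problem is on the worker side.

```python
import time


def wait_and_diagnose(async_result, timeout=5.0, poll_interval=0.05):
    """Poll a Celery AsyncResult-like object instead of blocking in get().

    Hypothetical helper. Returns (state, elapsed_seconds, value), where
    value is None if the result did not become ready before `timeout`.
    """
    start = time.monotonic()
    while time.monotonic() - start < timeout:
        if async_result.ready():
            elapsed = time.monotonic() - start
            # propagate=False returns a task exception instead of raising it.
            return async_result.state, elapsed, async_result.get(propagate=False)
        time.sleep(poll_interval)
    # Not ready in time: report the state the client can see right now.
    return async_result.state, time.monotonic() - start, None
```

In the API handler this would take the place of result.get(timeout=5), for example logging result.id together with the returned state and elapsed time whenever value is None.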

In case it helps, here is the full stack trace:

[2019-01-31 13:56:42,313] ERROR in app: Exception on /user/lmhsqs/register [POST]
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/celery/backends/async.py", line 255, in _wait_for_pending
    on_interval=on_interval):
  File "/usr/local/lib/python3.6/dist-packages/celery/backends/async.py", line 54, in drain_events_until
    raise socket.timeout()
socket.timeout

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1982, in wsgi_app
    response = self.full_dispatch_request()
  File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1614, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1517, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/usr/local/lib/python3.6/dist-packages/flask/_compat.py", line 33, in reraise
    raise value
  File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1612, in full_dispatch_request
    rv = self.dispatch_request()
  File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1598, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/usr/local/lib/python3.6/dist-packages/connexion/decorators/decorator.py", line 66, in wrapper
    response = function(request)
  File "/usr/local/lib/python3.6/dist-packages/connexion/decorators/validation.py", line 122, in wrapper
    response = function(request)
  File "/usr/local/lib/python3.6/dist-packages/connexion/decorators/validation.py", line 293, in wrapper
    return function(request)
  File "/usr/local/lib/python3.6/dist-packages/connexion/decorators/decorator.py", line 42, in wrapper
    response = function(request)
  File "/usr/local/lib/python3.6/dist-packages/connexion/decorators/parameter.py", line 219, in wrapper
    return function(**kwargs)
  File "/mynedata/lib/api/apicalls.py", line 73, in register_user
    res_to_return = result.get(timeout=5)
  File "/usr/local/lib/python3.6/dist-packages/celery/result.py", line 224, in get
    on_message=on_message,
  File "/usr/local/lib/python3.6/dist-packages/celery/backends/async.py", line 188, in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
  File "/usr/local/lib/python3.6/dist-packages/celery/backends/async.py", line 259, in _wait_for_pending
    raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.
127.0.0.1 - - [2019-01-31 13:56:42] "POST /user/lmhsqs/register HTTP/1.1" 500 388 5.050726

[2019-01-31 13:56:47,374] ERROR in app: Exception on /user/lmhsqs/login [POST]
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/celery/backends/async.py", line 255, in _wait_for_pending
    on_interval=on_interval):
  File "/usr/local/lib/python3.6/dist-packages/celery/backends/async.py", line 54, in drain_events_until
    raise socket.timeout()
socket.timeout

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1982, in wsgi_app
    response = self.full_dispatch_request()
  File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1614, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1517, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/usr/local/lib/python3.6/dist-packages/flask/_compat.py", line 33, in reraise
    raise value
  File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1612, in full_dispatch_request
    rv = self.dispatch_request()
  File "/usr/local/lib/python3.6/dist-packages/flask/app.py", line 1598, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/usr/local/lib/python3.6/dist-packages/connexion/decorators/decorator.py", line 66, in wrapper
    response = function(request)
  File "/usr/local/lib/python3.6/dist-packages/connexion/decorators/validation.py", line 122, in wrapper
    response = function(request)
  File "/usr/local/lib/python3.6/dist-packages/connexion/decorators/validation.py", line 293, in wrapper
    return function(request)
  File "/usr/local/lib/python3.6/dist-packages/connexion/decorators/decorator.py", line 42, in wrapper
    response = function(request)
  File "/usr/local/lib/python3.6/dist-packages/connexion/decorators/parameter.py", line 219, in wrapper
    return function(**kwargs)
  File "/mynedata/lib/api/apicalls.py", line 123, in login_user
    res = result.get(timeout=5)
  File "/usr/local/lib/python3.6/dist-packages/celery/result.py", line 224, in get
    on_message=on_message,
  File "/usr/local/lib/python3.6/dist-packages/celery/backends/async.py", line 188, in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
  File "/usr/local/lib/python3.6/dist-packages/celery/backends/async.py", line 259, in _wait_for_pending
    raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.

Tags: python-3.x, rabbitmq, celery

Solution


The problem was not Celery or RabbitMQ at all, but something completely unrelated:

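I was launching the backend process (which ran my Celery worker) with subprocess.Popen(shlex.split(backend_cmd), stdout=subprocess.PIPE, stderr=subprocess.STDOUT). It turns out that a subprocess.PIPE pipe fills up at some point if nobody reads from it (I believe after 2^16 bytes, which matches the 64 KiB default pipe capacity on Linux). At that point my Celery worker blocked while trying to write to the pipe and therefore stopped writing results to the result backend. So the timeouts I was seeing were legitimate.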
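The failure mode described above can be reproduced without Celery. The sketch below uses a stand-in child process (CHILD is hypothetical; the real backend_cmd is not shown) that writes 200,000 bytes, more than the 64 KiB default pipe capacity on Linux, to a stdout connected to subprocess.PIPE. If the parent never reads the pipe, the child blocks in write(). The two variants that avoid this are draining the pipe in a background thread, or not using a pipe at all (subprocess.DEVNULL here; an open log file works the same way).

```python
import subprocess
import sys
import threading

# Hypothetical stand-in for backend_cmd: writes 200,000 bytes to stdout,
# well over the typical 64 KiB Linux pipe capacity, then exits.
CHILD = [sys.executable, "-c", "import sys; sys.stdout.write('x' * 200_000)"]


def run_without_reading(timeout=2.0):
    """stdout=PIPE and nobody reads it: the child blocks on a full pipe.

    Returns True if the child exited within `timeout`, False if it was
    still stuck (in which case it is killed).
    """
    proc = subprocess.Popen(CHILD, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    try:
        proc.wait(timeout=timeout)
        return True
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()
        return False
    finally:
        proc.stdout.close()


def run_with_drain_thread(timeout=10.0):
    """stdout=PIPE, but a background thread keeps reading until EOF."""
    proc = subprocess.Popen(CHILD, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    chunks = []

    def drain():
        # Reading continuously means the child never sees a full pipe.
        for chunk in iter(lambda: proc.stdout.read(4096), b""):
            chunks.append(chunk)

    reader = threading.Thread(target=drain, daemon=True)
    reader.start()
    proc.wait(timeout=timeout)
    reader.join(timeout=timeout)
    proc.stdout.close()
    return b"".join(chunks)


def run_to_devnull(timeout=10.0):
    """No pipe at all: output is discarded, so the child cannot block on it."""
    proc = subprocess.Popen(CHILD, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
    return proc.wait(timeout=timeout)
```

For a long-running worker whose output the parent does not need to process, redirecting to a log file or DEVNULL is the simpler choice; the drain thread is only worth it if the parent actually consumes the output.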

What I still don't understand is why result.status showed SUCCESS after the timeout.

