python - 使用 Rabbitmq 挂在 Django 应用程序上的 Celery 任务
问题描述
对于在 docker 容器上运行的应用程序,我正在遵循Django 的第一步。我在一个单独的 docker 容器上设置了 rabbitmq。打开一个 python shell 来运行任务添加只会导致挂起/冻结,而没有来自 celery 界面的任何报告/错误。以下是详细信息。
Django version - 3.0.5
Celery - 5.0.2
amqp - 5.0.2
kombu - 5.0.2
Rabbitmq - 3.8.9
myapp/myapp/settings.py
CELERY_BROKER_URL = 'pyamqp://guest:guest@myhost.com//'
CELERY_RESULT_BACKEND = 'db+postgresql+psycopg2://postgres:111111@myhost.com/celery'
myapp/myapp/celery.py
import os
from celery import Celery
import logging
logger = logging.getLogger(__name__)
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
app = Celery('myapp')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
logger.error('in celery')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
myapp/myapp/init.py
from .celery import app as celery_app
import logging
logger = logging.getLogger(__name__)
logger.error('in init')
__all__ = ('celery_app',)
myapp/otherapp/tasks.py
from celery import shared_task
import logging
logger = logging.getLogger(__name__)
@shared_task
def add(x, y):
logger.error('add')
return x + y
在终端中运行 celery -A demolists worker --loglevel=INFO 后,我得到以下信息。
in celery
in init
/usr/lib/python3.8/site-packages/celery/platforms.py:797: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!
Please specify a different user using the --uid option.
User information: uid=0 euid=0 gid=0 egid=0
warnings.warn(RuntimeWarning(ROOT_DISCOURAGED.format(
-------------- celery@3c389cd683b2 v5.0.2 (singularity)
--- ***** -----
-- ******* ---- Linux-4.14.186-110.268.amzn1.x86_64-x86_64-with 2020-11-29 22:12:54
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: myapp:0x7f5a52f51c70
- ** ---------- .> transport: amqp://guest:**@myhost.com:5672//
- ** ---------- .> results: postgresql+psycopg2://postgres:**@myhost.com/celery
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. myapp.celery.debug_task
. otherapp.tasks.add
[2020-11-29 22:12:55,304: INFO/MainProcess] Connected to amqp://guest:**@myhost.com:5672//
[2020-11-29 22:12:55,337: INFO/MainProcess] mingle: searching for neighbors
[2020-11-29 22:12:56,388: INFO/MainProcess] mingle: all alone
[2020-11-29 22:12:56,412: WARNING/MainProcess] /usr/lib/python3.8/site-packages/celery/fixups/django.py:203: UserWarning: Using settings.DEBUG leads to a memory
leak, never use this setting in production environments!
warnings.warn('''Using settings.DEBUG leads to a memory
[2020-11-29 22:12:56,412: INFO/MainProcess] celery@3c389cd683b2 ready.
在一个单独的终端选项卡中,我进入 python shell 并执行以下操作。
>>> from otherapp.tasks import add
>>> result = add.delay(4, 5)
它挂在这里没有任何变化。但是,Control-C 会产生此错误。
^CTraceback (most recent call last):
File "/usr/lib/python3.8/site-packages/kombu/utils/functional.py", line 32, in __call__
return self.__value__
AttributeError: 'ChannelPromise' object has no attribute '__value__'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/lib/python3.8/site-packages/amqp/transport.py", line 143, in _connect
entries = socket.getaddrinfo(
File "/usr/lib/python3.8/socket.py", line 918, in getaddrinfo
for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno -2] Name does not resolve
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/lib/python3.8/site-packages/kombu/utils/functional.py", line 325, in retry_over_time
return fun(*args, **kwargs)
File "/usr/lib/python3.8/site-packages/kombu/connection.py", line 866, in _connection_factory
self._connection = self._establish_connection()
File "/usr/lib/python3.8/site-packages/kombu/connection.py", line 801, in _establish_connection
conn = self.transport.establish_connection()
File "/usr/lib/python3.8/site-packages/kombu/transport/pyamqp.py", line 128, in establish_connection
conn.connect()
File "/usr/lib/python3.8/site-packages/amqp/connection.py", line 322, in connect
self.transport.connect()
File "/usr/lib/python3.8/site-packages/amqp/transport.py", line 84, in connect
self._connect(self.host, self.port, self.connect_timeout)
File "/usr/lib/python3.8/site-packages/amqp/transport.py", line 152, in _connect
raise (e
File "/usr/lib/python3.8/site-packages/amqp/transport.py", line 168, in _connect
self.sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/python3.8/site-packages/celery/app/task.py", line 421, in delay
return self.apply_async(args, kwargs)
File "/usr/lib/python3.8/site-packages/celery/app/task.py", line 561, in apply_async
return app.send_task(
File "/usr/lib/python3.8/site-packages/celery/app/base.py", line 718, in send_task
amqp.send_task_message(P, name, message, **options)
File "/usr/lib/python3.8/site-packages/celery/app/amqp.py", line 523, in send_task_message
ret = producer.publish(
File "/usr/lib/python3.8/site-packages/kombu/messaging.py", line 175, in publish
return _publish(
File "/usr/lib/python3.8/site-packages/kombu/connection.py", line 525, in _ensured
return fun(*args, **kwargs)
File "/usr/lib/python3.8/site-packages/kombu/messaging.py", line 184, in _publish
channel = self.channel
File "/usr/lib/python3.8/site-packages/kombu/messaging.py", line 206, in _get_channel
channel = self._channel = channel()
File "/usr/lib/python3.8/site-packages/kombu/utils/functional.py", line 34, in __call__
value = self.__value__ = self.__contract__()
File "/usr/lib/python3.8/site-packages/kombu/messaging.py", line 221, in <lambda>
channel = ChannelPromise(lambda: connection.default_channel)
File "/usr/lib/python3.8/site-packages/kombu/connection.py", line 884, in default_channel
self._ensure_connection(**conn_opts)
File "/usr/lib/python3.8/site-packages/kombu/connection.py", line 435, in _ensure_connection
return retry_over_time(
File "/usr/lib/python3.8/site-packages/kombu/utils/functional.py", line 339, in retry_over_time
sleep(1.0)
KeyboardInterrupt
我会很感激任何帮助,一方面对正在发生的事情感到非常困惑,celery 界面表明它已连接到 rabbitmq,但 shell 中的错误消息表明存在一些问题。谢谢先进
解决方案
推荐阅读
- python - SQLAlchemy 列值基于另一个
- unity3d - Resonance Audio 中的房间效果之间的过渡?
- python - 无法使用 Python 基于 XML 中的绝对路径定位元素
- php - Symfony 3.4 路由上的动态内容(监听器?)
- angularjs - 量角器测试:错误:在页面上找不到 Angular
- python - Python-安装python包时出错(libmagic)
- html - 如何将同一数组的多次迭代推送到 Angular 中的更大数组中,以用于动态数量的选项卡?
- nginx - 如何在 NGINX (Raspberry Pi) 上部署聚合物 3.0 构建
- c# - LINQ删除重复项和结果项以获得数量总和
- mongodb - 订购 mongoDB 集合的最有效方法是什么