python - 如何在 Django 单元测试中使用 rabbitMQ 作为 celery 的代理
问题描述
我正在编写一个集成测试,其中我正在使用创建 rabbitMQ 容器 -
docker run -d --hostname localhost -p 5672:5672 --name rabbit-tox -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest rabbitmq:3
为了测试我是否可以从测试中连接到 rabbitMQ,我创建了这个测试,它可以发送数据 -
def test_rmq(self):
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
现在我想使用 rabbitMQ 容器作为 celery 后端,这是我正在使用的代码 -
from celery import Celery
broker_url = 'amqp://guest:guest@localhost:5672//'
app = Celery('test', broker=broker_url, backend='amqp')
from celery.contrib.testing.worker import start_worker
from swordfish_app import tasks
# Testing the consumer logic
class ServiceAccountCeleryTestCase(TransactionTestCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.celery_worker = start_worker(app)
cls.celery_worker.__enter__()
@classmethod
def tearDownClass(cls):
super().tearDownClass()
cls.celery_worker.__exit__(None, None, None)
def setUp(self):
super().setUp()
self.task = tasks.delete_all_service_accounts_when_user_inactive()
fake_obj_meta_del = V1ObjectMeta(self_link="deleted_service_account_link")
self.delete_namespaced_service_account_fake_data = V1Status(metadata=fake_obj_meta_del)
self.results = self.task.get()
@patch('kubernetes.client.CoreV1Api.delete_namespaced_service_account')
@patch('app.k8s.serviceaccount.get_inactive_serviceaccounts')
def test_delete_all_service_accounts_when_user_inactive(self, k8s_get_inactive_patch, k8s_del_sa_patch):
k8s_get_inactive_patch.return_value = ["sf-user-1", "sf-user-2"]
k8s_del_sa_patch.return_value = self.delete_namespaced_service_account_fake_data
assert self.task.state == "SUCCESS"
当我执行测试时,我发现了这个错误——
Creating test database for alias 'default'...
System check identified no issues (0 silenced).
...... [x] Sent 'Hello World!'
.E
======================================================================
ERROR: setUpClass (tests.test_service_accounts.ServiceAccountCeleryTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/home/mlokur/swordfish/src/tests/test_service_accounts.py", line 124, in setUpClass
cls.celery_worker.__enter__()
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/home/mlokur/venv/lib/python3.7/site-packages/celery/contrib/testing/worker.py", line 78, in start_worker
**kwargs) as worker:
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/home/mlokur/venv/lib/python3.7/site-packages/celery/contrib/testing/worker.py", line 103, in _start_worker_thread
assert 'celery.ping' in app.tasks
AssertionError
----------------------------------------------------------------------
Ran 7 tests in 1.842s
FAILED (errors=1)
Destroying test database for alias 'default'...
我写了一个单独的python文件来连接和检查 -
from celery import Celery
import urllib.request
import os
# Where the downloaded files will be stored
BASEDIR="/home/celery/downloadedFiles"
# Create the app and set the broker location (RabbitMQ)
broker_url = 'amqp://guest:guest@localhost:5672//'
app = Celery('test', broker=broker_url, backend='amqp')
@app.task
def download(url, filename):
"""
Download a page and save it to the BASEDIR directory
url: the url to download
filename: the filename used to save the url in BASEDIR
"""
response = urllib.request.urlopen(url)
data = response.read()
with open("snap",'wb') as file:
file.write(data)
file.close()
这个示例芹菜任务正确连接到rabbitMQ然后工作。任何帮助表示赞赏。谢谢。
解决方案
只需禁用 ping 检查
start_worker(app, perform_ping_check=False)
否则它会寻找任务celery.ping
,很可能你没有注册这样的任务。
但我还是建议添加它
@shared_task(name='celery.ping')
def ping():
return 'pong'
因为它可以让你对你的芹菜工人进行健康检查。
推荐阅读
- python - 为什么opencv和枕头将png图像转换为差异值范围
- sql - 如何使用 Postgres 添加指向 2 个表之一的外键?
- linux - Postgres 扩展不可用 - CentOS 8
- python - 如何使提升的小部件在 Qt Designer 中显示得更大
- r - 进一步按年份将绘图数据缩放为间隔
- python - TypeError:“NoneType”对象不可下标(PYTHON)
- c# - 无法为 WSDL 文件创建 Web 引用
- javascript - Javascript HasOwnProperty Polyfill
- python - 如何为大型图像数据集使用更少的内存?(Python - Keras)
- php - 如何使用数组中的值重新格式化 JSON 响应