首页 > 解决方案 > 如何在 Django 单元测试中使用 rabbitMQ 作为 celery 的代理

问题描述

我正在编写一个集成测试,其中我正在使用创建 rabbitMQ 容器 -

docker run -d --hostname localhost -p 5672:5672 --name rabbit-tox -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest rabbitmq:3

为了测试我是否可以从测试中连接到 rabbitMQ,我创建了这个测试,它可以发送数据 -

def test_rmq(self):
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672))
        channel = connection.channel()

        channel.queue_declare(queue='hello')
        channel.basic_publish(exchange='',
                              routing_key='hello',
                              body='Hello World!')
        print(" [x] Sent 'Hello World!'")
        connection.close()

现在我想使用 rabbitMQ 容器作为 celery 后端,这是我正在使用的代码 -

from celery import Celery
broker_url = 'amqp://guest:guest@localhost:5672//'
app = Celery('test', broker=broker_url, backend='amqp')

from celery.contrib.testing.worker import start_worker
from swordfish_app import tasks


# Testing the consumer logic
class ServiceAccountCeleryTestCase(TransactionTestCase):

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.celery_worker = start_worker(app)
        cls.celery_worker.__enter__()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()
        cls.celery_worker.__exit__(None, None, None)

    def setUp(self):
        super().setUp()
        self.task = tasks.delete_all_service_accounts_when_user_inactive()
        fake_obj_meta_del = V1ObjectMeta(self_link="deleted_service_account_link")
        self.delete_namespaced_service_account_fake_data = V1Status(metadata=fake_obj_meta_del)
        self.results = self.task.get()

    @patch('kubernetes.client.CoreV1Api.delete_namespaced_service_account')
    @patch('app.k8s.serviceaccount.get_inactive_serviceaccounts')
    def test_delete_all_service_accounts_when_user_inactive(self, k8s_get_inactive_patch, k8s_del_sa_patch):
        k8s_get_inactive_patch.return_value = ["sf-user-1", "sf-user-2"]
        k8s_del_sa_patch.return_value = self.delete_namespaced_service_account_fake_data
        assert self.task.state == "SUCCESS"

当我执行测试时,我发现了这个错误——

Creating test database for alias 'default'...
System check identified no issues (0 silenced).
...... [x] Sent 'Hello World!'
.E
======================================================================
ERROR: setUpClass (tests.test_service_accounts.ServiceAccountCeleryTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/mlokur/swordfish/src/tests/test_service_accounts.py", line 124, in setUpClass
    cls.celery_worker.__enter__()
  File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
    return next(self.gen)
  File "/home/mlokur/venv/lib/python3.7/site-packages/celery/contrib/testing/worker.py", line 78, in start_worker
    **kwargs) as worker:
  File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
    return next(self.gen)
  File "/home/mlokur/venv/lib/python3.7/site-packages/celery/contrib/testing/worker.py", line 103, in _start_worker_thread
    assert 'celery.ping' in app.tasks
AssertionError

----------------------------------------------------------------------
Ran 7 tests in 1.842s

FAILED (errors=1)
Destroying test database for alias 'default'...

我写了一个单独的python文件来连接和检查 -

from celery import Celery
import urllib.request
import os

# Where the downloaded files will be stored
BASEDIR="/home/celery/downloadedFiles"

# Create the app and set the broker location (RabbitMQ)

broker_url = 'amqp://guest:guest@localhost:5672//'
app = Celery('test', broker=broker_url, backend='amqp')

@app.task
def download(url, filename):
    """
    Download a page and save it to the BASEDIR directory
      url: the url to download
      filename: the filename used to save the url in BASEDIR
    """
    response = urllib.request.urlopen(url)
    data = response.read()
    with open("snap",'wb') as file:
        file.write(data)
    file.close()

这个示例芹菜任务正确连接到rabbitMQ然后工作。任何帮助表示赞赏。谢谢。

标签: pythondjangorabbitmqceleryrabbitmq-exchange

解决方案


只需禁用 ping 检查

start_worker(app, perform_ping_check=False)

否则它会寻找任务celery.ping,很可能你没有注册这样的任务。

但我还是建议添加它

@shared_task(name='celery.ping')
def ping():
    return 'pong'

因为它可以让你对你的芹菜工人进行健康检查。


推荐阅读