首页 > 解决方案 > 使用自动重试、退避和抖动对 celery 任务进行单元测试时出现 AssertionError

问题描述

使用芹菜 4.3.0。我尝试为以下任务编写单元测试。

    from django.core.exceptions import ObjectDoesNotExist

    @shared_task(autoretry_for=(ObjectDoesNotExist,), max_retries=5, retry_backoff=10)
    def process_something(data):
        product = Product()
        product.process(data)

单元测试:

@mock.patch('proj.tasks.Product')
@mock.patch('proj.tasks.process_something.retry') 
def test_process_something_retry_failed_task(self, process_something_retry, mock_product):

    mock_object = mock.MagicMock()
    mock_product.return_value = mock_object
    mock_object.process.side_effect = error = ObjectDoesNotExist()

    with pytest.raises(ObjectDoesNotExist):
        process_something(self.data)

    process_something_retry.assert_called_with(exc=error)

这是我运行测试后得到的错误:

    @wraps(task.run)
    def run(*args, **kwargs):
        try:
            return task._orig_run(*args, **kwargs)
        except autoretry_for as exc:
            if retry_backoff:
                retry_kwargs['countdown'] = \
                    get_exponential_backoff_interval(
                        factor=retry_backoff,
                        retries=task.request.retries,
                        maximum=retry_backoff_max,
                        full_jitter=retry_jitter)
>           raise task.retry(exc=exc, **retry_kwargs)
E           TypeError: exceptions must derive from BaseException

我知道这是因为例外。我ObjectDoesNotExist在所有地方都Exception替换为。运行测试后,我收到此错误:

    def assert_called_with(self, /, *args, **kwargs):
        """assert that the last call was made with the specified arguments.

            Raises an AssertionError if the args and keyword args passed in are
            different to the last call to the mock."""
        if self.call_args is None:
            expected = self._format_mock_call_signature(args, kwargs)
            actual = 'not called.'
            error_message = ('expected call not found.\nExpected: %s\nActual: %s'
                    % (expected, actual))
            raise AssertionError(error_message)

        def _error_message():
            msg = self._format_mock_failure_message(args, kwargs)
            return msg
        expected = self._call_matcher((args, kwargs))
        actual = self._call_matcher(self.call_args)
        if expected != actual:
            cause = expected if isinstance(expected, Exception) else None
>           raise AssertionError(_error_message()) from cause
E           AssertionError: expected call not found.
E           Expected: retry(exc=Exception())
E           Actual: retry(exc=Exception(), countdown=7)

请让我知道如何解决这两个错误。

标签: djangopython-3.xcelerydjango-celerycelery-task

解决方案


我在进行测试以确保 celery 重试逻辑覆盖我的特定场景时遇到了类似的问题。对我有用的是使用显式重试而不是autoretry_for参数。

我已将您的代码调整为我的解决方案。虽然我的解决方案没有使用 shared_task,但我认为它应该也可以工作。经测试celery==5.1.2

任务:

from django.core.exceptions import ObjectDoesNotExist

@shared_task(bind=True, max_retries=5, retry_backoff=10)
def process_something(self, data):
   try:
      product = Product()
      product.process(data)
   except ObjectDoesNotExist as exc:
      raise self.retry(exc=exc)

测试:

from proj.tasks import Product # I assume the Product class is located here
from django.core.exceptions import ObjectDoesNotExist
import celery

@mock.patch.object(Product, "__init__", Mock(return_value=None)) # just mocking the init method
@mock.patch.object(Product, "process")
@mock.patch('proj.tasks.process_something.retry')
def test_process_something_retry_failed_task(self, retry_mock, process_mock):
   exc = ObjectDoesNotExist()
   process_mock.side_effect = exc
   retry_mock.side_effect = celery.exceptions.Retry
   
   with pytest.raises(celery.exceptions.Retry):
      process_something(self.data)

   retry_mock.assert_called_with(exc=exc)

在我的问题中,我还使用了自定义异常。使用此解决方案,我不需要更改异常的类型。


推荐阅读