首页 > 解决方案 > 通过 Celery 任务调用 DRF ViewSet

问题描述

我有一个 Django Rest 框架视图集:

MyModelViewSet(generics.RetrieveUpdateDestroyAPIView):
    def perform_destroy(self, instance):
        # do something besides deleting the object

现在我正在编写一个 Celery 定期任务,它根据过滤器(比如说end_date < now)删除过期的对象。

我希望任务重用并执行在 ViewSet 的perform_destroy方法中执行的相同操作。

这可以做到吗?如何?
谢谢!

标签: django-rest-frameworkdjango-celerycelery-task

解决方案


您可以通过在 DRF 中使用Request来解决您的问题,安排一个 celery 任务来调用该请求。效果很好,我之前已经实现过。

示例代码:

from rest_framework.request import Request as DRFRequest
from django.conf import settings
from django.http import HttpRequest
from your_module.views import MyModelViewSet

CELERY_CACHING_QUEUE = getattr(settings, "CELERY_CACHING_QUEUE", None)


def delete_resource(resource_pk: int) -> None:
    """
    This method helps to delete the resource by the id.
    """
    print(f'Starting deleting resource {resource_pk}...')

    request = HttpRequest()
    request.method = 'DELETE'
    request.META = {
        'SERVER_NAME': settings.ALLOWED_HOSTS[0],
        'SERVER_PORT': 443
    }
    drf_request = DRFRequest(request)

    # If your API need user has access permission,
    # you should handle for getting the value of
    # user_has_access_permission before
    # E.g. below
    # drf_request.user = user_has_access_permission

    try:
        view = MyModelViewSet(
            kwargs={
                'pk': resource_pk
            },
            request=drf_request
        )
        view.initial(drf_request)
        view.delete(drf_request)
    except (Exception, KeyError) as e:
        print(f'Cannot delete resource: {resource_pk}, error: {e}')
        return

    print(f'Finished deleting resource {resource_pk}...')


@task(name="delete_resource_task", queue=CELERY_CACHING_QUEUE)
def delete_resource_task(resource_pk: int) -> None:
    """
    Async task helps to delete resource.
    """
    delete_resource(resource_pk)


推荐阅读