django-rest-framework - 通过 Celery 任务调用 DRF ViewSet
问题描述
我有一个 Django Rest 框架视图集:
MyModelViewSet(generics.RetrieveUpdateDestroyAPIView):
def perform_destroy(self, instance):
# do something besides deleting the object
现在我正在编写一个 Celery 定期任务,它根据过滤器(比如说end_date < now
)删除过期的对象。
我希望任务重用并执行在 ViewSet 的perform_destroy
方法中执行的相同操作。
这可以做到吗?如何?
谢谢!
解决方案
您可以通过在 DRF 中使用Request来解决您的问题,安排一个 celery 任务来调用该请求。效果很好,我之前已经实现过。
示例代码:
from rest_framework.request import Request as DRFRequest
from django.conf import settings
from django.http import HttpRequest
from your_module.views import MyModelViewSet
CELERY_CACHING_QUEUE = getattr(settings, "CELERY_CACHING_QUEUE", None)
def delete_resource(resource_pk: int) -> None:
"""
This method helps to delete the resource by the id.
"""
print(f'Starting deleting resource {resource_pk}...')
request = HttpRequest()
request.method = 'DELETE'
request.META = {
'SERVER_NAME': settings.ALLOWED_HOSTS[0],
'SERVER_PORT': 443
}
drf_request = DRFRequest(request)
# If your API need user has access permission,
# you should handle for getting the value of
# user_has_access_permission before
# E.g. below
# drf_request.user = user_has_access_permission
try:
view = MyModelViewSet(
kwargs={
'pk': resource_pk
},
request=drf_request
)
view.initial(drf_request)
view.delete(drf_request)
except (Exception, KeyError) as e:
print(f'Cannot delete resource: {resource_pk}, error: {e}')
return
print(f'Finished deleting resource {resource_pk}...')
@task(name="delete_resource_task", queue=CELERY_CACHING_QUEUE)
def delete_resource_task(resource_pk: int) -> None:
"""
Async task helps to delete resource.
"""
delete_resource(resource_pk)
推荐阅读
- signal-processing - 连续 EEG 信号的基线校正
- xamarin.forms - 如何在 viewModel 中将 CheckBox 的值设置为 true?
- java - 使用jsch自动向下滚动shell
- azure - 无法通过 https 访问托管在 VM 上的服务器
- lua - Lua 让球像真正的球一样旋转 roblox
- axapta - 创建自定义 projinvoice 后找不到创建的设计
- javascript - 让 axios 等待重定向
- javascript - 树结构的打字稿接口
- php - 将 nextjs 应用程序移植到 php 站点
- google-apps-script - Google Apps 脚本中跨列的子字符串比较