首页 > 解决方案 > Celery task.status 方法引发异常:AttributeError:模块没有属性“DoesNotExist”

问题描述

堆:

我有一个从 csv/xls 文件导入一些数据并保存数据的类,这是我的 celery 配置:

CELERY_TASK_ALWAYS_EAGER = False
CELERY_BROKER_URL = config('REDIS_BROKER_URL')
CELERY_RESULT_BACKEND = config('REDIS_RESULT_URL')
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

这是我调用我的任务的地方(一些基于类的视图):

    def form_valid(self, form):
        if self.request.is_ajax():
            form.save()
            instance = form.save()
            kwargs = {
                    'corporation_id': self.corporation.id,
                    'file_id': instance.id,
                }

            task_id = import_file_task.apply_async(
                kwargs=kwargs,
            )
            instance.tas_id = task_id
            instance.save()
            return JsonResponse(
                {
                    'form_status': 'Success',
                    'task_id': str(task_id),
                }
            )

        return super().form_valid(form)

这是我调用我的任务的地方:

@celery_app.task(bind=True)
def import_file_task(_, corporation_id, file_id):
    sale_file = SaleFile.objects.get(
        id=file_id,
        corporation_id=corporation_id,
    )
    if sale_file.type == PRODUCT_FILE:
        error = ProductImporter(
                    corporation_id=corporation_id,
                    file_id=file_id,
                    product_file=sale_file,
                ).save()
    elif sale_file.type == RECEIVABLE_FILE:
        error = ReceivableImporter(
                    corporation_id=corporation_id,
                    file_id=file_id,
                    receivable_file=sale_file,
                ).save()
    else:
        raise ValueError('File type is not valid')
    task = AsyncResult(sale_file.tas_id)
    task.info = error
    task.status = 'COMPLETED'

这是我尝试轮询任务状态的地方,我得到了错误

class TaskStatus(View):
    def get(self, request):
        task_id = request.GET.get('_task_id')
        task = AsyncResult(task_id)
        print(task)
        print(task.state) #HERE IS THE ERROR
        print(dir(task)) #THE STATUS APPEAR HERE
        success_response = (
            {
                'status': ['state: '
                           ],
                'result': {
                    'success': True,
                }
            }
        )
        return (
            JsonResponse(success_response)
        )

这是我第一次使用芹菜,所以欢迎任何帮助。

标签: pythondjangocelerycelery-task

解决方案


所以,我试图在我的任务中调用一个模型实例,但该实例尚未创建。我的解决方案是创建一个函数来使用 transiction.on_commit 调用我的任务,代码如下:

    def post(self, request, *args, **kwargs):
        form = self.get_form()
        if form.is_valid():
            instance = form.save()
            kwargs = {
                    'file_id': instance.id,
                    'file_type': instance.type,
                }

            task = import_file_task
            result = task.freeze()

            def run_task():
                task.signature(kwargs=kwargs).apply_async()
            transaction.on_commit(run_task)
            return JsonResponse(
                {
                    'form_status': 'Success',
                    'task_id': result.task_id,
                    'file_detail_url': self._get_detail_url(instance.id)
                }
            )

        else:
            return JsonResponse(
                {
                    'form_status': 'Error',
                    'error': json.loads(form.errors.as_json())
                }
            )

推荐阅读