python - Celery task.status 方法引发异常:AttributeError:模块没有属性“DoesNotExist”
问题描述
堆:
- django =“==2.2.2”
- django-celery-beat = "==1.4.0"
- 芹菜=“==v4.3.0rc1”
- python_version = "3.7"
我有一个从 csv/xls 文件导入一些数据并保存数据的类,这是我的 celery 配置:
CELERY_TASK_ALWAYS_EAGER = False
CELERY_BROKER_URL = config('REDIS_BROKER_URL')
CELERY_RESULT_BACKEND = config('REDIS_RESULT_URL')
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
这是我调用我的任务的地方(一些基于类的视图):
def form_valid(self, form):
if self.request.is_ajax():
form.save()
instance = form.save()
kwargs = {
'corporation_id': self.corporation.id,
'file_id': instance.id,
}
task_id = import_file_task.apply_async(
kwargs=kwargs,
)
instance.tas_id = task_id
instance.save()
return JsonResponse(
{
'form_status': 'Success',
'task_id': str(task_id),
}
)
return super().form_valid(form)
这是我调用我的任务的地方:
@celery_app.task(bind=True)
def import_file_task(_, corporation_id, file_id):
sale_file = SaleFile.objects.get(
id=file_id,
corporation_id=corporation_id,
)
if sale_file.type == PRODUCT_FILE:
error = ProductImporter(
corporation_id=corporation_id,
file_id=file_id,
product_file=sale_file,
).save()
elif sale_file.type == RECEIVABLE_FILE:
error = ReceivableImporter(
corporation_id=corporation_id,
file_id=file_id,
receivable_file=sale_file,
).save()
else:
raise ValueError('File type is not valid')
task = AsyncResult(sale_file.tas_id)
task.info = error
task.status = 'COMPLETED'
这是我尝试轮询任务状态的地方,我得到了错误!
class TaskStatus(View):
def get(self, request):
task_id = request.GET.get('_task_id')
task = AsyncResult(task_id)
print(task)
print(task.state) #HERE IS THE ERROR
print(dir(task)) #THE STATUS APPEAR HERE
success_response = (
{
'status': ['state: '
],
'result': {
'success': True,
}
}
)
return (
JsonResponse(success_response)
)
这是我第一次使用芹菜,所以欢迎任何帮助。
解决方案
所以,我试图在我的任务中调用一个模型实例,但该实例尚未创建。我的解决方案是创建一个函数来使用 transiction.on_commit 调用我的任务,代码如下:
def post(self, request, *args, **kwargs):
form = self.get_form()
if form.is_valid():
instance = form.save()
kwargs = {
'file_id': instance.id,
'file_type': instance.type,
}
task = import_file_task
result = task.freeze()
def run_task():
task.signature(kwargs=kwargs).apply_async()
transaction.on_commit(run_task)
return JsonResponse(
{
'form_status': 'Success',
'task_id': result.task_id,
'file_detail_url': self._get_detail_url(instance.id)
}
)
else:
return JsonResponse(
{
'form_status': 'Error',
'error': json.loads(form.errors.as_json())
}
)
推荐阅读
- java - 安装 TestNG 后 Eclipse Kepler 中的工具提示错误
- javascript - 隐藏引导工具提示而不是处置
- android - 在布局文件中添加新视图后,它们的 id 没有显示在 DataBinding 变量中,它说无法解析符号
- android - 尽管 -keepclasseswithmembernames Proguard 混淆了类
- angular - 如何在angular2+中显示隐藏(星**)代替密码
- powershell - PowerShell 返回 0 作为字符串长度
- ios - 从构建服务器构建时,Firebase Analytics 不记录任何事件
- ios - 使用 UISearchController 在第一个表上方的差距
- java - 如何从 json 对象中获取值?
- c# - c# - RegisterConditional Ninject 等效项