python - Celery apply_async 将 kwargs 传递给链中的所有任务
问题描述
一个 Celery 任务队列,用于计算 (2 + 2) - 3 的结果。
@app.task()
def add(**kwargs):
time.sleep(5)
x, y = kwargs['add'][0], kwargs['add'][1]
return x + y
@app.task()
def sub(**kwargs):
time.sleep(5)
x = args[0]
y = kwargs['sub'][0]
return x - y
示例任务数据 =kwargs = {'add' : (2, 2), 'sub' : (3)}
链接任务:result = (add.s() | sub.s()).apply_async(kwargs = kwargs)
根据设计,apply_async 仅将 kwargs 应用于链中的第一个任务。我需要改变什么才能达到预期的结果?
解决方案
因此,从 Celery v4.4.0rc4 开始,除了将 kwargs 传递给每个任务的签名之外,没有更好的方法可以做到这一点。虽然看起来Ask Solem (Celery dev) 对功能请求是开放的。.
链应该是这样的:
result = (add.s(job_data = job_data)| sub.s(job_data = job_data)).apply_async()
然而,由于我们的链有 10 多个任务,我不得不想出一个更简单的方法来编写这个。
# Workflow generator
def workflow_generator(task_list, job_data):
_tasks = tuple(getattr(task, 's')(job_data = job_data) for task in task_list)
return chain(*_tasks).apply_async()
taskList = [add, sub]
job_data = {'add' : (2, 2), 'sub' : (3)}
result = workflow_generator(taskList, job_data)
推荐阅读
- c++ - 如何在 Qt C++ 程序中管理多个 SQLite 数据库?
- python - 如何有效地计算大量数据的距离矩阵
- javascript - 在 React 中出现错误“找不到模块的声明文件......”
- asp.net-core - 如何在 .NET Core Razor 页面中获取登录用户列表
- javascript - 如何在数组中找到给定字符串的“字符串编号”?
- javascript - How do you retrieve selected options BEFORE the user have submitted the form?
- python - Python list.pop 特定元素,例如 List.pop(0)(0)
- axios - 如何在 axios graphql 中为 vue3.js 分页?
- gatsby - 如何引用从 Airtable 导入的查询?
- ios - 如何给一个真正的 iPhone 空语言环境