python - Redis Queue:如何防止链式作业异步运行
问题描述
我试图让 3 个作业按顺序依次运行:
Job1 -> Job2 -> Job3
这 3 个工作定义在operations.py
:
def Job1(x):
return x
def Job2(x):
return x * x
def Job3(x):
print(x)
我正在运行这些script.py
工作rq worker
:
from redis import Redis
from rq import Queue
from operation import Job1, Job2, Job3
redis_conn = Redis()
q = Queue(connection=redis_conn)
for num in [1,2,3,4,5,6,7,8]:
j1 = q.enqueue(Job1, num)
j2 = q.enqueue(Job2, j1.result, depends_on = j1)
j3 = q.enqueue(Job3, depends_on = j2)
根据文档,我希望 j3 等待 j2 依次等待 j1 完成执行。但是,这可能不会发生。这些作业似乎正在异步运行。我这样说是因为 redis 工作人员将此作为错误:
File "./operation.py", line 5, in Job2
return x * x
TypeError: unsupported operand type(s) for *: 'NoneType' and 'NoneType'
j2 不是等待 j1 的结果,而是异步也启动了,因为那时 j1 的结果还没有准备好,j1.result 是 None ,它被传递给 j2。我的方法有什么问题?为什么作业不按顺序运行?
解决方案
当您将作业添加到队列中时,j1.result
is None
:只有在工作人员完成执行后,它才会具有不同的值。
我认为你需要传递j1.id
给Job2
,并做类似的事情
def Job2(job1_id):
from rq.job import Job
job1 = Job.fetch(job1_id)
x = job1.result
return x * x
更多信息请访问http://python-rq.org/docs/jobs/#retrieving-a-job-from-redis
编辑: 这样做更干净,无需传递 id https://stackoverflow.com/a/37713756/239408
推荐阅读
- java - 访问资源文件夹中的文件
- sql - 如何确定 Presto 中一个月的天数?
- laravel - Laravel:检索关系数据
- angular - Angular 7从promise返回字符串
- r - 如何用文本和点注释 R 地图
- sdk - Sap Cloud SDK - 使用自定义字段创建 BP
- python - 如何使用 python 为在 Direct Runner 上运行的 apache 光束管道设置日志/打印
- taleo - 如何使用 Taleo Connect 客户端基于 csv 文件提取数据?
- javascript - 如何使用带有 jQuery 的 nextUntil 包含选择的第一个元素
- python-3.x - 使用opencv读取.dv文件,找不到dv头错误