首页 > 解决方案 > Redis Queue:如何防止链式作业异步运行

问题描述

我试图让 3 个作业按顺序依次运行:

Job1 -> Job2 -> Job3

这 3 个工作定义在operations.py

def Job1(x):
    return x

def Job2(x):
    return x * x

def Job3(x):
    print(x)

我正在运行这些script.py工作rq worker

from redis import Redis
from rq import Queue
from operation import Job1, Job2, Job3

redis_conn = Redis()
q = Queue(connection=redis_conn)

for num in [1,2,3,4,5,6,7,8]:
    j1 = q.enqueue(Job1, num)
    j2 = q.enqueue(Job2, j1.result, depends_on = j1)
    j3 = q.enqueue(Job3, depends_on = j2)

根据文档,我希望 j3 等待 j2 依次等待 j1 完成执行。但是,这可能不会发生。这些作业似乎正在异步运行。我这样说是因为 redis 工作人员将此作为错误:

File "./operation.py", line 5, in Job2
    return x * x
TypeError: unsupported operand type(s) for *: 'NoneType' and 'NoneType'

j2 不是等待 j1 的结果,而是异步也启动了,因为那时 j1 的结果还没有准备好,j1.result 是 None ,它被传递给 j2。我的方法有什么问题?为什么作业不按顺序运行?

标签: pythonpython-3.xredisqueuepython-rq

解决方案


当您将作业添加到队列中时,j1.resultis None:只有在工作人员完成执行后,它才会具有不同的值。

我认为你需要传递j1.idJob2,并做类似的事情

def Job2(job1_id):
  from rq.job import Job
  job1 = Job.fetch(job1_id)
  x = job1.result
  return x * x

更多信息请访问http://python-rq.org/docs/jobs/#retrieving-a-job-from-redis

编辑: 这样做更干净,无需传递 id https://stackoverflow.com/a/37713756/239408


推荐阅读