python - 芹菜任务被“接收”是什么意思?当所有 celery 工人都被阻止时,未“接收”的新任务会发生什么?
问题描述
我正在开发一个新的监控系统,它可以测量 Celery 队列的吞吐量,并在队列备份时帮助提醒团队。在我的工作过程中,我遇到了一些我不理解的特殊行为(并且在 Celery 规范中没有很好地记录)。
出于测试目的,我设置了一个端点,该端点将使用 16 个可用于模拟备份队列的几个长时间运行的任务填充队列。框架是 Flask,队列代理是 Redis。Celery 为每个工作人员配置了最多并行处理 4 个任务,并且我有 2 个工作人员正在运行。
api/健康.py
def health():
health = Blueprint("health", __name__)
@health.route("/api/debug/create-long-queue", methods=["GET"])
def long_queue():
for i in range(16):
sleepy_job.delay()
return make_response({}, 200)
return health
工作.py
@celery.task(priority=HIGH_PRIORITY)
def sleepy_job(*args, **kwargs):
time.sleep(30)
这是我模拟备份生产队列的方法:
- 我打电话
/api/debug/create-long-queue
来模拟队列中的备份。根据上面的数学计算,每个worker应该忙着睡1分钟(他们一起可以同时处理8个任务。每个任务只睡30秒,总共有16个任务。) - 不久之后(< 5 秒)我又进行了一次 API 调用,这启动了具有真实业务逻辑的不同工作(处理入站 webhook API 调用)。我们称这项工作
handle_incoming_message
。
这是我看到的使用花检查队列的内容:
- 虽然所有工作人员都被前 8 个
sleepy_job
任务阻塞,但我没有看到队列中出现新任务的迹象handle_incoming_message
,即使我确定handle_incoming_message.delay()
由于第二次 API 调用而被调用。 - 前 8 个
sleepy_job
任务完成后(约 30 秒),我handle_incoming_message
在队列中看到新的 stateRECIEVED
。 - 在完成第二个(也是最后一个)8 个
sleepy_job
任务后,我现在看到handle_incoming_message
了状态STARTED
(我可以确认这一点,因为 UI 会使用在该任务中接收和处理的新数据进行更新。)
问题
所以很明显,当工人在处理前 8 个sleepy_job
任务后暂时解除阻塞时,他们正在做一些事情来以一种可见的方式标记/确认新handle_incoming_message
任务。但这留下了几个悬而未决的问题:
handle_incoming_message
当工人被阻塞时,新任务的状态是什么?- 工人解除封锁后有哪些变化,使其如此开花现在对新
handle_incoming_message
任务有可见性? - “收到”状态实际上是什么意思?
- (奖励:如何查看在工作人员被阻止时排队的任务?)
解决方案
当所有工作人员都被阻止时,一些任务可能由于预取而处于接收状态(查看文档)。因此,您的任务很可能只是在队列中,等待 Celery 工作人员接收(协调进程 - 这些不是实际的工作进程)。
Flower 是一个简单的服务,它建立在名为“任务事件”的 Celery 特性之上。简单来说,它(Flower)将自己订阅为所有事件(接收、成功、开始、失败等)的接收者,然后将这些事件可视化地呈现给 Web 客户端。更多关于它的信息。因此,当 Celery 工作人员收到任务时,会发送“任务已接收”事件。Flower 获取此事件,并在仪表板中更改该任务的状态。
当一个任务被“接收”时,这意味着特定的 Celery 工作人员将该任务从队列中取出,它可能会立即执行(如果有空闲的工作进程来执行它),或者 Celery 工作人员将等待一个工作进程成为准备运行任务。我已经提到过预取 - Celery 工作人员通常会比可用的工作进程执行更多任务。
Celery 没有让用户列出特定队列中的内容。这就是为什么你会看到许多类似的问题——包括这个提供答案的问题。您将在那里看到我的简短回答。简而言之,这取决于您选择的经纪人。如果是 Redis,那么您只需浏览对象列表。如果是 RabbitMQ,那么您可以使用他们的工具来检查队列。我认为不提供此信息的决定是好的,因为此信息永远不可靠。当您列出特定队列中的所有任务时,可能会有数千个新任务......
推荐阅读
- excel - 在 VBA 中向我当前的模块添加更多 if 语句
- javascript - 我该如何解决'
在android中不断停止'错误? - jms - 无法使用连接模式“客户端”和主机名连接到队列管理器“DevQueue01”
- gtk3 - 构建一个 GTK Gridview,就像一个不同高度的画廊
- office-js - Office js EmailAddressDetails undefined recipientType for Mac
- c# - 尝试对 ASN.1 模式对象执行“__set__”操作
- c++ - 我的代码在构建时没有出现任何错误,但无法正常工作
- google-workspace - 如何使用 PHP 脚本在 gsuite 上为教育创建自定义域电子邮件地址
- c++ - C++:`enable_if` 来限制支持特定算术运算的类型
- python - 使用烧瓶的聊天室