首页 > 解决方案 > Python Celery - 如果队列不可用则引发

问题描述

我在 Celery 配置中定义了一条路线: task_routes = {'tasks.add': {'queue': 'calculate'}}

这样只有特定的工作人员才能运行该任务。我开始我的工人: celery -A myproj worker -n worker1@%h -Q calculate

然后运行我的任务: add.apply_async((2, 2), time_limit=5)

一切顺利。但是现在,假设我的工人死了,我尝试再次运行我的任务。它挂了,永远。Time_limit 对我没有任何好处,因为任务永远不会进入它的队列。在这种情况下如何定义超时?换句话说,如果在接下来的 X 秒内没有可用的队列,我想提出一个错误。那可能吗?

标签: pythonqueuetimeoutcelery

解决方案


我假设您使用 Rabbitmq 作为消息代理,如果您是,那么关于 Rabbitmq(和其他类似 AMQP 的消息队列)的工作方式有一些微妙之处。首先,当您发送消息时,您的进程将其发送到交换器,交换器又将消息路由到 0 个或多个队列。您的队列可能有也可能没有消费者(即 celery 工作人员)消费消息,但作为发送方,您无法控制接收方,除非该工作人员主动回复。

但是,我认为可以通过执行以下操作来实现您想要的(假设您有后端)

  1. 确保使用您选择的消息 TTL声明您的队列(比如说 60 秒)。如果没有附加消费者,还要确保它没有被声明为删除。还要声明死信交换。
  2. 让 celery worker 监听你的死信交换,但是当它收到消息时,它会引发一个适当的异常。这里最简单的可能是收听消息,但没有加载任何任务。这样,它将导致您的后端出现 FAILURE,说明未实施的任务。

如果您的原始工作人员死亡,队列中的任何消息都将在您选择的 TTL 之后过期并被发送到您的死信交换,此时第二个工作人员(自动失败的工作人员)将收到消息并引发任务失败。请注意,您需要将 TTL 设置为远高于您期望消息在 Rabbitmq 队列中停留的时间,因为无论是否有工作人员从队列中消费,它都会过期。

要设置第一个队列,我认为您需要如下配置:

Queue(
default_queue_name,
default_exchange,
routing_key=default_routing_key,
queue_arguments={
    'x-message-ttl': 60000 # milliseconds
    'x-dead-letter-exchange': deadletter_exchange_name,
    'x-dead-letter-routing-key': deadletter_routing_key
})

死信队列看起来更像是一个标准的 celery 工作队列配置,但您可能希望有一个单独的配置,因为您不想为此工作人员加载任何任务。

总而言之,是的,这是可能的,但并不像人们想象的那么简单。


推荐阅读