python - Python Celery - 如果队列不可用则引发
问题描述
我在 Celery 配置中定义了一条路线:
task_routes = {'tasks.add': {'queue': 'calculate'}}
这样只有特定的工作人员才能运行该任务。我开始我的工人:
celery -A myproj worker -n worker1@%h -Q calculate
然后运行我的任务:
add.apply_async((2, 2), time_limit=5)
一切顺利。但是现在,假设我的工人死了,我尝试再次运行我的任务。它挂了,永远。Time_limit 对我没有任何好处,因为任务永远不会进入它的队列。在这种情况下如何定义超时?换句话说,如果在接下来的 X 秒内没有可用的队列,我想提出一个错误。那可能吗?
解决方案
我假设您使用 Rabbitmq 作为消息代理,如果您是,那么关于 Rabbitmq(和其他类似 AMQP 的消息队列)的工作方式有一些微妙之处。首先,当您发送消息时,您的进程将其发送到交换器,交换器又将消息路由到 0 个或多个队列。您的队列可能有也可能没有消费者(即 celery 工作人员)消费消息,但作为发送方,您无法控制接收方,除非该工作人员主动回复。
但是,我认为可以通过执行以下操作来实现您想要的(假设您有后端)
- 确保使用您选择的消息 TTL声明您的队列(比如说 60 秒)。如果没有附加消费者,还要确保它没有被声明为删除。还要声明死信交换。
- 让 celery worker 监听你的死信交换,但是当它收到消息时,它会引发一个适当的异常。这里最简单的可能是收听消息,但没有加载任何任务。这样,它将导致您的后端出现 FAILURE,说明未实施的任务。
如果您的原始工作人员死亡,队列中的任何消息都将在您选择的 TTL 之后过期并被发送到您的死信交换,此时第二个工作人员(自动失败的工作人员)将收到消息并引发任务失败。请注意,您需要将 TTL 设置为远高于您期望消息在 Rabbitmq 队列中停留的时间,因为无论是否有工作人员从队列中消费,它都会过期。
要设置第一个队列,我认为您需要如下配置:
Queue(
default_queue_name,
default_exchange,
routing_key=default_routing_key,
queue_arguments={
'x-message-ttl': 60000 # milliseconds
'x-dead-letter-exchange': deadletter_exchange_name,
'x-dead-letter-routing-key': deadletter_routing_key
})
死信队列看起来更像是一个标准的 celery 工作队列配置,但您可能希望有一个单独的配置,因为您不想为此工作人员加载任何任务。
总而言之,是的,这是可能的,但并不像人们想象的那么简单。
推荐阅读
- angular - 取消订阅 Angular 中的 observable
- javascript - Google 的 reCAPTCHA v3 的工作原理
- angular - Angular 将一个组件作为另一个组件的 ng-template 传递
- sql - 在 sql server 中删除功能增量负载
- html - 在没有jquery的textarea中显示行号
- kubernetes - 使用 kubectl 命令获取数据中心
- c++ - 64位模块映射到内存时如何查找函数表?
- docker - 将调试器附加到 docker 内的应用程序
- python - 分组和添加大型数据框 python pandas
- laravel-5.4 - 使用 laravel 5.4 在 MySql 中将数据作为数组插入