python - Python Celery 任务优先级未按预期工作
问题描述
您好我正在尝试使用 Python 优先发送任务以ThreadPoolExecutor
同时发送 8 个任务。我如何达到预期?
我使用以下命令启动了 worker。
celery -A task worker --loglevel=info -P eventlet --concurrency=5
观察:从下面的日志截图中可以看出,任务以随机顺序运行,而不是遵循优先级。
期望:任务按优先级顺序运行。
任务.py
from __future__ import absolute_import
from celery import Celery
import sys
import time
from kombu import Queue, Exchange
# celery_url = "amqp://%s:%s@%s//" % ("guest", "guest", "0.0.0.0")
celery_app = Celery("priority_queue")
celeryconfig = {}
celeryconfig['BROKER_URL'] = 'pyamqp://guest@localhost//'
celeryconfig['CELERY_RESULT_BACKEND'] = 'rpc://'
celeryconfig['CELERY_QUEUES'] = (
Queue('tasks', Exchange('tasks'), routing_key='tasks', queue_arguments={'x-max-priority': 10}),
)
celeryconfig['CELERY_ACKS_LATE'] = True
celeryconfig['CELERYD_PREFETCH_MULTIPLIER'] = 1
celery_app.config_from_object(celeryconfig)
@celery_app.task(queue='tasks', ignore_result=True)
def add(x, y, priority):
print("Adding {} and {} with priority {}".format(x, y, priority))
time.sleep(5)
print("Result {} and {} with priority {}".format(x, y, priority))
return x + y
客户端.py
from __future__ import absolute_import
from concurrent.futures.thread import ThreadPoolExecutor
from testapp import add
def add_with_priority(x, y, priority):
res = add.apply_async(args=[x, y, priority], routing_key='tasks', priority=priority)
if res.ready():
print("Result from {} + {} with priority {} is {}".format(x, y, priority, res))
if __name__ == '__main__':
test_samples = [[4, 5, 1], [2, 9, 2], [10, 25, 3], [4, 5, 4], [4, 5, 5], [4, 5, 6], [4, 5, 7], [4, 5, 8]]
with ThreadPoolExecutor(max_workers=4) as e:
for test in test_samples:
e.submit(add_with_priority, *test)
工作日志
解决方案
我查看了我的代码(当时有效):
def build_task(task_funct, queue, priority, args):
return task_funct.signature(args=args, queue=queue, priority=priority, immutable=True, kwargs={'priority': priority}
与您的代码相比-也许您可以尝试更改此行:
res = add.apply_async(args=[x, y, priority], routing_key='tasks', priority=priority)
至:
res = add.apply_async(args=[x, y, priority], queue='tasks', routing_key='tasks', priority=priority, kwargs={'priority': priority})
请让我知道它是否有任何不同
编辑:
你可以这样尝试吗:
task = build_task(add, args=(x, y, priority), queue='tasks', routing_key='tasks', priority=priority)
(task).apply_async()
(使用我build_task
上面的)
推荐阅读
- docker - Docker - CMD npm start 先于复制所有
- jmeter - 用于测试仪表板向下钻取报告的 Locust 或 Jmeter
- mysql - 无法将表名传递给 MySQL 函数
- oauth - 从共享点设计器工作流调用 https://login.microsoftonline.com/{tanent ID}/oauth2/token
- javascript - 事件处理程序中的 React SetState 与模拟数据和来自 API 的数据的行为不同
- node.js - 将 MS 转换为 DD:HH:MM:SS
- rust - 实现返回 HashMap::IntoIter 的 IntoIterator 时出现“类型参数数量错误”
- python - 带有输入和输出的 Python 自定义构建
- java - 在套接字 java 上读取和写入对象
- mysql - laravel morphToMany 的父模型如何在子查询中使用?