首页 > 解决方案 > Python Celery 任务优先级未按预期工作

问题描述

您好我正在尝试使用 Python 优先发送任务以ThreadPoolExecutor同时发送 8 个任务。我如何达到预期

我使用以下命令启动了 worker。

celery -A task worker --loglevel=info -P eventlet --concurrency=5

观察:从下面的日志截图中可以看出,任务以随机顺序运行,而不是遵循优先级。

期望:任务按优先级顺序运行。

任务.py

from __future__ import absolute_import
from celery import Celery
import sys
import time
from kombu import Queue, Exchange

# celery_url = "amqp://%s:%s@%s//" % ("guest", "guest", "0.0.0.0")
celery_app = Celery("priority_queue")

celeryconfig = {}
celeryconfig['BROKER_URL'] = 'pyamqp://guest@localhost//'
celeryconfig['CELERY_RESULT_BACKEND'] = 'rpc://'
celeryconfig['CELERY_QUEUES'] = (
    Queue('tasks', Exchange('tasks'), routing_key='tasks', queue_arguments={'x-max-priority': 10}),
)
celeryconfig['CELERY_ACKS_LATE'] = True
celeryconfig['CELERYD_PREFETCH_MULTIPLIER'] = 1

celery_app.config_from_object(celeryconfig)


@celery_app.task(queue='tasks', ignore_result=True)
def add(x, y, priority):
    print("Adding {} and {} with priority {}".format(x, y, priority))
    time.sleep(5)
    print("Result {} and {} with priority {}".format(x, y, priority))
    return x + y

客户端.py

from __future__ import absolute_import

from concurrent.futures.thread import ThreadPoolExecutor
from testapp import add


def add_with_priority(x, y, priority):
    res = add.apply_async(args=[x, y, priority], routing_key='tasks', priority=priority)
    if res.ready():
        print("Result from {} + {} with priority {} is {}".format(x, y, priority, res))


if __name__ == '__main__':
    test_samples = [[4, 5, 1], [2, 9, 2], [10, 25, 3], [4, 5, 4], [4, 5, 5], [4, 5, 6], [4, 5, 7], [4, 5, 8]]

    with ThreadPoolExecutor(max_workers=4) as e:
        for test in test_samples:
            e.submit(add_with_priority, *test)

工作日志

在此处输入图像描述

标签: pythoncelery

解决方案


我查看了我的代码(当时有效):

def build_task(task_funct, queue, priority, args):
    return task_funct.signature(args=args, queue=queue, priority=priority, immutable=True, kwargs={'priority': priority}

与您的代码相比-也许您可以尝试更改此行:

res = add.apply_async(args=[x, y, priority], routing_key='tasks', priority=priority)

至:

res = add.apply_async(args=[x, y, priority], queue='tasks', routing_key='tasks', priority=priority, kwargs={'priority': priority})

请让我知道它是否有任何不同

编辑:

你可以这样尝试吗:

task = build_task(add, args=(x, y, priority), queue='tasks', routing_key='tasks', priority=priority)
(task).apply_async()

(使用我build_task上面的)


推荐阅读