How do I send all Celery chain tasks to a queue other than the celery queue?

Problem description

I am trying out the chaining described in the Celery Canvas documentation. Below is my project structure.

proj/
├── __init__.py
├── celery.py
├── celeryconfig.py
├── module1
│   ├── __init__.py
│   └── tasks.py
└── module2
    └── tasks.py

File: celery.py

"""Main entry module for celery tasks."""
from __future__ import absolute_import, unicode_literals

from celery import Celery
from . import celeryconfig

app = Celery('mycelery')
app.config_from_object(celeryconfig)

File: celeryconfig.py

"""Celery Configuration."""
import os

broker_url = os.getenv("CELERY_BROKER_URL")
result_backend = os.getenv("CELERY_BACKEND_URL")
worker_max_tasks_per_child = 4

include = [
    "proj.module1.tasks",
    "proj.module2.tasks"]

File: module1/tasks.py

from __future__ import absolute_import, unicode_literals

from celery import group
from celery import subtask

from proj.celery import app


@app.task
def my_workflow(x, y):
    (mul.s(x, y) | add.s(5)).apply_async(queue="myq")
#   (get_array.s(x, y) | dmap.s(add.s())).apply_async(queue="myq")


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y

@app.task
def get_array(x, y):
    return [(x, x)] * y


@app.task
def dmap(listResp, processFunc):
    callback = subtask(processFunc)
    return group(callback.clone([arg,]) for arg in listResp)()
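
For context, a minimal sketch of how the my_workflow task above might be invoked (this invocation is illustrative and not part of the original post):

from proj.module1.tasks import my_workflow

# my_workflow itself is queued like any other task; the chain it builds
# internally is then published with apply_async(queue="myq").
my_workflow.delay(3, 4)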

I set up a virtualenv to run my Celery tasks. It has celery 4.4.2 installed.

$ virtualenv/bin/pip freeze
amqp==2.5.2
appnope==0.1.0
backcall==0.2.0
billiard==3.6.3.0
celery==4.4.2
decorator==4.4.2
importlib-metadata==1.6.0
ipython==7.16.1
ipython-genutils==0.2.0
jedi==0.17.1
kombu==4.6.8
parso==0.7.0
pexpect==4.8.0
pickleshare==0.7.5
prompt-toolkit==3.0.5
ptyprocess==0.6.0
Pygments==2.6.1
pytz==2020.1
redis==3.5.3
six==1.15.0
traitlets==4.3.3
vine==1.3.0
wcwidth==0.2.5
zipp==3.1.0

I tried running a worker without listening on any specific queue.

$ virtualenv/bin/celery worker -A proj -l INFO

celery@MQM1CPG8WL v4.4.2 (cliffs)

Darwin-18.7.0-x86_64-i386-64bit 2020-07-11 00:38:04

[config]
.> app:         mycelery:0x1115be410
.> transport:   amqp://dev:**@127.0.0.1:5672/my-dev
.> results:     redis://127.0.0.1/0
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery


[tasks]
  . proj.module1.tasks.add
  . proj.module1.tasks.dmap
  . proj.module1.tasks.get_array
  . proj.module1.tasks.mul
  . proj.module1.tasks.my_workflow


[2020-07-11 00:38:04,300: INFO/MainProcess] Connected to amqp://dev:**@127.0.0.1:5672/my-dev
[2020-07-11 00:38:04,317: INFO/MainProcess] mingle: searching for neighbors
[2020-07-11 00:38:05,369: INFO/MainProcess] mingle: all alone
[2020-07-11 00:38:05,412: INFO/MainProcess] celery@MQM1CPG8WL ready.
[2020-07-11 00:38:07,240: INFO/MainProcess] Events of group {task} enabled by remote.

Then I tried running a chain.

$ virtualenv/bin/ipython
Python 3.7.4 (default, Dec  9 2019, 10:50:13)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.16.1 -- An enhanced Interactive Python. Type '?' for help.

In [1]: from proj.module1.tasks import add, mul

In [2]: res = (add.s(4, 4) | mul.s(8) | mul.s(10)).apply_async()

In [3]: res.get()
Out[3]: 640

The Celery worker logged:

[2020-07-11 00:41:40,051: INFO/MainProcess] Received task: proj.module1.tasks.add[dcedd03b-405d-4490-a1dd-5f697cdf9444]
[2020-07-11 00:41:40,093: INFO/MainProcess] Received task: proj.module1.tasks.mul[8019e971-8347-4a1d-b7db-9d62fb147417]
[2020-07-11 00:41:40,094: INFO/ForkPoolWorker-8] Task proj.module1.tasks.add[dcedd03b-405d-4490-a1dd-5f697cdf9444] succeeded in 0.04052638599998204s: 8
[2020-07-11 00:41:40,129: INFO/MainProcess] Received task: proj.module1.tasks.mul[6a5f129b-07c5-43df-a55c-525a67fe9feb]
[2020-07-11 00:41:40,131: INFO/ForkPoolWorker-1] Task proj.module1.tasks.mul[8019e971-8347-4a1d-b7db-9d62fb147417] succeeded in 0.03607683999999267s: 64
[2020-07-11 00:41:40,132: INFO/ForkPoolWorker-8] Task proj.module1.tasks.mul[6a5f129b-07c5-43df-a55c-525a67fe9feb] succeeded in 0.001778557999983832s: 640

Now, instead of the default celery queue, I want my worker to listen only on myq. I started the worker with -Q myq as shown below.

$ virtualenv/bin/celery worker -A proj -l INFO -Q myq

celery@MQM1CPG8WL v4.4.2 (cliffs)

Darwin-18.7.0-x86_64-i386-64bit 2020-07-11 00:43:45

[config]
.> app:         mycelery:0x10dce2490
.> transport:   amqp://dev:**@127.0.0.1:5672/my-dev
.> results:     redis://127.0.0.1/0
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> myq              exchange=myq(direct) key=myq


[tasks]
  . proj.module1.tasks.add
  . proj.module1.tasks.dmap
  . proj.module1.tasks.get_array
  . proj.module1.tasks.mul
  . proj.module1.tasks.my_workflow


[2020-07-11 00:43:45,587: INFO/MainProcess] Connected to amqp://dev:**@127.0.0.1:5672/my-dev
[2020-07-11 00:43:45,601: INFO/MainProcess] mingle: searching for neighbors
[2020-07-11 00:43:46,658: INFO/MainProcess] mingle: all alone
[2020-07-11 00:43:46,701: INFO/MainProcess] celery@MQM1CPG8WL ready.
[2020-07-11 00:43:47,261: INFO/MainProcess] Events of group {task} enabled by remote.

Now I am running the same chain, but targeting the myq queue.

$ virtualenv/bin/ipython
Python 3.7.4 (default, Dec  9 2019, 10:50:13)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.16.1 -- An enhanced Interactive Python. Type '?' for help.

In [1]: from proj.module1.tasks import add, mul

In [2]: res = (add.s(4, 4) | mul.s(8) | mul.s(10)).apply_async(queue="myq")

In [3]: res.get()

res.get() hangs at this point. When I check the worker log, it shows only a single task entry.

[2020-07-11 00:45:45,700: INFO/MainProcess] Received task: proj.module1.tasks.add[d58fba45-429b-4c3c-8598-3a24916e6fce]
[2020-07-11 00:45:45,747: INFO/ForkPoolWorker-8] Task proj.module1.tasks.add[d58fba45-429b-4c3c-8598-3a24916e6fce] succeeded in 0.044210322999987284s: 8

This indicates that only the first task of the chain reached the myq queue.

When I inspected the queues on the broker, the celery queue held 1 message:

[[8, 8], {}, {"callbacks": null, "errbacks": null, "chain": [{"task": "proj.module1.tasks.mul", "args": [10], "kwargs": {}, "options": {"task_id": "efd420e4-845c-4198-8f97-e8ae0015f9e9", "reply_to": "26aee9c3-21e3-3ce4-a1b1-a3bef6e7e5b0"}, "subtask_type": null, "chord_size": null, "immutable": false}], "chord": null}]

Somehow the next message in the chain was routed to the celery queue instead of myq.

How do I tell the chain to send all of its tasks to myq?

Tags: python, celery, chaining, celery-canvas

Solution
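
The behaviour described above matches how apply_async(queue=...) works on a chain: the queue option is applied to the first task, and the remaining tasks in the chain fall back to their own routing, which here is the default celery queue. One common workaround, sketched below, is to attach the queue to every signature in the chain with .set(queue=...); treat this as a sketch rather than a verified answer.

from proj.module1.tasks import add, mul

# Attach the queue option to each signature so every task in the chain
# is published to "myq", not just the first one.
res = (
    add.s(4, 4).set(queue="myq")
    | mul.s(8).set(queue="myq")
    | mul.s(10).set(queue="myq")
).apply_async()

res.get()  # 640, once a worker consuming "myq" has processed the chain

Alternatively, routing the project's tasks to myq in celeryconfig.py via task_default_queue or task_routes (as sketched earlier) achieves the same effect without changing every call site.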

