python - How to send all Celery chain tasks to a queue other than the celery queue?
Problem description
I am trying out chaining as described in the Canvas documentation. Below is my project structure.
proj/
├── __init__.py
├── celery.py
├── celeryconfig.py
├── module1
│   ├── __init__.py
│   └── tasks.py
└── module2
    └── tasks.py
File: celery.py
"""Main entry module for celery tasks."""
from __future__ import absolute_import, unicode_literals
from celery import Celery
from . import celeryconfig
app = Celery('mycelery')
app.config_from_object(celeryconfig)
File: celeryconfig.py
"""Celery Configuration."""
import os
broker_url = os.getenv("CELERY_BROKER_URL")
result_backend = os.getenv("CELERY_BACKEND_URL")
worker_max_tasks_per_child = 4
include = [
    "proj.module1.tasks",
    "proj.module2.tasks",
]
File: module1/tasks.py
from __future__ import absolute_import, unicode_literals
from celery import group
from celery import subtask
from proj.celery import app
@app.task
def my_workflow(x, y):
    (mul.s(x, y) | add.s(5)).apply_async(queue="myq")
    # (get_array.s(x, y) | dmap.s(add.s())).apply_async(queue="myq")

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y

@app.task
def get_array(x, y):
    return [(x, x)] * y

@app.task
def dmap(listResp, processFunc):
    callback = subtask(processFunc)
    return group(callback.clone([arg,]) for arg in listResp)()
I set up a virtual environment to run my Celery tasks. It has Celery 4.4.2 installed.
$ virtualenv/bin/pip freeze
amqp==2.5.2
appnope==0.1.0
backcall==0.2.0
billiard==3.6.3.0
celery==4.4.2
decorator==4.4.2
importlib-metadata==1.6.0
ipython==7.16.1
ipython-genutils==0.2.0
jedi==0.17.1
kombu==4.6.8
parso==0.7.0
pexpect==4.8.0
pickleshare==0.7.5
prompt-toolkit==3.0.5
ptyprocess==0.6.0
Pygments==2.6.1
pytz==2020.1
redis==3.5.3
six==1.15.0
traitlets==4.3.3
vine==1.3.0
wcwidth==0.2.5
zipp==3.1.0
I first ran the worker without telling it to listen on a specific queue.
$ virtualenv/bin/celery worker -A proj -l INFO
celery@MQM1CPG8WL v4.4.2 (cliffs)
Darwin-18.7.0-x86_64-i386-64bit 2020-07-11 00:38:04
[config]
.> app: mycelery:0x1115be410
.> transport: amqp://dev:**@127.0.0.1:5672/my-dev
.> results: redis://127.0.0.1/0
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. proj.module1.tasks.add
. proj.module1.tasks.dmap
. proj.module1.tasks.get_array
. proj.module1.tasks.mul
. proj.module1.tasks.my_workflow
[2020-07-11 00:38:04,300: INFO/MainProcess] Connected to amqp://dev:**@127.0.0.1:5672/my-dev
[2020-07-11 00:38:04,317: INFO/MainProcess] mingle: searching for neighbors
[2020-07-11 00:38:05,369: INFO/MainProcess] mingle: all alone
[2020-07-11 00:38:05,412: INFO/MainProcess] celery@MQM1CPG8WL ready.
[2020-07-11 00:38:07,240: INFO/MainProcess] Events of group {task} enabled by remote.
Then I tried a chain.
$ virtualenv/bin/ipython
Python 3.7.4 (default, Dec 9 2019, 10:50:13)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.16.1 -- An enhanced Interactive Python. Type '?' for help.
In [1]: from proj.module1.tasks import add, mul
In [2]: res = (add.s(4, 4) | mul.s(8) | mul.s(10)).apply_async()
In [3]: res.get()
Out[3]: 640
The Celery worker logged the following.
[2020-07-11 00:41:40,051: INFO/MainProcess] Received task: proj.module1.tasks.add[dcedd03b-405d-4490-a1dd-5f697cdf9444]
[2020-07-11 00:41:40,093: INFO/MainProcess] Received task: proj.module1.tasks.mul[8019e971-8347-4a1d-b7db-9d62fb147417]
[2020-07-11 00:41:40,094: INFO/ForkPoolWorker-8] Task proj.module1.tasks.add[dcedd03b-405d-4490-a1dd-5f697cdf9444] succeeded in 0.04052638599998204s: 8
[2020-07-11 00:41:40,129: INFO/MainProcess] Received task: proj.module1.tasks.mul[6a5f129b-07c5-43df-a55c-525a67fe9feb]
[2020-07-11 00:41:40,131: INFO/ForkPoolWorker-1] Task proj.module1.tasks.mul[8019e971-8347-4a1d-b7db-9d62fb147417] succeeded in 0.03607683999999267s: 64
[2020-07-11 00:41:40,132: INFO/ForkPoolWorker-8] Task proj.module1.tasks.mul[6a5f129b-07c5-43df-a55c-525a67fe9feb] succeeded in 0.001778557999983832s: 640
Now I want my worker to listen on the myq queue instead of the celery queue. I started the worker on myq as shown below.
$ virtualenv/bin/celery worker -A proj -l INFO -Q myq
celery@MQM1CPG8WL v4.4.2 (cliffs)
Darwin-18.7.0-x86_64-i386-64bit 2020-07-11 00:43:45
[config]
.> app: mycelery:0x10dce2490
.> transport: amqp://dev:**@127.0.0.1:5672/my-dev
.> results: redis://127.0.0.1/0
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> myq exchange=myq(direct) key=myq
[tasks]
. proj.module1.tasks.add
. proj.module1.tasks.dmap
. proj.module1.tasks.get_array
. proj.module1.tasks.mul
. proj.module1.tasks.my_workflow
[2020-07-11 00:43:45,587: INFO/MainProcess] Connected to amqp://dev:**@127.0.0.1:5672/my-dev
[2020-07-11 00:43:45,601: INFO/MainProcess] mingle: searching for neighbors
[2020-07-11 00:43:46,658: INFO/MainProcess] mingle: all alone
[2020-07-11 00:43:46,701: INFO/MainProcess] celery@MQM1CPG8WL ready.
[2020-07-11 00:43:47,261: INFO/MainProcess] Events of group {task} enabled by remote.
Now I try the same chain with the myq queue.
$ virtualenv/bin/ipython
Python 3.7.4 (default, Dec 9 2019, 10:50:13)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.16.1 -- An enhanced Interactive Python. Type '?' for help.
In [1]: from proj.module1.tasks import add, mul
In [2]: res = (add.s(4, 4) | mul.s(8) | mul.s(10)).apply_async(queue="myq")
In [3]: res.get()
At this point res.get() hangs. When I check the worker log, it contains an entry for only one task.
[2020-07-11 00:45:45,700: INFO/MainProcess] Received task: proj.module1.tasks.add[d58fba45-429b-4c3c-8598-3a24916e6fce]
[2020-07-11 00:45:45,747: INFO/ForkPoolWorker-8] Task proj.module1.tasks.add[d58fba45-429b-4c3c-8598-3a24916e6fce] succeeded in 0.044210322999987284s: 8
This shows that only the first task reached the myq queue.
When I inspect the messages in the queues, the celery queue holds 1 message.
[[8, 8], {}, {"callbacks": null, "errbacks": null, "chain": [{"task": "proj.module1.tasks.mul", "args": [10], "kwargs": {}, "options": {"task_id": "efd420e4-845c-4198-8f97-e8ae0015f9e9", "reply_to": "26aee9c3-21e3-3ce4-a1b1-a3bef6e7e5b0"}, "subtask_type": null, "chord_size": null, "immutable": false}], "chord": null}]
Somehow the next message in the chain was routed to the celery queue instead of myq.
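The stranded message can also be inspected programmatically. The sketch below parses the body quoted above, assuming it follows Celery's task message protocol version 2, where the body is a three-element list of args, kwargs and an embed dictionary. The helper name links_without_queue is made up for illustration. It lists the remaining chain links whose options contain no queue key, which is consistent with those links falling back to the default queue.

```python
import json

# Body of the message found in the `celery` queue, copied from the question.
# Assumption: task message protocol v2, i.e. body == [args, kwargs, embed].
body = (
    '[[8, 8], {}, {"callbacks": null, "errbacks": null, "chain": '
    '[{"task": "proj.module1.tasks.mul", "args": [10], "kwargs": {}, '
    '"options": {"task_id": "efd420e4-845c-4198-8f97-e8ae0015f9e9", '
    '"reply_to": "26aee9c3-21e3-3ce4-a1b1-a3bef6e7e5b0"}, '
    '"subtask_type": null, "chord_size": null, "immutable": false}], '
    '"chord": null}]'
)

args, kwargs, embed = json.loads(body)


def links_without_queue(embed):
    """Return the task names of remaining chain links that carry no queue option.

    Hypothetical helper, not part of Celery.
    """
    return [
        link["task"]
        for link in embed.get("chain") or []
        if "queue" not in link.get("options", {})
    ]


remaining = links_without_queue(embed)
print(args)       # positional arguments of the task waiting in the queue
print(remaining)  # chain links that have no explicit queue
```

The only options on the remaining link are task_id and reply_to, so the queue="myq" passed to apply_async on the chain does not appear to have been copied onto the later links.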
How do I tell the chain to send all of its tasks to myq?
Solution
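One commonly suggested approach, sketched here under assumptions and not verified against Celery 4.4.2: because the queue passed to apply_async on the chain appears to reach only the first task, attach the queue to every signature with set(queue=...) before piping the signatures together. The name chain_on_queue is a hypothetical helper, not part of Celery. It relies only on the clone() and set() methods and the | operator that Celery signatures provide.

```python
from functools import reduce
from operator import or_


def chain_on_queue(queue, *signatures):
    """Pipe signatures together after tagging each one with the same queue.

    Hypothetical helper, not a Celery API. Each signature is cloned first so
    the caller's original signature objects are left unmodified.
    """
    if not signatures:
        raise ValueError("at least one signature is required")
    routed = [sig.clone().set(queue=queue) for sig in signatures]
    # sig1 | sig2 | sig3 builds a chain; reduce applies `|` left to right.
    return reduce(or_, routed)


# Usage with the tasks from module1/tasks.py (needs a running broker and a
# worker started with -Q myq):
#
#   from proj.module1.tasks import add, mul
#   res = chain_on_queue("myq", add.s(4, 4), mul.s(8), mul.s(10)).apply_async()
#   res.get()
```

Written out by hand, the same idea is (add.s(4, 4).set(queue="myq") | mul.s(8).set(queue="myq") | mul.s(10).set(queue="myq")).apply_async(). If every task in the project should go to myq, another option to consider is setting task_default_queue = "myq" in celeryconfig.py, since tasks sent without an explicit queue use the default queue.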