python - 芹菜——结果后端不返回组结果
问题描述
更新:2020 年 7 月 25 日星期六
总体问题:我有一个包含 a 的链,celery.group
并且该链没有给我我认为的结果。在你告诉我results.save()
再恢复之前。我已经尝试过了,但我没有运气。下面是我正在执行的代码中的 celery worker 以获得所需的结果。
我有一个正在运行的芹菜工人,看起来像这样:
[2020-07-23 13:41:04,278: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2020-07-23 13:41:04,284: DEBUG/MainProcess] | Worker: Building graph...
[2020-07-23 13:41:04,285: DEBUG/MainProcess] | Worker: New boot order: {StateDB, Timer, Hub, Pool, Autoscaler, Beat, Consumer}
[2020-07-23 13:41:04,323: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2020-07-23 13:41:04,323: DEBUG/MainProcess] | Consumer: Building graph...
[2020-07-23 13:41:04,402: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Mingle, Gossip, Heart, Agent, Tasks, Control, event loop}
-------------- celery@worker1.deon-aarininc v4.4.0 (cliffs)
--- ***** -----
-- ******* ---- Linux-5.4.0-42-generic-x86_64-with-glibc2.29 2020-07-23 13:41:04
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x7fe4b1e98580
- ** ---------- .> transport: redis://localhost:6379/1
- ** ---------- .> results: redis://localhost:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
下面的函数是我为创建并行进程链而调用的函数。API 调用 --> 转换 DICT --> 并行过程 --> 结果。结果显示了工作人员在终端中运行的位置,但我需要他们去解释器。
@app.task
def setup_chunk(nested_dict):
"""
Setup nested data structure for parallel run
- equivalant to an O(n) for loop call
Args:
nested_dict - nested dictionary
Returns:
list of jobs
"""
return [nested_dict[i::len(nested_dict)] for i in range(len(nested_dict))]
@app.task
def process_step(board, header=HEADER):
"""
Process step in run
Args:
nested_dict - nested dictionary
Returns:
results of running Celery in parallel
"""
api_bid_url = ""
return api_request_get(api_bid_url, HEADER)
@app.task
def process_group(chunk):
return group([process_step.s(chunk[i]) for i in range(len(chunk))]).apply_async()
$ ipython3
[LN x]: pipeline = (setup_chunk.s(board_data['data']) | \
...: process_group.s()).apply_async()
这就是ipython3
解释器的结果:所有工作任务 ID。
Out[42]:
[['0abee691-ace4-47b8-83cf-b29502c5499d', None],
[[['08873986-5193-4035-8442-1c46039a93f1', None], None],
[['12adf051-1a78-4b9a-ba49-8b0ad09aa157', None], None],
[['043811f5-aba1-4942-bf9d-f90cac0623cd', None], None],
[['d46f1f61-c753-49cd-a9d4-bf7a3e69a037', None], None],
[['b62ee40f-a0d3-4410-b36c-5c3f7967f61a', None], None],
[['334b6272-3667-4dd3-9eab-c3c05baad760', None], None],
[['5a75d948-6941-4598-8b97-c60b8960bbd0', None], None],
[['18a93b78-435e-4489-b25c-bb699addd160', None], None],
[['3c0f9834-eb1c-417c-b71e-03872ba19e90', None], None],
[['a7d0ff28-d139-4f02-8975-8575613aef7c', None], None],
[['94edd270-e612-4b68-932e-bb16c2a8af35', None], None],
[['13c95d8c-c65b-407e-8f34-3dd9fa171591', None], None],
[['fb689945-1fed-49e4-b430-48344a83fcf4', None], None]]]
我想要的是从工人的终端上出来。
解决方案
推荐阅读
- android - 有没有办法使用 sqlite 语句启用 fts5
- ruby-on-rails - jumpstart - ActiveRecord::RecordNotUnique: PG::UniqueViolation: 错误: 重复键值违反唯一约束
- selenium - 需要使用 selenium Java 验证带有多个随机字符串的错误消息
- xamarin - Xamarin:样式 OnPlatform 和 DynamicResource
- angular - 角度过滤表结果无法正常工作
- javascript - 如何在使用 Lodash 时获取特定的键值
- typescript - React Native 中的 ReturnKeyType 数据类型
- android - 调整对象的大小以适应屏幕的大小
- python - “str”对象没有属性“map”
- python - 如何检查我训练的模型是否通过编码正确检测到对象?