首页 > 解决方案 > 芹菜——结果后端不返回组结果

问题描述

更新:2020 年 7 月 25 日星期六

总体问题:我有一个包含 a 的链,celery.group并且该链没有给我我认为的结果。在你告诉我results.save()再恢复之前。我已经尝试过了,但我没有运气。下面是我正在执行的代码中的 celery worker 以获得所需的结果。

我有一个正在运行的芹菜工人,看起来像这样:

[2020-07-23 13:41:04,278: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2020-07-23 13:41:04,284: DEBUG/MainProcess] | Worker: Building graph...
[2020-07-23 13:41:04,285: DEBUG/MainProcess] | Worker: New boot order: {StateDB, Timer, Hub, Pool, Autoscaler, Beat, Consumer}
[2020-07-23 13:41:04,323: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2020-07-23 13:41:04,323: DEBUG/MainProcess] | Consumer: Building graph...
[2020-07-23 13:41:04,402: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Mingle, Gossip, Heart, Agent, Tasks, Control, event loop}
 
 -------------- celery@worker1.deon-aarininc v4.4.0 (cliffs)
--- ***** ----- 
-- ******* ---- Linux-5.4.0-42-generic-x86_64-with-glibc2.29 2020-07-23 13:41:04
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7fe4b1e98580
- ** ---------- .> transport:   redis://localhost:6379/1
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

下面的函数是我为创建并行进程链而调用的函数。API 调用 --> 转换 DICT --> 并行过程 --> 结果。结果显示了工作人员在终端中运行的位置,但我需要他们去解释器。

@app.task
def setup_chunk(nested_dict):
    """
    Setup nested data structure for parallel run
        - equivalant to an O(n) for loop call 

    Args:
        nested_dict - nested dictionary
    Returns:
        list of jobs
    """
    return [nested_dict[i::len(nested_dict)] for i in range(len(nested_dict))]


@app.task
def process_step(board, header=HEADER):
    """
    Process step in run

    Args:
        nested_dict - nested dictionary
    Returns:
        results of running Celery in parallel
    """

    api_bid_url = ""

    return api_request_get(api_bid_url, HEADER)



@app.task
def process_group(chunk):
    return group([process_step.s(chunk[i]) for i in range(len(chunk))]).apply_async()


$ ipython3

[LN x]: pipeline = (setup_chunk.s(board_data['data']) | \ 
    ...:             process_group.s()).apply_async()  


这就是ipython3解释器的结果:所有工作任务 ID。

Out[42]: 
[['0abee691-ace4-47b8-83cf-b29502c5499d', None],
 [[['08873986-5193-4035-8442-1c46039a93f1', None], None],
  [['12adf051-1a78-4b9a-ba49-8b0ad09aa157', None], None],
  [['043811f5-aba1-4942-bf9d-f90cac0623cd', None], None],
  [['d46f1f61-c753-49cd-a9d4-bf7a3e69a037', None], None],
  [['b62ee40f-a0d3-4410-b36c-5c3f7967f61a', None], None],
  [['334b6272-3667-4dd3-9eab-c3c05baad760', None], None],
  [['5a75d948-6941-4598-8b97-c60b8960bbd0', None], None],
  [['18a93b78-435e-4489-b25c-bb699addd160', None], None],
  [['3c0f9834-eb1c-417c-b71e-03872ba19e90', None], None],
  [['a7d0ff28-d139-4f02-8975-8575613aef7c', None], None],
  [['94edd270-e612-4b68-932e-bb16c2a8af35', None], None],
  [['13c95d8c-c65b-407e-8f34-3dd9fa171591', None], None],
  [['fb689945-1fed-49e4-b430-48344a83fcf4', None], None]]]


我想要的是从工人的终端上出来。

标签: pythonrediscelerybackend

解决方案


推荐阅读