首页 > 解决方案 > 芹菜:链中的和弦和组无法正常工作

问题描述

我正在做同样的任务,需要重用以前任务计算的结果。更具体地说,我有三个模型,有两个步骤,第一步是清理和分割原始数据,第二个任务是两种不同的标记化。其中两个模型共享相同的分词器,第三个模型需要第二个分词器计算的结果。因此,我首先需要对原始输入进行清理和分段,然后将其传递给分词器,其中一个分词结果被馈送到两个模型,另一个被馈送到最后一个模型。最后,我需要合并结果。

但是,我似乎无法完成任务。模型预测任务已运行,但不能全部传递给最后一个组合任务。我简单的任务如下。

@app.route('test', methods = ['GET'])
def test():
   result = test_chain_group.delay(10)
   result = [v for v in result.collect()]

   return str(result)

@shared_task
def test_chain_group(num):
        header = [(add.s(5) | group([add.s(6), mul.s(9)])), (mul.s(5)|add.s(3))]
        chord_r = chord(header, aggr.s())
        chain_r = (add.s(7)|chord_r)
        result = chain_r(num)
        return result

@shared_task
def add(a,b):
        return a+b
@shared_task
def mul(a,b):
        return a*b

@shared_task
def aggr1(data):
    return data

虽然 worker 正确计算了 28、198、88,但只有 28 和 198 被传递给 aggr1。此外,烧瓶应用程序在将某些内容返回给客户端之前没有等待结果返回。

标签: pythoncelery

解决方案


推荐阅读