首页 > 解决方案 > 气流任务取决于另一个任务结果?

问题描述

我有 2 个任务

第一个任务下载一些数据集到folder_1

第二个任务清理每个文件folder_1

CRUDE_NEW_DATASET_LOCAL是本地路径

download_crude_new_dataset = BashOperator(
        task_id = "download_crude_new_dataset",
        bash_command = bash.download_crude_new_dataset(),
        dag=dag
)

cleaning_crude_new_dataset = []
crude_new_dataset = glob(bash.CRUDE_NEW_DATASET_LOCAL+"/*",recursive=True)
for p in crude_new_dataset :
    path = p.replace('\\','/')
    if os.path.isfile(path):
        cleaning_crude_new_dataset.append(
            BashOperator(
                task_id = "cleaning_crude_new_dataset-"+bash._path_leaf_(path),
                bash_command = bash.cleaning_dataset(path),
                dag=dag
            )
    ) 

download_crude_new_dataset >> cleaning_crude_new_dataset

当我触发气流 dag 时的问题,folder_1仍然是空的。并且 make cleaning_crude_new_dataset(任务数组)是空的。

谢谢你的帮助

标签: pythonairflowairflow-scheduler

解决方案


这个问题解决了

将任务从单个 Dag 拆分为多个 Dag 并触发其他 dag ( dag1>> dag2>> dag3... )

如果需要,将dag_dir_list_interval300 更改为小数字


推荐阅读