airflow - 气流如何将 xcom 结果传递给 SubDagOperator,它将确定 subdag 中 BashOperator 的数量?
问题描述
你能帮我解决这个问题吗?我有以下代码:我之前定义了:DAG_NAME、SUBDAG_ID、default_args。我想知道我是否缺少任何参数,例如 provide_context 或其他参数,或者是否支持/不支持。
p_dag = DAG(dag_id=DAG_NAME,
default_args=default_args,
schedule_interval='@once',
provide_context=True)
step1 = BashOperator(
task_id='xcom_feed',
bash_command=COMMAND,
dag=p_dag,
xcom_push=True)
def dag_subdag(parent_dag=DAG_NAME, child_dag=SUBDAG_ID,
args=default_args, **kwargs):
sdag = DAG(dag_id=f"{parent_dag}.{child_dag}", default_args=args,
schedule_interval="@once", provide_context=True)
ti = kwargs['ti']
value = ti.xcom_pull(task_ids='xcom_feed')
value = int(literal_eval(value)[0])
for i in range(value):
BashOperator(task_id=f"{child_dag}-task-{i}",
bash_command=COMMAND_ON_{i},
dag=sdag)
return sdag
step2 = SubDagOperator(task_id=SUBDAG_ID,
subdag=dag_subdag(DAG_NAME, SUBDAG_ID, default_args),
dag=p_dag)
step1 >> step2
可悲的是,我将其作为 PythonOperator 运行,并在定义 subdag bash 运算符之前打印了 dag 和 sdag 的类型,并且它按预期执行而没有错误,告诉我它返回一个 DAG,但是当我运行这个时,它在我身上爆炸了:错误的最后一部分是
/airflow/models/dag.py", line 1538, in create_dagrun
return self.get_dag().create_dagrun(run_id=run_id,
AttributeError: 'NoneType' object has no attribute 'create_dagrun'
解决方案
推荐阅读
- java - Ιs there a way to use a class as a type of variable of another class with GSON?
- javascript - 使用 JavaScript 将一个数组按另一个数组分组
- java - 当服务器仅从请求中读取标头时,Http 客户端未收到响应
- python - 如何让python代码更高效/更整洁
- sharepoint - 通过站点搜索框 sharepoint 2013 搜索时如何使元数据工作
- oracle - 为什么在 Oracle 表单中(基于过程)查询不返回任何值?
- typescript - 打字稿类型 a 或类型 b
- django - 我可以在 django 模型中指定一个从某个值派生的上传文件夹吗?
- matlab - 在Matlab中寻找一种矢量化查找命令的方法
- javascript - 汉堡菜单打不开