airflow - 在自定义操作员气流中提取 xcom 值
问题描述
我写了一个HadoopPutHdfs
在 Airflow 中调用的自定义运算符,所以我需要将xxx
参数传递给HadoopPutHdfs
并且我需要填充任务xxx
的返回值generate_file_path
with DAG(dag_id='my_custom_operator_dag', schedule_interval='1 * * * *', default_args=default_args, catchup=False) as dag:
generate_file_path = PythonOperator(
task_id='generate_file_path',
python_callable=generate_file_path_func,
dag=dag,
)
put_to_hdfs = HadoopPutHdfs(
task_id='put_to_hdfs',
headers={'Content-Type': 'text/plain'},
hdfs_path='webhdfs/v1/user/hive/13.zip',
hadoop_host='10.10.10.146',
hadoop_port=9870,
source_path='/opt/airflow/dags/1.zip',
dag=dag,
xxx= "{{ ti.xcom_pull(task_ids=['generate_file_path']) }}",
)
这条线不工作,
xxx= "{{ ti.xcom_pull(task_ids=['generate_file_path']) }}"
如何将 <code>generate_file_path 函数的数量传递给xxx
参数?
解决方案
听起来您在自定义运算符中缺少xxx
as atemplate_field
的定义。例如:
class CustomDummyOperator(BaseOperator):
template_fields = ('msg_from_previous_task',)
def __init__(self,
msg_from_previous_task,
*args, **kwargs) -> None:
super(CustomDummyOperator, self).__init__(*args, **kwargs)
self.msg_from_previous_task = msg_from_previous_task
def execute(self, context):
print(f"Message: {self.msg_from_previous_task}")
DAG:
def return_a_str():
return "string_value_from_op1"
task_1 = PythonOperator(
task_id='task_1',
dag=dag,
python_callable=return_a_str,
)
task_2 = CustomDummyOperator(
task_id='task_2',
dag=dag,
msg_from_previous_task="{{ ti.xcom_pull(task_ids='task_1') }}"
)
的输出task_2
是:Message: string_value_from_op1
您可以使用XcomArg获得更简洁的语法:
task_2 = CustomDummyOperator(
task_id='task_2',
dag=dag,
msg_from_previous_task=task_1.output
# msg_from_previous_task="{{ ti.xcom_pull(task_ids='task_1') }}"
)
推荐阅读
- javascript - 在 Dynamics 365 中使用 ADAL.js 获取没有弹出窗口的令牌
- pandas - 用 91 替换手机号码中的前导零
- javascript - 如何检查我的按钮是否在 JavaScript 中被点击?
- ember.js - 更新模型时如何将signed_id传递/隐藏到Rails API控制器
- docker - 如何使用 Dockerfile 在启动 docker 容器上运行 jboss-cli
- excel - 循环遍历 2 个数组 - 在表中查找 Header 然后将值粘贴到正确的位置
- python - 删除字典python中的反向重复项
- debugging - 带有 nativescript-vue 的 Chrome devtools 不起作用
- xml - XPath 获取除一个子节点外的完整节点
- python - 程序只读取 csv 文件的第一行