首页 > 解决方案 > 气流 python 通过 XCOM 处理返回值

问题描述

我正在尝试创建如下所述的 Airflow dag:我有一个非常大的 python 代码,它最终会创建一个文件。该文件是使用特定名称创建的,例如sales20180802130200.json

以下气流任务是s3BucketUpload操作员。它需要获取文件名才能上传到 s3。

第一个 python 文件可能由bashOperator. 它如何使用文件名创建 Xcom 密钥?有没有其他方法可以传递价值?

谢谢沙比

标签: airflow

解决方案


只要路径参数包含在运算符类中的模板参数中,Airflow 就会为您插入值。我不知道 s3BucketUpload 运算符的样子,所以我假设参数名称。

class s3BucketUploadOperator(BaseOperator):
    # this tuple is not used by anything in my operator classes and is not passed anywhere
    template_fields = ('local_path', 's3_path', )
    ...

def py_fn(task_instance, **context):
    task_instance.xcom_push(key='file_name', value='file.name')

py_task = PythonOperator(
    dag=dag,
    task_id='py_task',
    provide_context=True,
    python_callable=py_fn
)

s3_task = s3BucketUploadOperator(
    dag=dag,
    task_id='s3_task',
    s3_conn_id='?',
    local_path="path/to/dir/{{ task_instance.xcom_pull(key='file_name', task_ids='py_task') }}",
    s3_path="path/to/s3/dir/{{ task_instance.xcom_pull(key='file_name', task_ids='py_task') }}"
)

编辑

如果要使用 BashOperator,bash_command="echo {{ task_instance.xcom_pull(key='file_name', task_ids='py_task') }}"应将文件名打印到任务日志。


推荐阅读