首页 > 解决方案 > 气流,如何将变量从 BashOperator 任务传递给另一个

问题描述

我试图找出将变量从一个 BashOperator 任务传递到另一个任务的最通用/有效的方法。我想出了一个解决方案,将第一个 BashOperator 任务的输出推送到 xcom。然后这个字符串被 PythonOperator 任务拉取,该任务将字符串解析为键值对,然后推送到 xcom。最后,这些 kv 对可以被第二个 BashOperator 任务提取和使用。

我想从更有经验的气流用户那里得到一些评论,这种方法是否过于复杂还是可以?

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
import datetime as dt 
from airflow.utils.dates import days_ago    

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': dt.datetime(2019,10,14,10,0),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=1),
}

dag = DAG('communication-between-tasks-v1',
      catchup=False,
      default_args=default_args,
      schedule_interval='*/5 * * * *')

def parse_function(**context):
    ti = context['ti']
    msg = ti.xcom_pull('start_task')
    parsed_output_parameters = dict([x.split('=') for x in msg.split()])

    # Option1: Dictionary parsed from the string is pushed to xcom
    ti.xcom_push(key='parameters', value=parsed_output_parameters)

    # Option2: We can push every key value separately
    for k, v in parsed_output_parameters.items():
        ti.xcom_push(key=k, value=v)


start_task = BashOperator(
    task_id='start_task',
    bash_command='echo "FILE1=file1.h5 FILE2=file2.txt VARIABLE=400"',
    xcom_push=True,
    dag=dag)

parse = PythonOperator(
    task_id='parse',
    python_callable=parse_function,
    provide_context=True,
    dag=dag
)

end_task = BashOperator(
    task_id='end_task',
    bash_command='echo start_task params: {{ ti.xcom_pull(task_ids="parse", 
    key="parameters")["FILE1"] }} \
    {{ ti.xcom_pull(task_ids="parse", key="FILE2") }}',
    dag=dag
)

start_task >> parse >> end_task

标签: airflow

解决方案


Xcom 在传递非常少量的数据时效果最好,应该谨慎使用(因为它全部写入气流数据库)。

如果您有 2 个不同的 BashOperator 任务,并且您想将数据从一个传递到另一个,为什么不在第一个任务中将输出写入文件并与第二个任务一起读取呢?(您可以在第二个 BashOperator 任务中包含一行,在读取文件内容后验证文件是否包含数据和rm文件。)


推荐阅读