airflow - 气流,如何将变量从 BashOperator 任务传递给另一个
问题描述
我试图找出将变量从一个 BashOperator 任务传递到另一个任务的最通用/有效的方法。我想出了一个解决方案,将第一个 BashOperator 任务的输出推送到 xcom。然后这个字符串被 PythonOperator 任务拉取,该任务将字符串解析为键值对,然后推送到 xcom。最后,这些 kv 对可以被第二个 BashOperator 任务提取和使用。
我想从更有经验的气流用户那里得到一些评论,这种方法是否过于复杂还是可以?
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
import datetime as dt
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': dt.datetime(2019,10,14,10,0),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': dt.timedelta(minutes=1),
}
dag = DAG('communication-between-tasks-v1',
catchup=False,
default_args=default_args,
schedule_interval='*/5 * * * *')
def parse_function(**context):
ti = context['ti']
msg = ti.xcom_pull('start_task')
parsed_output_parameters = dict([x.split('=') for x in msg.split()])
# Option1: Dictionary parsed from the string is pushed to xcom
ti.xcom_push(key='parameters', value=parsed_output_parameters)
# Option2: We can push every key value separately
for k, v in parsed_output_parameters.items():
ti.xcom_push(key=k, value=v)
start_task = BashOperator(
task_id='start_task',
bash_command='echo "FILE1=file1.h5 FILE2=file2.txt VARIABLE=400"',
xcom_push=True,
dag=dag)
parse = PythonOperator(
task_id='parse',
python_callable=parse_function,
provide_context=True,
dag=dag
)
end_task = BashOperator(
task_id='end_task',
bash_command='echo start_task params: {{ ti.xcom_pull(task_ids="parse",
key="parameters")["FILE1"] }} \
{{ ti.xcom_pull(task_ids="parse", key="FILE2") }}',
dag=dag
)
start_task >> parse >> end_task
解决方案
Xcom 在传递非常少量的数据时效果最好,应该谨慎使用(因为它全部写入气流数据库)。
如果您有 2 个不同的 BashOperator 任务,并且您想将数据从一个传递到另一个,为什么不在第一个任务中将输出写入文件并与第二个任务一起读取呢?(您可以在第二个 BashOperator 任务中包含一行,在读取文件内容后验证文件是否包含数据和rm
文件。)
推荐阅读
- c# - 在 WPF 中为我的列表视图项目制作子列表?
- graphql - 突变方法是否需要处于顶层?
- python - uwsgi master 优雅关机
- jquery - 为什么它没有使用 jquery.post() 在 asp.net mvc 控制器中检索发布的数据
- ios - 在 Swift 4 中使用 UIActivityViewController 共享 pdf 文件
- sql - 如何在不使用“EXCEPT”的情况下从重复表中查找丢失的数据
- java - 防止缓慢的作业接管线程池
- oracle - 如果我在同一个包中的不同过程上重新编译包体,执行过程会发生什么
- unity3d - Unity3d - 玩家在特定块后翻转
- mfc - 在 CListCtrl 中未调用 DrawItem()