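postgresql - Airflow 2.0.2: DAG does not render the template correctly
Problem description
I have two simple tasks: one fetches a list of ids, and the other must display that list with an echo command. The XCom push result appears to be correct.
The return value of the function (the XCom push) is a list of tuples, as follows: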
[(19343160,), (19350561,), (19351381,), (19351978,), (19356674,), (19356676,), (19356678,), (19356681,), (19356682,), (19359607,)]
Here is my code:
from datetime import timedelta
from textwrap import dedent

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.dates import days_ago

# SQL_PATH (the directory holding the .sql files) is defined elsewhere and is not shown here.


def read_sql(file_name):
    with open(SQL_PATH + file_name) as f:
        sql = f.read()
    return sql


def query_and_push(sql):
    pg_hook = PostgresHook(postgres_conn_id='redshift')
    records = pg_hook.get_records(sql=sql)
    # The return value is pushed to XCom under the key 'return_value'.
    return records


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'xcom_using_jinja_template',
    default_args=default_args,
    description='',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['test'],
) as dag:
    t1 = PythonOperator(
        task_id='get_query_id',
        python_callable=query_and_push,
        provide_context=True,
        op_kwargs={
            'sql': read_sql('warmupqueryid.sql')
        }
    )

    templated_command = dedent(
        """
        {% for item in params.query_ids %}
        echo {{ item[0] }};
        {% endfor %}
        """
    )

    t2 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'query_ids': " {{ ti.xcom_pull(task_ids='get_query_id'), key='return_value' }}"},
    )

    t1 >> t2
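My last task fails with the error below, and I don't understand why it is not getting the value from the XCom push. I'm not sure whether this is a bug or whether I'm simply missing something.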
*** Reading remote log from s3://ob-airflow-pre/logs/xcom_using_jinja_template/templated/2021-05-26T17:22:44.023533+00:00/1.log.
[2021-05-26 17:22:45,633] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: xcom_using_jinja_template.templated 2021-05-26T17:22:44.023533+00:00 [queued]>
[2021-05-26 17:22:45,663] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: xcom_using_jinja_template.templated 2021-05-26T17:22:44.023533+00:00 [queued]>
[2021-05-26 17:22:45,663] {taskinstance.py:1068} INFO -
--------------------------------------------------------------------------------
[2021-05-26 17:22:45,663] {taskinstance.py:1069} INFO - Starting attempt 1 of 2
[2021-05-26 17:22:45,664] {taskinstance.py:1070} INFO -
--------------------------------------------------------------------------------
[2021-05-26 17:22:45,675] {taskinstance.py:1089} INFO - Executing <Task(BashOperator): templated> on 2021-05-26T17:22:44.023533+00:00
[2021-05-26 17:22:45,679] {standard_task_runner.py:52} INFO - Started process 413 to run task
[2021-05-26 17:22:45,683] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'xcom_using_jinja_template', 'templated', '2021-05-26T17:22:44.023533+00:00', '--job-id', '1811', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/xcom_test.py', '--cfg-path', '/tmp/tmpkk2x0gyd', '--error-file', '/tmp/tmpc2ka7x4x']
[2021-05-26 17:22:45,683] {standard_task_runner.py:77} INFO - Job 1811: Subtask templated
[2021-05-26 17:22:45,859] {logging_mixin.py:104} INFO - Running <TaskInstance: xcom_using_jinja_template.templated 2021-05-26T17:22:44.023533+00:00 [running]> on host airflow-worker-1.airflow-worker.airflow.svc.cluster.local
[2021-05-26 17:22:45,945] {taskinstance.py:1281} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=airflow@example.com
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=xcom_using_jinja_template
AIRFLOW_CTX_TASK_ID=templated
AIRFLOW_CTX_EXECUTION_DATE=2021-05-26T17:22:44.023533+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-05-26T17:22:44.023533+00:00
[2021-05-26 17:22:45,946] {bash.py:135} INFO - Tmp dir root location:
/tmp
[2021-05-26 17:22:45,947] {bash.py:158} INFO - Running command:
echo ;
echo {;
echo {;
echo ;
echo t;
echo i;
echo .;
echo x;
echo c;
echo o;
echo m;
echo _;
echo p;
echo u;
echo l;
echo l;
echo (;
echo t;
echo a;
echo s;
echo k;
echo _;
echo i;
echo d;
echo s;
echo =;
echo ';
echo g;
echo e;
echo t;
echo _;
echo q;
echo u;
echo e;
echo r;
echo y;
echo _;
echo i;
echo d;
echo ';
echo );
echo ,;
echo ;
echo k;
echo e;
echo y;
echo =;
echo ';
echo r;
echo e;
echo t;
echo u;
echo r;
echo n;
echo _;
echo v;
echo a;
echo l;
echo u;
echo e;
echo ';
echo ;
echo };
echo };
[2021-05-26 17:22:45,954] {bash.py:169} INFO - Output:
[2021-05-26 17:22:45,955] {bash.py:173} INFO -
[2021-05-26 17:22:45,955] {bash.py:173} INFO - {
[2021-05-26 17:22:45,955] {bash.py:173} INFO - {
[2021-05-26 17:22:45,955] {bash.py:173} INFO -
[2021-05-26 17:22:45,955] {bash.py:173} INFO - t
[2021-05-26 17:22:45,955] {bash.py:173} INFO - i
[2021-05-26 17:22:45,955] {bash.py:173} INFO - .
[2021-05-26 17:22:45,955] {bash.py:173} INFO - x
[2021-05-26 17:22:45,955] {bash.py:173} INFO - c
[2021-05-26 17:22:45,955] {bash.py:173} INFO - o
[2021-05-26 17:22:45,955] {bash.py:173} INFO - m
[2021-05-26 17:22:45,955] {bash.py:173} INFO - _
[2021-05-26 17:22:45,955] {bash.py:173} INFO - p
[2021-05-26 17:22:45,956] {bash.py:173} INFO - u
[2021-05-26 17:22:45,956] {bash.py:173} INFO - l
[2021-05-26 17:22:45,956] {bash.py:173} INFO - l
[2021-05-26 17:22:45,956] {bash.py:173} INFO - bash: -c: line 34: syntax error near unexpected token `;'
[2021-05-26 17:22:45,956] {bash.py:173} INFO - bash: -c: line 34: ` echo (;'
[2021-05-26 17:22:45,956] {bash.py:177} INFO - Command exited with return code 1
[2021-05-26 17:22:45,976] {taskinstance.py:1482} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
result = task_copy.execute(context=context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/bash.py", line 180, in execute
raise AirflowException('Bash command failed. The command returned a non-zero exit code.')
airflow.exceptions.AirflowException: Bash command failed. The command returned a non-zero exit code.
[2021-05-26 17:22:45,978] {taskinstance.py:1525} INFO - Marking task as UP_FOR_RETRY. dag_id=xcom_using_jinja_template, task_id=templated, execution_date=20210526T172244, start_date=20210526T172245, end_date=20210526T172245
[2021-05-26 17:22:46,014] {local_task_job.py:146} INFO - Task exited with return code 1
When I replace params.query_ids with the list above (hard-coded), I get what I expected.
templated_command = dedent(
    """
    {% for item in [(19343160,), (19350561,), (19351381,), (19351978,), (19356674,), (19356676,), (19356678,), (19356681,), (19356682,), (19359607,)] %}
    echo {{ item[0] }};
    {% endfor %}
    """
)
Expected result:
[2021-05-27 10:59:05,887] {bash.py:158} INFO - Running command:
echo 19343160;
echo 19350561;
echo 19351381;
echo 19351978;
echo 19356674;
echo 19356676;
echo 19356678;
echo 19356681;
echo 19356682;
echo 19359607;
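Solution
I answered this question on the Astronomer forum, but I'm posting the answer here as well in case it helps others.
You won't be able to use params as written, in a Jinja-templated way, with bash_command, because params is not a template_field of BashOperator. The value stays a literal string, so the for loop iterates over its characters, which is why the log shows one echo per character. However, you can reference the return_value XCom from the get_query_id task as a variable in Jinja, as follows: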
templated_command = dedent(
    """
    {% set query_ids = ti.xcom_pull(task_ids='get_query_id', key='return_value') %}
    {% for item in query_ids %}
    echo {{ item[0] }};
    {% endfor %}
    """
)

t2 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
)
Now templated_command references the XCom you need directly, sets it as a variable inside the Jinja string, and produces the output you expect.
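To see why the original task echoed one character per line, here is a minimal sketch in plain Python. It is only a stand-in for the Jinja loop and not how Airflow renders templates internally. The helper name render_loop and the shortened id list are hypothetical and used for illustration only.

```python
# Plain-Python stand-in for the Jinja loop in templated_command (illustrative only).
def render_loop(query_ids):
    """Mimic `{% for item in query_ids %} echo {{ item[0] }}; {% endfor %}`."""
    return [f"echo {item[0]};" for item in query_ids]


# What the original DAG looped over: the params value was never rendered,
# so it is still the literal string from the question.
unrendered_param = " {{ ti.xcom_pull(task_ids='get_query_id'), key='return_value' }}"

# What the fixed template loops over: the records pulled from XCom
# (shortened here to three of the ids from the question).
pulled_records = [(19343160,), (19350561,), (19351381,)]

char_lines = render_loop(unrendered_param)  # one echo line per character
id_lines = render_loop(pulled_records)      # one echo line per id

print(char_lines[1:4])
print(id_lines)
```

Iterating over a string yields single characters, and item[0] of a one-character string is that character, which matches the failing log. Iterating over the pulled records yields one row per id, which matches the expected result.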