python - Airflow:使用 Taskflow API 在 PythonOperator 中运行 PostgresOperator
问题描述
我的目标是遍历从另一个 PythonOperator 获得的列表,并在此循环中将 json 保存到 Postgres DB。我正在使用 Airflow 2.0 中的 Taskflow API。
如果我将 SQL 语句直接sql
写入PostgresOperator
. 但是当我将SQL写入文件并将文件路径放入sql
参数时,就会抛出错误:
psycopg2.errors.SyntaxError: syntax error at or near "sql"
LINE 1: sql/insert_deal_into_deals_table.sql
这是任务的代码:
@task()
def write_all_deals_to_db(all_deals):
for deal in all_deals:
deal_json = json.dumps(deal)
pg = PostgresOperator(
task_id='insert_deal',
postgres_conn_id='my_db',
sql='sql/insert_deal_into_deals_table.sql',
params={'deal_json': deal_json}
)
pg.execute(dict())
奇怪的是,如果我将它用作独立的运算符(在 PythonOperator 之外),则该代码可以工作。像这样:
create_deals_table = PostgresOperator(
task_id='create_deals_table',
postgres_conn_id='my_db',
sql='sql/create_deals_table.sql'
)
我尝试了很多,我想这与 Jinja 模板有关。不知何故,在 PythonOperator 中,PostgresOperator 既不能使用参数也不能使用 .sql 文件解析。
非常感谢任何提示或参考!
编辑:
此代码有效,但可以快速修复。我仍然遇到的实际问题是,PostgresOperator
当我在PythonOperator
.
@task()
def write_all_deals_to_db(all_deals):
sql_path = 'sql/insert_deal_into_deals_table.sql'
for deal in all_deals:
deal_json = _transform_json(deal)
sql_query = open(path.join(ROOT_DIRECTORY, sql_path)).read()
sql_query = sql_query.format(deal_json)
pg = PostgresOperator(
task_id='insert_deal',
postgres_conn_id='my_db',
sql=sql_query
)
pg.execute(dict())
解决方案
推荐阅读
- java - 使用 DateTimeFormatter 时选择正确的 Timestampformat
- c# - 如何在 C# 中调用 Sobel 过滤器、图像处理的函数
- python - 无法为 docker 容器安装 google-chrome-stable
- angular - 如何在echarts中添加渐变色?
- javascript - 将复杂的 javascript 对象转换为 JSON nodejs
- php - 尝试将 GCP 的 Google Sheets API 与电子表格链接时出错
- android - 如何检查android设备上的时间是由网络还是由用户设置的?
- linux - 从 Windows VM 连接到 Linux VM 时出现 ORA-12170
- redis - AWS Elasticache Redis 6 是否使用多线程?
- php - 当没有“活动”状态的数据时进行调节