首页 > 解决方案 > Airflow:使用 Taskflow API 在 PythonOperator 中运行 PostgresOperator

问题描述

我的目标是遍历从另一个 PythonOperator 获得的列表,并在此循环中将 json 保存到 Postgres DB。我正在使用 Airflow 2.0 中的 Taskflow API。

如果我将 SQL 语句直接sql写入PostgresOperator. 但是当我将SQL写入文件并将文件路径放入sql参数时,就会抛出错误:

psycopg2.errors.SyntaxError: syntax error at or near "sql"
LINE 1: sql/insert_deal_into_deals_table.sql

这是任务的代码:

    @task()
    def write_all_deals_to_db(all_deals):
        for deal in all_deals:
            deal_json = json.dumps(deal)
            pg = PostgresOperator(
                task_id='insert_deal',
                postgres_conn_id='my_db',
                sql='sql/insert_deal_into_deals_table.sql',
                params={'deal_json': deal_json}
            )
            pg.execute(dict())

奇怪的是,如果我将它用作独立的运算符(在 PythonOperator 之外),则该代码可以工作。像这样:

create_deals_table = PostgresOperator(
    task_id='create_deals_table',
    postgres_conn_id='my_db',
    sql='sql/create_deals_table.sql'
)

我尝试了很多,我想这与 Jinja 模板有关。不知何故,在 PythonOperator 中,PostgresOperator 既不能使用参数也不能使用 .sql 文件解析。

非常感谢任何提示或参考!

编辑:

此代码有效,但可以快速修复。我仍然遇到的实际问题是,PostgresOperator当我在PythonOperator.

@task()
def write_all_deals_to_db(all_deals):
    sql_path = 'sql/insert_deal_into_deals_table.sql'
    for deal in all_deals:
        deal_json = _transform_json(deal)
        sql_query = open(path.join(ROOT_DIRECTORY, sql_path)).read()
        sql_query = sql_query.format(deal_json)
        pg = PostgresOperator(
            task_id='insert_deal',
            postgres_conn_id='my_db',
            sql=sql_query
        )
        pg.execute(dict())

标签: pythonjinja2airflow

解决方案


推荐阅读