首页 > 解决方案 > 气流 - 在 for 循环最终任务之后插入任务依赖

问题描述

我正在尝试运行一个 Airflow 管道,其中我有一个主要的 Postgres 任务,然后是一个 Python 循环任务,然后是一个最终的 postgres 任务,所以 dag 类似于这样

            PythonOperator
              |Task B.1|
 Postgres     |Task B. |      Postgres
 Task A ------|Task B. |-----> Task C
              |Task B.n|

所以,我写了这个 dag:

import glob
import logging
import os
from datetime import datetime, timedelta
import pandas as pd
from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator
from psycopg2.extras import execute_values

dag_default_args = {
    'owner': 'xxx',
    'start_date': datetime.now() - timedelta(days=2),
    'email': [],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=1),
    'depends_on_past': False,
    'wait_for_downstream': False,
}

dag = DAG(
    dag_id='xxx',
    default_args=dag_default_args,
    schedule_interval="@daily",
    catchup=True,
    max_active_runs=1,
    concurrency=5
)

AIRFLOW_HOME = os.getenv('AIRFLOW_HOME')

def insert_data_func(filename,**kwargs):
    #path to read csv files
    df = pd.read_csv(f'./dags/data/{filename}')  
        
    # Data transformations
    df.dropna(inplace = True ,how='all')
    df["id"] = pd.to_numeric(df["id"])

    ps_pg_hook = PostgresHook(postgres_conn_id="postgres")
    conn_ps = ps_pg_hook.get_conn()

    if len(df) > 0:
        column_names = ['COLUMN1']

        values = df[column_names].to_dict('split')
        values = values['data']

        insert_sql = """
                    INSERT INTO TABLE COLUMN1) 
                    VALUES %s
                    """
                    
        result = execute_values(conn_ps.cursor(), insert_sql, values, page_size=len(df))
        conn_ps.commit()
    else:
        None

    return None

create_psql_table= PostgresOperator(
    task_id="create_psql_table",
    postgres_conn_id="postgres",
    sql="""
        CREATE SCHEMA IF NOT EXISTS SCHEMA;
        CREATE TABLE IF NOT EXISTS SCHEMA.TABLE(
            COLUMN1                            INT,
            );
    """,
    dag=dag
)

create_fact_table = PostgresOperator(
    task_id="create_fact_table",
    postgres_conn_id="postgres",
    sql="""
        CREATE TABLE IF NOT EXISTS SCHEMA.TABLE1 (
            select 
            );
    """,
    dag=dag
)



files = ['1.csv', '2.csv', '3.csv']


for listing in files:
    insert_data = PythonOperator(
        task_id=f'insert_data_func_{listing}',
        python_callable=insert_data_func,
        op_kwargs={'filename': listing},
         provide_context=True,
        dag=dag
    )
    create_psql_table >> insert_data

[insert_data] >> create_fact_table

但这只是在运行 3.csv 之后才发生 create_fact_table ,我想在处理完所有 CSV 后运行。

所以,我的问题是,有没有办法

  1. 创建动态任务?和

  2. 一个愚蠢的,但我仍在学习python。有没有办法在不指定文件名的情况下获得文件的 for 循环列表?只是它们以字母开头并且是 CSV 吗?

标签: pythonairflow

解决方案


在你create_psql_table >> insert_data >> create_fact_table的 for 循环中做。这种方式create_fact_table只会在所有insert_data_funcX任务成功执行后执行一次。

在 task_ids 中避免使用也是一个好主意,.因为这个符号在某种程度上与 subdags 相关,并且在某些情况下可能会导致一些麻烦。

编辑:这个(注意[insert_data] >> create_fact_table最后没有)

for listing in files:
    insert_data = PythonOperator(
        task_id=f'insert_data_func_{listing}',
        python_callable=insert_data_func,
        op_kwargs={'filename': listing},
        provide_context=True,
        dag=dag
    )
    create_psql_table >> insert_data >> create_fact_table

将导致: 在此处输入图像描述

所以create_fact_table会执行一次。


推荐阅读