python - 气流 - 在 for 循环最终任务之后插入任务依赖
问题描述
我正在尝试运行一个 Airflow 管道,其中我有一个主要的 Postgres 任务,然后是一个 Python 循环任务,然后是一个最终的 postgres 任务,所以 dag 类似于这样
PythonOperator
|Task B.1|
Postgres |Task B. | Postgres
Task A ------|Task B. |-----> Task C
|Task B.n|
所以,我写了这个 dag:
import glob
import logging
import os
from datetime import datetime, timedelta
import pandas as pd
from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator
from psycopg2.extras import execute_values
dag_default_args = {
'owner': 'xxx',
'start_date': datetime.now() - timedelta(days=2),
'email': [],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=1),
'depends_on_past': False,
'wait_for_downstream': False,
}
dag = DAG(
dag_id='xxx',
default_args=dag_default_args,
schedule_interval="@daily",
catchup=True,
max_active_runs=1,
concurrency=5
)
AIRFLOW_HOME = os.getenv('AIRFLOW_HOME')
def insert_data_func(filename,**kwargs):
#path to read csv files
df = pd.read_csv(f'./dags/data/{filename}')
# Data transformations
df.dropna(inplace = True ,how='all')
df["id"] = pd.to_numeric(df["id"])
ps_pg_hook = PostgresHook(postgres_conn_id="postgres")
conn_ps = ps_pg_hook.get_conn()
if len(df) > 0:
column_names = ['COLUMN1']
values = df[column_names].to_dict('split')
values = values['data']
insert_sql = """
INSERT INTO TABLE COLUMN1)
VALUES %s
"""
result = execute_values(conn_ps.cursor(), insert_sql, values, page_size=len(df))
conn_ps.commit()
else:
None
return None
create_psql_table= PostgresOperator(
task_id="create_psql_table",
postgres_conn_id="postgres",
sql="""
CREATE SCHEMA IF NOT EXISTS SCHEMA;
CREATE TABLE IF NOT EXISTS SCHEMA.TABLE(
COLUMN1 INT,
);
""",
dag=dag
)
create_fact_table = PostgresOperator(
task_id="create_fact_table",
postgres_conn_id="postgres",
sql="""
CREATE TABLE IF NOT EXISTS SCHEMA.TABLE1 (
select
);
""",
dag=dag
)
files = ['1.csv', '2.csv', '3.csv']
for listing in files:
insert_data = PythonOperator(
task_id=f'insert_data_func_{listing}',
python_callable=insert_data_func,
op_kwargs={'filename': listing},
provide_context=True,
dag=dag
)
create_psql_table >> insert_data
[insert_data] >> create_fact_table
但这只是在运行 3.csv 之后才发生 create_fact_table ,我想在处理完所有 CSV 后运行。
所以,我的问题是,有没有办法
创建动态任务?和
一个愚蠢的,但我仍在学习python。有没有办法在不指定文件名的情况下获得文件的 for 循环列表?只是它们以字母开头并且是 CSV 吗?
解决方案
在你create_psql_table >> insert_data >> create_fact_table
的 for 循环中做。这种方式create_fact_table
只会在所有insert_data_funcX
任务成功执行后执行一次。
在 task_ids 中避免使用也是一个好主意,.
因为这个符号在某种程度上与 subdags 相关,并且在某些情况下可能会导致一些麻烦。
编辑:这个(注意[insert_data] >> create_fact_table
最后没有)
for listing in files:
insert_data = PythonOperator(
task_id=f'insert_data_func_{listing}',
python_callable=insert_data_func,
op_kwargs={'filename': listing},
provide_context=True,
dag=dag
)
create_psql_table >> insert_data >> create_fact_table
所以create_fact_table
会执行一次。
推荐阅读
- python - 使用相关参数求解和绘制 ODE
- c# - 如果值已存在于通用排序列表中,我将如何停止按钮单击事件?
- django - Django使用基于UUID的下载发送错误的文件名
- r - R Markdown 标题/正文间距格式
- powerbi - DAX 根据查找表中两列之间的度量值在另一个表中查找值
- ios - ios auto-renewable-subscriptions - 恢复操作
- gps - GPS追踪解决方案(Google Maps API)
- javascript - 将从 Promise 返回的数据分配给外部变量
- css - 在 Contenteditable div 内编辑没有 href 属性的 HTML标记
- javascript - 为什么 typeof let === '未定义'?