airflow - Airflow - 分别运行每个 python 函数
问题描述
我在脚本下方有气流,它将所有 python 脚本作为一个函数运行。我想让每个 python 函数单独运行,以便我可以跟踪每个函数及其状态。
## Third party Library Imports
import psycopg2
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
#from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io
# Following are defaults which can be overridden later on
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 23, 12),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('sample_dag', default_args=default_args, catchup=False, schedule_interval="@once")
#######################
## Login to DB
def db_log():
global db_con
try:
db_con = psycopg2.connect(
" dbname = 'name' user = 'user' password = 'pass' host = 'host' port = 'port' sslmode = 'require' ")
except:
print("Connection Failed.")
print('Connected successfully')
return (db_con)
def insert_data():
cur = db_con.cursor()
cur.execute("""insert into tbl_1 select id,bill_no,status from tbl_2 limit 2;""")
def job_run():
db_log()
insert_data()
##########################################
t1 = PythonOperator(
task_id='DB_Connect',
python_callable=job_run,
# bash_command='python3 ~/airflow/dags/sample.py',
dag=dag)
t1
上面的脚本工作得很好,但想按功能拆分它以更好地跟踪。任何人都可以在这方面提供帮助。火龙果..
更新代码(版本 2):
## Third party Library Imports
import psycopg2
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
#from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io
# Following are defaults which can be overridden later on
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 23, 12),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('sample_dag', default_args=default_args, catchup=False, schedule_interval="@once")
#######################
## Login to DB
def db_log(**kwargs):
global db_con
try:
db_con = psycopg2.connect(
" dbname = 'name' user = 'user' password = 'pass' host = 'host' port = 'port' sslmode = 'require' ")
except:
print("Connection Failed.")
print('Connected successfully')
task_instance = kwargs['task_instance']
task_instance.xcom_push(value="db_con", key="db_log")
return (db_con)
def insert_data(**kwargs):
v1 = task_instance.xcom_pull(key="db_con", task_ids='db_log')
return (v1)
cur = db_con.cursor()
cur.execute("""insert into tbl_1 select id,bill_no,status from tbl_2 limit 2;""")
#def job_run():
# db_log()
# insert_data()
##########################################
t1 = PythonOperator(
task_id='Connect',
python_callable=db_log,provide_context=True,
dag=dag)
t2 = PythonOperator(
task_id='Query',
python_callable=insert_data,provide_context=True,
dag=dag)
t1 >> t2
解决方案
有两种可能的解决方案:
A)为每个函数创建多个任务
Airflow 中的任务在不同的进程中被调用。由于第二个任务通常无法查看第一个任务的变量,因此被定义为global
不起作用的变量。
介绍: XCOM。这是 Airflow 的一个特性,我们已经为此回答了几个问题,例如这里(带有示例):Python Airflow - Return result from PythonOperator
编辑
您必须提供上下文并按照示例中的说明传递上下文。对于您的示例,这意味着:
- 添加
provide_context=True,
到您的PythonOperator
- 将签名更改
job_run
为def job_run(**kwargs):
data_warehouse_login(kwargs)
在函数内部将kwargs 传递给 data_warehouse_login
B) 创建一个完整的函数
在这种情况下,我仍然会删除全局(只需 call insert_data
,data_warehouse_login
从内部调用并返回连接)并只使用一项任务。
如果发生错误,则抛出异常。气流将处理这些就好了。只需确保在异常中放入适当的消息并使用最佳异常类型。
推荐阅读
- php - MySql 唯一标识键列的最佳实践
- bash - 如何匹配两个分隔符之间的字符串并在后面附加一个字符串
- python - Django 渲染和重定向 URL
- reactjs - 改为部署在子目录下/
- database - 分离 TFS 数据库时的后果和对用户的影响
- hibernate - 如何使用 JPARepository 仅返回目标实体
- c++ - 如何在右键单击时删除表格行
- ruby-on-rails - 将 paypal payflow 与 rails 集成
- javascript - 在移动设备上的图像之间滑动(左右)的最佳/最简单方法是什么?
- scala - 列表未在“foreach”语句中更新,返回空列表