python - 使用一个 Python 任务的输出并用作 Airflow 上另一个 Python 任务的输入
问题描述
因此,我正在使用 Apache Airflow 创建一个数据流,用于获取存储在 Pandas Dataframe 中的一些数据,然后将其存储到 MongoDB 中。所以我有两种 python 方法,一种用于获取数据并返回数据帧,另一种用于将其存储到相关数据库中。如何获取一项任务的输出并将其作为另一项任务的输入?这就是我到目前为止所拥有的(总结和浓缩版)
我研究了 xcom pull 和 push 的概念,这就是我在下面实现的,我还看到有一个用于 Airflow 的 MongoHook,但不太确定如何使用它。
import pandas as pd
import pymongo
import airflow
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
def get_data(name, **context):
data = pd.read_csv('dataset.csv')
df = data.loc[data.name == name]
context['ti'].xcom_push(task_ids=['get-data'], value=data)
def push_to_db(df, dbname, collection):
client = pymongo.MongoClient(-insert creds here-)
db = client[dbname][collection]
data = df.to_dict(orient='records')
db.insert_many(data)
args = {
'owner': 'Airflow',
'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
dag_id='simple_xcom',
default_args=args,
start_date=datetime(2019, 09, 02),
schedule_interval="@daily",
retries=2
)
task1 = PythonOperator(task_id='get-data', params=['name': 'John'],
python_callable=get_data,
provide_context=True, dag=dag)
task2 = PythonOperator(task_id='load-db', params=['df': context['ti'].xcom_pull(task_ids=['get-data'], key='data'),
'dbname': 'person', 'table': 'salary'),
python_callable=push_to_db, provide_context=True, dag=dag)
task1 >> task2
每次我尝试运行它时,它都会显示上下文不存在。所以也许我在将一个任务的输出作为另一个任务的输入方面做错了?
解决方案
看看示例 xcom DAG。
https://github.com/apache/airflow/blob/master/airflow/example_dags/example_xcom.py
推荐阅读
- java - 从 android 中的 URL 获取 JpegImage.aspx 验证码
- ios - 获取在 UIPickerView Xcode Swift 中选择的组件的索引
- regex - 我可以使用负前瞻来查找文件路径中的最后一个文件夹来提高简单性吗?
- data-binding - UWP 无法将 ICommand 绑定到用户控件
- sql - 自定义关系更新:是否需要主键?
- c# - ASP .NET MVC - 更新一对一数据失败
- unity3d - 使用 Unity for Oculus Go 在 VR 中移动
- rxjs - 映射和过滤一个可观察的流 RXJS
- ios - tableview.reload()后TextField数据消失
- angular - 将数组推入数组内并在更新数组后保存到数据库 - 角度