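python - How to fix a failed task in an Airflow DAG
Question
I'm new to Airflow and am trying a simple data transformation in a DAG for practice. However, I can't tell why the first task always fails unexpectedly. Could someone give me some tips on how to debug a failed task in a DAG, or point out the part that is wrong? Much appreciated.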
import pandas as pd
import airflow
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

def transform_data(*args, **kwargs):
    df = pd.read_csv("C:/aws-airflow/data.csv")
    df['element'] = df['element'].str.slice(2).str.replace('-', ",")
    df.to_csv("C:/aws-airflow/data_new.csv")

default_args = {
    'owner' : 'airflow',
    'start_date' : datetime(2021, 9, 1),
    'retries' : 0,
    'retry_delay' : timedelta(minutes = 2)
}

with DAG(
    dag_id = 'data_pipeline',
    schedule_interval = "@daily",
    default_args = default_args
) as dag:
    transform_data = PythonOperator(
        task_id = "transform_data",
        python_callable = transform_data
    )
    task_end = DummyOperator(
        task_id = 'none'
    )
    transform_date >> task_end
Solution
There are 2 issues in your code.
- You need to rename your Python function so that its name differs from the operator object's, because assigning the operator to transform_data rebinds that name. Let's call the function transform_data_fn.
- When you define the task dependencies, you have a typo: it should be transform_data instead of transform_date.
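To make the two issues concrete, here is a minimal plain-Python sketch that runs without Airflow. FakeOperator is a hypothetical stand-in written only for this illustration (it is not an Airflow class) and it merely mimics the `>>` syntax; the point is how Python treats the names, not how Airflow schedules tasks.

```python
class FakeOperator:
    """Hypothetical stand-in for an Airflow operator (assumption, not Airflow API)."""

    def __init__(self, task_id, python_callable=None):
        self.task_id = task_id
        self.python_callable = python_callable
        self.downstream = None

    def __rshift__(self, other):
        # Mimic the `a >> b` dependency syntax by recording the downstream task.
        self.downstream = other
        return other


def transform_data():
    return "transformed"


# Issue 1: this assignment rebinds the name `transform_data`
# from the function to the operator object.
transform_data = FakeOperator(task_id="transform_data", python_callable=transform_data)
name_now_refers_to = type(transform_data).__name__

task_end = FakeOperator(task_id="none")

# Issue 2: `transform_date` was never defined, so evaluating it raises NameError.
try:
    transform_date >> task_end
    typo_error = None
except NameError as exc:
    typo_error = str(exc)

print(name_now_refers_to)
print(typo_error)
```

Giving the function its own name, such as transform_data_fn, keeps both the function and the operator reachable, and a NameError like the one caught here points straight at the misspelled variable.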
I took your code as it was and fixed those 2 mistakes:
import pandas as pd
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

# Renamed so the function no longer shares a name with the operator below.
def transform_data_fn(*args, **kwargs):
    df = pd.read_csv("data/data.csv")
    df["element"] = df["element"]  # placeholder: no transformation applied
    df.to_csv("data/data_new.csv")

default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 9, 1),
    "retries": 0,
    "retry_delay": timedelta(minutes=2),
}

with DAG(
    dag_id="data_pipeline", schedule_interval="@daily", default_args=default_args
) as dag:
    transform_data = PythonOperator(
        task_id="transform_data", python_callable=transform_data_fn, dag=dag
    )
    task_end = DummyOperator(task_id="none")
    # Typo fixed: transform_data, not transform_date.
    transform_data >> task_end
NOTE: I'm not doing any transformation on the DataFrame because it is irrelevant to the fix itself, and because I didn't know what your CSV file looks like.
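One way to debug the transformation itself is to run it outside Airflow on a small in-memory sample. The sketch below is only an illustration under assumptions: the sample rows and the helper name transform_frame are made up, since the real CSV layout is unknown; the string operations are the ones from the question, with regex=False passed so that the hyphen is treated literally.

```python
import io

import pandas as pd


def transform_frame(df):
    """Apply the question's string transformation and return a new DataFrame."""
    out = df.copy()
    # Drop the first two characters, then replace hyphens with commas.
    out["element"] = out["element"].str.slice(2).str.replace("-", ",", regex=False)
    return out


# Hypothetical sample data; the real file's columns and values may differ.
sample_csv = io.StringIO("id,element\n1,ab1-2\n2,cd3-4\n")
result = transform_frame(pd.read_csv(sample_csv))
print(result)
```

If this runs cleanly on a sample that resembles the real file, the remaining suspects are the file paths and the DAG definition rather than the pandas logic.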