How to fix a failed task in an Airflow DAG

Problem description

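I'm new to Airflow and am just trying to do a simple data transformation in a DAG for practice. However, I can't figure out why the first task always fails unexpectedly. Could someone give me some tips on how to debug a failed task in a DAG, or point out which part is wrong? Much appreciated.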

import pandas as pd
import airflow
from airflow import DAG

from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta



def transform_data(*args, **kwargs):

    df = pd.read_csv("C:/aws-airflow/data.csv")
    df['element'] = df['element'].str.slice(2).str.replace('-', ",")

    df.to_csv("C:/aws-airflow/data_new.csv")

default_args = {
    'owner' : 'airflow',
    'start_date' : datetime(2021, 9, 1),
    'retries' : 0,
    'retry_delay' : timedelta(minutes = 2)


With DAG(
    dag_id = 'data_pipeline',
    schedule_interval = "@daily",
    default_args = default_args
) as dag:


    transform_data = PythonOperator(
        task_id = "transform_data",
        python_callable = transform_data
    )
    
    task_end = DummyOperator(
        task_id = 'none'
    )
    transform_date >> task_end

Tags: python, airflow, directed-acyclic-graphs

Solution


There are 2 issues in your code.

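  1. Rename your Python function so that its name differs from the operator object's; let's call it transform_data_fn. Reusing transform_data for both means the name points at the function in one line and at the operator in the next, which is easy to trip over.
  2. When defining the task dependencies, you have a typo: transform_date instead of transform_data.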
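Both points can be checked in plain Python without installing Airflow. In this minimal sketch, FakePythonOperator is a hypothetical stand-in for PythonOperator (it only stores its arguments), so it illustrates Python's name binding rather than Airflow's API. Because the right-hand side of an assignment is evaluated first, reusing the name still hands the function to the operator, but afterwards transform_data refers to the operator. The misspelled transform_date raises NameError as soon as its line runs, and since a DAG file's dependency lines run when Airflow parses the file, the DAG fails to import.

```python
class FakePythonOperator:
    """Hypothetical stand-in for Airflow's PythonOperator (not the real API)."""

    def __init__(self, task_id, python_callable):
        self.task_id = task_id
        self.python_callable = python_callable


def transform_data():
    return "transformed"


# Issue 1: the right-hand side is evaluated before the assignment, so the
# operator still receives the function...
transform_data = FakePythonOperator(
    task_id="transform_data", python_callable=transform_data
)
# ...but from here on the name refers to the operator, not the function.
print(callable(transform_data))          # → False
print(transform_data.python_callable())  # → transformed

# Issue 2: a misspelled name fails as soon as its line runs. In a DAG file the
# dependency line runs at parse time, so the whole file fails to import.
try:
    transform_date >> None  # deliberate typo copied from the question
    typo_error = None
except NameError as exc:
    typo_error = str(exc)
print(typo_error)
```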

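I took your code as it was and fixed those 2 mistakes, along with two syntax errors that stop the file from parsing at all (default_args is missing its closing }, and With must be lowercase with):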

import pandas as pd
from airflow import DAG

from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta


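def transform_data_fn(*args, **kwargs):
    # Renamed so it no longer clashes with the transform_data operator below.

    df = pd.read_csv("data/data.csv")
    df["element"] = df["element"]  # placeholder: put your own transformation here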

    df.to_csv("data/data_new.csv")


default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 9, 1),
    "retries": 0,
    "retry_delay": timedelta(minutes=2),
}


with DAG(
    dag_id="data_pipeline", schedule_interval="@daily", default_args=default_args
) as dag:

    transform_data = PythonOperator(
        task_id="transform_data", python_callable=transform_data_fn, dag=dag
    )

    task_end = DummyOperator(task_id="none")

    transform_data >> task_end  # typo fixed: transform_data, not transform_date

NOTE: I'm not doing any transformation on the DataFrame, because it is irrelevant to the fix and I don't know what your CSV file looks like.
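As for how to debug a failing task: the task's log in the Airflow UI shows the exception, and in Airflow 2 the command airflow tasks test <dag_id> <task_id> <date> runs a single task without recording its state. Another option is to call the Python callable directly, outside the scheduler, so any exception prints a full traceback. The sketch below assumes a stdlib-only stand-in for the pandas callable: this version of transform_data_fn takes explicit paths and uses the csv module, and the run_task_locally helper and the sample value ab1-2-3 are hypothetical illustrations, not part of the original code.

```python
import csv
import os
import tempfile
import traceback


def transform_data_fn(src_path, dst_path):
    """Hypothetical stdlib stand-in for the pandas callable in the question."""
    with open(src_path, newline="") as f:
        rows = list(csv.DictReader(f))
    for row in rows:
        # Same per-value step as .str.slice(2).str.replace('-', ','):
        # drop the first two characters, then turn every '-' into ','.
        row["element"] = row["element"][2:].replace("-", ",")
    fieldnames = list(rows[0].keys()) if rows else ["element"]
    with open(dst_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
    return len(rows)


def run_task_locally(fn, *args):
    """Call a task callable directly; return (result, traceback text or None)."""
    try:
        return fn(*args), None
    except Exception:
        # The same information you would look for in the Airflow task log.
        return None, traceback.format_exc()


with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "data.csv")
    dst = os.path.join(tmp, "data_new.csv")
    with open(src, "w", newline="") as f:
        f.write("element\nab1-2-3\n")

    ok_result, ok_error = run_task_locally(transform_data_fn, src, dst)
    with open(dst, newline="") as f:
        transformed = [row["element"] for row in csv.DictReader(f)]

    # A missing input file reproduces the kind of failure a task log would show.
    bad_result, bad_error = run_task_locally(
        transform_data_fn, os.path.join(tmp, "missing.csv"), dst
    )

print(ok_result, transformed)  # → 1 ['1,2,3']
print(bad_error.strip().splitlines()[-1])
```

Running the callable this way surfaces problems such as a wrong file path or a missing column immediately, before the DAG is scheduled.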

