首页 > 解决方案 > 如何在 PythonOperator Airflow 中调用返回值?

问题描述

我是 Airflow 的新手,我正在练习一些,例如我有一个读取文件(excel)并将转换后的文件返回到 DataFrame 的函数。我的第二个功能是接收该文件并删除空值并再次返回没有空值的 DF。所以我有几个功能。

我的问题是我不知道如何在 PythonOperator 中参数化这些函数,以便它从另一个函数接收值

def read_file():
  df = pd.read_excel('EXCEL.xlsx')
  return df

def remove_NULL(df):
  df.dropna(inplace = True)
  return df

.......

with DAG('Example', start_date=datetime(AAAA, MM, DD), schedule_interval='@daily', catchup=False) as dag:

  step1 = PythonOperator(task_id = 'read', python_callable = read_file)

  step2 = PythonOperator(task_id = 'drop', python_callable = remove_NULL)


  step1 >> step2

第一步工作正常,第二步不是因为我不知道如何调用返回的值

标签: pythonpandasworkflowairflowpipeline

解决方案


请参阅以下链接以接收来自另一个函数的值

https://github.com/apache/airflow/blob/master/airflow/example_dags/example_xcom.py

推荐阅读