首页 > 解决方案 > 气流 dag 中特定任务的执行时间和状态

问题描述

我想提取 Airflow Dag 中特定任务的所有执行时间。我宁愿通过写另一个 Dag 来做到这一点。

我已经使用下面的 Dag 来提取另一个 Dag 的状态和执行时间


import pandas as pd
import numpy as np
import pandas_gbq
from google.cloud import storage as gcs
from google.cloud import bigquery

dag_id = 'my_dag'
dag_runs = DagRun.find(dag_id=my_dag)
        
# Declare empty array

arr = []
arr1 = []

for dag_run in dag_runs:
    arr.append(dag_run.state)
    arr1.append(dag_run.execution_date)
 

dag_info = {'time': arr1, 'dag_status': arr}

df = pd.DataFrame(dag_info)

## Keep failed and successful dag runs    
df_status = df[(df.dag_status == "failed") | (df.dag_status == "success")] 

df_status.loc[df_status['dag_status']=='success','flag'] = 0
df_status.loc[df_status['dag_status']=='failed','flag'] = 1

### Code to save the table in Bigquery


return None

我也想做同样的事情,但这次是为“my_dag”提取任务信息。我已经尝试了给定dag 中 Airflow 任务状态的解决方案,但它返回“无”,尽管我知道任务和 dag 正在运行。

def task_status_check(**kwargs):

        ##### TESTING. ####

        import pandas as pd
        import datetime

        my_date = datetime.datetime(2020, 9, 28)

        my_dag_id = 'my_dag'
        my_task_id = 'my_task'


        dag_folder = conf.get('core','DAGS_FOLDER')
        dagbag = DagBag(dag_folder)
        check_dag = dagbag.dags[my_dag_id]
        my_task = check_dag.get_task(my_task_id)

        for n in range(1, 500, 2):

            time_delta = timedelta(minutes = n)
            my_date_1 = my_date + time_delta 
            ti = TaskInstance(my_task, my_date_1)

            print("######################")
            print(ti.current_state())

任何帮助将不胜感激。

谢谢

标签: airflowgoogle-cloud-composer

解决方案


我怀疑TaskInstance()模型中的问题,而不是task_status_check()函数中包含的自定义代码逻辑。基本上TaskInstance()类提供了各种 Airflow 任务管理利用SQLAlchemy OMR Python 工具的功能,该工具对整个 Airflow 元数据数据库执行查询,从task_instanceSQL 表中获取记录,查看源代码,您可能会得到反映这一点的#L203 。

我已经在非常常见的类似场景中尝试了您的代码,并面临相同的None返回状态。回顾最初的问题 Stack线程中提到的用户的努力,并深入了解我已调整get_task_instance()以检查行为的问题,并指向此函数以提取特定 Airflow 任务的状态。只要get_task_instance()是实验,貌似就是调用TaskInstance()类发现任务状态:

def task_check(**kwargs):
  import datetime
  from datetime import timezone
  from airflow import configuration as conf
  import logging
  from airflow.api.common.experimental.get_task_instance import get_task_instance
  
  my_date = datetime.datetime('yyyy', 'mm', 'dd', 'hour', 'min', 'sec')
  my_date = my_date.replace(tzinfo=timezone.utc) 

  my_dag_id = "Dag_id"
  my_task_id = "Task_id"
  ti = get_task_instance(my_dag_id,my_task_id,my_date)

我检查了对 Airflow DB 的请求是否成功,但是get_task_instance函数返回相同的None状态:

{python_operator.py:114} 信息 - 完成。返回值为:无

同时,做进一步的研究,考虑为 Airflow 任务提取状态的其他方法,他们只是保持这项工作正常。

  • Airflow命令行执行器,调整为在 Composer 工作人员之一上运行:

    kubectl -it exec $(kubectl get po -l run=airflow-worker -o jsonpath='{.items[0].metadata.name}' \
        -n $(kubectl get ns| grep composer*| awk '{print $1}')) -n $(kubectl get ns| grep composer*| awk '{print $1}') \
        -c airflow-worker airflow task_state <Dag_ID> <Task_ID> 2020-09-27T23:59:21+00:00
    
  • 相应地查询元数据 MySQL task_instance

   SELECT task_id, state, execution_date
   FROM task_instance
   WHERE dag_id = 'dag_id'
   AND DATE(execution_date) = 'execution_date'
   AND task_id = 'task_id'

推荐阅读