airflow - 气流 dag 中特定任务的执行时间和状态
问题描述
我想提取 Airflow Dag 中特定任务的所有执行时间。我宁愿通过写另一个 Dag 来做到这一点。
我已经使用下面的 Dag 来提取另一个 Dag 的状态和执行时间
import pandas as pd
import numpy as np
import pandas_gbq
from google.cloud import storage as gcs
from google.cloud import bigquery
dag_id = 'my_dag'
dag_runs = DagRun.find(dag_id=my_dag)
# Declare empty array
arr = []
arr1 = []
for dag_run in dag_runs:
arr.append(dag_run.state)
arr1.append(dag_run.execution_date)
dag_info = {'time': arr1, 'dag_status': arr}
df = pd.DataFrame(dag_info)
## Keep failed and successful dag runs
df_status = df[(df.dag_status == "failed") | (df.dag_status == "success")]
df_status.loc[df_status['dag_status']=='success','flag'] = 0
df_status.loc[df_status['dag_status']=='failed','flag'] = 1
### Code to save the table in Bigquery
return None
我也想做同样的事情,但这次是为“my_dag”提取任务信息。我已经尝试了给定dag 中 Airflow 任务状态的解决方案,但它返回“无”,尽管我知道任务和 dag 正在运行。
def task_status_check(**kwargs):
##### TESTING. ####
import pandas as pd
import datetime
my_date = datetime.datetime(2020, 9, 28)
my_dag_id = 'my_dag'
my_task_id = 'my_task'
dag_folder = conf.get('core','DAGS_FOLDER')
dagbag = DagBag(dag_folder)
check_dag = dagbag.dags[my_dag_id]
my_task = check_dag.get_task(my_task_id)
for n in range(1, 500, 2):
time_delta = timedelta(minutes = n)
my_date_1 = my_date + time_delta
ti = TaskInstance(my_task, my_date_1)
print("######################")
print(ti.current_state())
任何帮助将不胜感激。
谢谢
解决方案
我怀疑TaskInstance()模型中的问题,而不是task_status_check()
函数中包含的自定义代码逻辑。基本上TaskInstance()
类提供了各种 Airflow 任务管理利用SQLAlchemy OMR Python 工具的功能,该工具对整个 Airflow 元数据数据库执行查询,从task_instance
SQL 表中获取记录,查看源代码,您可能会得到反映这一点的#L203 。
我已经在非常常见的类似场景中尝试了您的代码,并面临相同的None
返回状态。回顾最初的问题 Stack线程中提到的用户的努力,并深入了解我已调整get_task_instance()
以检查行为的问题,并指向此函数以提取特定 Airflow 任务的状态。只要get_task_instance()
是实验包,貌似就是调用TaskInstance()
类发现任务状态:
def task_check(**kwargs):
import datetime
from datetime import timezone
from airflow import configuration as conf
import logging
from airflow.api.common.experimental.get_task_instance import get_task_instance
my_date = datetime.datetime('yyyy', 'mm', 'dd', 'hour', 'min', 'sec')
my_date = my_date.replace(tzinfo=timezone.utc)
my_dag_id = "Dag_id"
my_task_id = "Task_id"
ti = get_task_instance(my_dag_id,my_task_id,my_date)
我检查了对 Airflow DB 的请求是否成功,但是get_task_instance
函数返回相同的None
状态:
{python_operator.py:114} 信息 - 完成。返回值为:无
同时,做进一步的研究,考虑为 Airflow 任务提取状态的其他方法,他们只是保持这项工作正常。
Airflow命令行执行器,调整为在 Composer 工作人员之一上运行:
kubectl -it exec $(kubectl get po -l run=airflow-worker -o jsonpath='{.items[0].metadata.name}' \ -n $(kubectl get ns| grep composer*| awk '{print $1}')) -n $(kubectl get ns| grep composer*| awk '{print $1}') \ -c airflow-worker airflow task_state <Dag_ID> <Task_ID> 2020-09-27T23:59:21+00:00
相应地查询元数据 MySQL
task_instance
:
SELECT task_id, state, execution_date
FROM task_instance
WHERE dag_id = 'dag_id'
AND DATE(execution_date) = 'execution_date'
AND task_id = 'task_id'
推荐阅读
- javascript - 如何使用 for 循环为音频文件创建动态 URL?Javascript
- list - Flutter 按值排序列表
- c# - 需要通过与“dataAfter”比较来过滤“dataBefore”的数据
- javascript - 如何在 PHP 后获取动态未选中复选框的值
- django - 如何将 Token 添加到扩展 Django Rest Framework 中的用户模型的模型中?
- sql - MS Access Left Join 的错误行为
- javascript - 更改路由Vuejs时的removeEventListener
- docker - 将硬盘挂载到 docker 容器
- javascript - 将重复数组分组到 td 并在表中创建新 tr
- web - Apple HTML 幻灯片,缺少 bevel_top.giv 和 bevel_l.gif