首页 > 解决方案 > 带有自定义字段的气流自定义指标和/或结果对象

问题描述

通过 Airflow 运行 pySpark SQL 管道时,我有兴趣获取一些业务统计信息,例如:

一种想法是将其直接推送到指标中,这样它就会被 Prometheus 等监控工具自动使用。另一个想法是通过一些 DAG 结果对象获取这些值,但我无法在文档中找到任何关于它的信息。

如果您有解决方案,请至少发布一些伪代码。

标签: airflowairflow-scheduler

解决方案


我希望在airflow.stats.Stats课堂上重用 Airflow 的统计数据和监控支持。也许是这样的:

import logging
from airflow.stats import Stats

PYSPARK_LOG_PREFIX = "airflow_pyspark"


def your_python_operator(**context):
    [...]

    try:
        Stats.incr(f"{PYSPARK_LOG_PREFIX}_read_count", src_read_count)
        Stats.incr(f"{PYSPARK_LOG_PREFIX}_write_count", tgt_write_count)
        # So on and so forth
    except:
        logging.exception("Caught exception during statistics logging")

    [...]

推荐阅读