首页 > 解决方案 > 获取 Nifi PutSQL 处理器的最后执行时间戳

问题描述

有没有办法PutSQL通过 REST-API 获取处理器最后一次执行的时间戳?这样的时间戳是否存在,或者我可以以某种方式自己构建一个?

设置:我有 Airflow 来触发我的 Nifi-ETL,它以几个 PutSQL 处理器结束 - 完成之后,我需要在 Airflow 中执行其他操作。

想法:我想触发第一个 Nifi 处理器,然后在 Airflow 中等待,直到last_execution_timestamp最后一个 PutSQL 处理器的更新。

问题: 我尝试访问属性statsLastRefreshed,但这不是最后一次执行时间,而是最后一次任何东西(用户/api-requests)访问导致Nifi刷新处理器的处理器。

s = processor["status"]["statsLastRefreshed"]  # '13:13:26 CEST'

我在 Airflow 的 REST API 文档中找不到任何内容。

我看到的唯一其他选择是从 Airflow 向最后一个 PutSQL 处理器的数据库表发出请求,以查看那里是否发生了任何新情况。

标签: pythonapache-nifi

解决方案


我想出了一个变通的解决方案。

  1. 在处理器中添加一个名为mypropertyname的自定义属性,其值为${now()}

  2. 通过处理器的任何流文件都将具有通过处理器时的时间戳作为属性!

  3. UpdateAttribute在步骤 1 中的处理器之后有一个处理器,并将选项(在处理器属性下)存储状态设置为Store state locally

  4. UpdateAttribute在处理器中添加一个名为readable_property的自定义属性并将其设置为 value ${'mypropertyname'}

处理器的状态现在包含最后一个流文件的值(例如,带有now()从步骤 1 开始执行方法的时间戳)。

  1. /nifi-api/processors/{id}/state 通过 REST-API 和 URI 上的 GET (例如在 Airflow 中)获取有状态处理器的值(以及因此通过 (!) 的最后一个流文件的值)

返回的 JSON 包含以下几行:

{
"key":"readable_property"
,"value":"Wed Apr 14 11:13:40 CEST 2021"
,"clusterNodeId":"some-id-0d8eb6052"
,"clusterNodeAddress":"some-host:port-number"
}

然后,您只需解析 JSON 以获取 Airflow 中的值。

now注意:在前一个处理器将属性添加到流文件与流文件实际通过UpdateAttribute您可以读取时间戳的处理器之间会有轻微的延迟。


推荐阅读