python - 获取 Nifi PutSQL 处理器的最后执行时间戳
问题描述
有没有办法PutSQL
通过 REST-API 获取处理器最后一次执行的时间戳?这样的时间戳是否存在,或者我可以以某种方式自己构建一个?
设置:我有 Airflow 来触发我的 Nifi-ETL,它以几个 PutSQL 处理器结束 - 完成之后,我需要在 Airflow 中执行其他操作。
想法:我想触发第一个 Nifi 处理器,然后在 Airflow 中等待,直到last_execution_timestamp
最后一个 PutSQL 处理器的更新。
问题:
我尝试访问属性statsLastRefreshed
,但这不是最后一次执行时间,而是最后一次任何东西(用户/api-requests)访问导致Nifi刷新处理器的处理器。
s = processor["status"]["statsLastRefreshed"] # '13:13:26 CEST'
我在 Airflow 的 REST API 文档中找不到任何内容。
我看到的唯一其他选择是从 Airflow 向最后一个 PutSQL 处理器的数据库表发出请求,以查看那里是否发生了任何新情况。
解决方案
我想出了一个变通的解决方案。
在处理器中添加一个名为mypropertyname的自定义属性,其值为
${now()}
通过处理器的任何流文件都将具有通过处理器时的时间戳作为属性!
UpdateAttribute
在步骤 1 中的处理器之后有一个处理器,并将选项(在处理器属性下)存储状态设置为Store state locally
。UpdateAttribute
在处理器中添加一个名为readable_property的自定义属性并将其设置为 value${'mypropertyname'}
。
处理器的状态现在包含最后一个流文件的值(例如,带有now()
从步骤 1 开始执行方法的时间戳)。
/nifi-api/processors/{id}/state
通过 REST-API 和 URI 上的 GET (例如在 Airflow 中)获取有状态处理器的值(以及因此通过 (!) 的最后一个流文件的值)
返回的 JSON 包含以下几行:
{
"key":"readable_property"
,"value":"Wed Apr 14 11:13:40 CEST 2021"
,"clusterNodeId":"some-id-0d8eb6052"
,"clusterNodeAddress":"some-host:port-number"
}
然后,您只需解析 JSON 以获取 Airflow 中的值。
now
注意:在前一个处理器将属性添加到流文件与流文件实际通过UpdateAttribute
您可以读取时间戳的处理器之间会有轻微的延迟。
推荐阅读
- android - Qt for android 链接器警告
- docker - “无法连接到 Docker 守护进程”
- django - 重命名用户模型
- excel - Excel数据分组
- c# - 如何正确实现处理 POST 请求、检索 URI 参数和请求有效负载的 .NET 控制器方法?
- javascript - 使用 Material UI 从 JSON 构建动态表单
- java - 使用 Prodgaurd 混淆 Spring Boot 应用程序的步骤
- javascript - 使用 moment 将当前时间转换为毫秒
- matlab - MATLAB - 读取多个 *.log 文件,计算并在新文件夹中保存为 .*txt
- c# - 我如何设置 Searchbar Inative 和用户单击此以打开对话框