首页 > 解决方案 > 尝试使用 NifI 添加基于条件的字段

问题描述

我是 Apache NiFi 的新手,目前使用它将消息数据路由到各个位置。我希望根据一组条件添加一些字段。

目前我有一个读取日志文件的 GetFile 处理器 ---> ExtractGrok 应用 grok 模式进行解析 ---> ConvertRecord 从 Grok 转换为 Json。下一部分是我难过/不知道下一步该做什么。

在我的 json 中,我有一个字段refresh_time ,我需要根据该字段的一些条件创建 2 个新字段refresh_time

类似的东西if refresh_time < 10 then cache = 1; else if refresh_time > 10 then reprocess = 1

这里的最终目标是数字字段cacherefresh_time可以在聚合中使用。

根据条件添加 2 个数字字段的最佳方法是什么。是否有用于添加额外字段或更新记录以包含新字段的处理器?

谢谢。

标签: apache-nifi

解决方案


有几种方法可以实现你想要的。

一个选项(更具可读性)

QueryRecord可以让您跨记录编写 SQL 语句,并让您按结果拆分它们。例如

cache添加一个用 value调用的动态属性SELECT * FROM FLOWFILE WHERE refresh_time < 10

refresh添加一个用 value调用的动态属性SELECT * FROM FLOWFILE WHERE refresh_time > 10

QueryRecord 现在将具有关系failureoriginal和。cacherefresh

每个分支cacherefresh将是一个UpdateRecordReplacement Value Strategy设置为Literal Value.

对于关系,您可以添加一个名为valuecache的新动态属性。对于关系,您可以添加一个名为value的新动态属性。cache1refreshrefresh1

类似选项(可能性能更高)

如果您想避免额外的 UpdateRecord,可以在 QueryRecord 中添加字段,如下所示:

两个动态属性设置为:

cache=SELECT *, 1 AS cache FROM FLOWFILE WHERE REFRESH < 10

reprocess=SELECT *, 1 AS reprocess FROM FLOWFILE WHERE REFRESH > 10

由于磁盘读取次数较少,此选项的性能可能更高。

这个要点是第二个选项的一个例子,你可以将它导入到 NiFi中进行尝试。

此外,仅供参考,您可以在 ConvertRecord 中使用GrokReader将 Grok 直接解析为 JSON,从而可能跳过 ExtractGrok。


推荐阅读