apache-nifi - 尝试使用 NifI 添加基于条件的字段
问题描述
我是 Apache NiFi 的新手,目前使用它将消息数据路由到各个位置。我希望根据一组条件添加一些字段。
目前我有一个读取日志文件的 GetFile 处理器 ---> ExtractGrok 应用 grok 模式进行解析 ---> ConvertRecord 从 Grok 转换为 Json。下一部分是我难过/不知道下一步该做什么。
在我的 json 中,我有一个字段refresh_time
,我需要根据该字段的一些条件创建 2 个新字段refresh_time
类似的东西if refresh_time < 10 then cache = 1; else if refresh_time > 10 then reprocess = 1
这里的最终目标是数字字段cache
,refresh_time
可以在聚合中使用。
根据条件添加 2 个数字字段的最佳方法是什么。是否有用于添加额外字段或更新记录以包含新字段的处理器?
谢谢。
解决方案
有几种方法可以实现你想要的。
一个选项(更具可读性)
QueryRecord可以让您跨记录编写 SQL 语句,并让您按结果拆分它们。例如
cache
添加一个用 value调用的动态属性SELECT * FROM FLOWFILE WHERE refresh_time < 10
。
refresh
添加一个用 value调用的动态属性SELECT * FROM FLOWFILE WHERE refresh_time > 10
。
QueryRecord 现在将具有关系failure
、original
和。cache
refresh
每个分支cache
都refresh
将是一个UpdateRecord,Replacement Value Strategy
设置为Literal Value
.
对于关系,您可以添加一个名为valuecache
的新动态属性。对于关系,您可以添加一个名为value的新动态属性。cache
1
refresh
refresh
1
类似选项(可能性能更高)
如果您想避免额外的 UpdateRecord,可以在 QueryRecord 中添加字段,如下所示:
两个动态属性设置为:
cache
=SELECT *, 1 AS cache FROM FLOWFILE WHERE REFRESH < 10
reprocess
=SELECT *, 1 AS reprocess FROM FLOWFILE WHERE REFRESH > 10
由于磁盘读取次数较少,此选项的性能可能更高。
这个要点是第二个选项的一个例子,你可以将它导入到 NiFi中进行尝试。
此外,仅供参考,您可以在 ConvertRecord 中使用GrokReader将 Grok 直接解析为 JSON,从而可能跳过 ExtractGrok。
推荐阅读
- azure-webjobs - Azure 网络作业 ETIMEDOUT
- r - 从函数中的数据框中选择和绘制可选列,R
- typescript - StencilJS 中基于 Prop 值的动态事件名称
- dart - 在有限的时间内调用 Future 方法
- algorithm - 如何阻止我的迷宫算法在唯一路径前制作墙壁?
- sql - SQL 注入 - Web 服务器上的全局过滤器
- sql-server - SQL Server 中两个值的除法。我得到一个奇怪的结果
- javascript - 当我们在同一页面上使用 jQuery 步骤时,日期时间选择器不起作用
- javascript - URL 哈希:目标在 ajax 元素上
- c# - 防止从外部类 c# 访问列表元素