首页 > 解决方案 > 如何在流集中执行弹性搜索查找

问题描述

我在 Streamsets v3.21 中接受两种记录 A 和 B - 在父类型 A 和多个子类型 B 之间有一个称为相关ID 的公共字段。类型 A 总是先到达。类型 A 和类型 B 被写入以将同一集群上的弹性搜索索引与同一管道分开。A型和B型的发送和组合不在我的控制范围内。它们由 Logstash 7.81 通过过滤器组进行预处理,我可以向其中添加新文件,但不能更改现有文件。

我需要将类型 A 上的字段 X 放入写入弹性搜索的类型 B 记录中。有谁知道当他们到达时通过查找类型 A 使弹性搜索更新类型 B 的方法?或者,任何人都可以告诉我在写入 B 类型并将值 X 应用于 B 类型记录之前在弹性搜索(来自流集)上查找类型 A 的方法吗?_Alternatively_我考虑过使用一个名为correlationid 且值为X 的环境变量,以便我可以查找它,但我担心会炸毁堆,因为我永远不知道何时删除env var,因为最多可以有N 个类型B记录
或者也许logstash可以以某种方式缓存correlationid和X的值;有一个名为“环境”的过滤器,它允许我为 A 类型存储 env_vars 并将它们应用于 B 类型,但我找不到定期清除它的方法

标签: linuxdockerelasticsearchlogstashstreamsets

解决方案


如何使用jython 评估器和“状态”对象。您可以(小心地)将状态对象用于缓存,并在发送到弹性之前将字段添加到记录中。


推荐阅读