首页 > 解决方案 > Logstash - 将数据保存在内存中的输入文件插件

问题描述

我有 1- 一个 CSV 文件和 2- 一个实时 KAFKA 流。KAFKA 流引入了实时流日志,CSV 文件包含元数据记录,我需要在将它们发送到 Elastic Search 之前将它们与流日志连接起来。

Kafka 流日志和 CSV 记录的示例:

KAFKA log: MachineID: 2424, MachineType: 1, MessageType: 9
CSV record: MachineID: 2424, MachineOwner: JohnDuo

记录我需要在发送到 ES 之前在 logstash 中构建:

MachineID: 2424
MachineOwner: JohnDuo
MachineType: 1
MessageType: 9

我想要一个 Ruby 或 Logstash 插件或其他任何解决方案来读取此 CSV 文件一次并将它们带入并将它们加入 Logstash conf 文件。我需要将 CSV 文件的内容保存在内存中,否则对每个实时 Kafka 日志的 CSV 查找会破坏我的 Logstash 性能。

标签: rubyelasticsearchjoinapache-kafkalogstash

解决方案


试试translate过滤器。

你需要这样的东西。

filter {
    translate {
        dictionary_path => "/path/to/your/csv/file.csv"
        field => "[MachineId]"
        destination => "[MachineOwner]"
        fallback => "not found"
    }
}

然后,您file.csv将拥有以下内容。

2424,JohnDuo
2425,AnotherUser

对于每个具有该字段的事件MachineId,此过滤器将在字典中查找此 id,如果找到匹配项,它将创建一个以MachineOwner匹配值命名的字段,如果没有找到匹配项,它将创建MachineOwner值为的字段not found,如果您不想在不匹配的情况下创建该字段,您可以删除该fallback选项。

字典在 logstash 启动时加载到内存中,并且每 300 秒重新加载一次,您也可以更改该行为。


推荐阅读