首页 > 解决方案 > 如何从 Nifi 中的单个 CSV 文件中路由/提取不同的列?

问题描述

我有一个 GetFile 处理器,它获取一个包含大约 100 列的大型 CSV 文件。我的目标是从这个 CSV 文件中提取特定的列子集,并将它们发送到 MySQL 中的各种不同的表。

当前方法使用GetFile -> Multiple ConvertRecord Processors,其中定义了不同的CSVReaderCSVRecordSetWriters,它们基于 SQL 表的模式遵守 AVRO 模式。

有没有办法只有一个 GetFile 并将子集路由到不同的处理器,而不是跨多个流复制大型 CSV 文件,然后由不同的 ConvertRecord 处理器拾取?

这是我现在的流量, 大文件的多个队列

可以看出,CSV 文件跨多个路径复制,使事情变得非常低效。对于此示例,大小为 57 字节,但通常我会在 60-70 个此类ConvertRecord路径中获得 ~6 GB 文件

如果我知道需要从 CSV 文件中提取哪些列子集并将其发送到不同的表,如何有效地路由我的数据?例子:

A、B列转到一张表

A、C列转到第二个表

A、D、E列转到第三个表

A、D、F、G列转到第三张表....

标签: apachecsvapache-nifi

解决方案


如果您使用 PutDatabaseRecord,那么您可以拥有多个 PutDatabaseRecord 处理器,每个处理器使用不同的读取模式来选择适当的列,类似于您使用 ConvertRecord 处理器所做的事情,但您实际上不需要写出转换后的数据。

此外,将同一个流文件分叉到 6 个不同的位置并没有什么低效的地方。在上面的示例中,如果 GetFile 选取一个 6GB 文件,则内容存储库中只有 1 个该 6GB 内容的副本,并且将有 3 个流文件指向相同的内容,因此每个 ConvertRecord 将读取相同的 6GB 内容。然后他们每个人都会写出一个新的内容,这将是数据的一个子集,并且在没有流文件引用它时,原始的 6GB 将从内容存储库中删除。因此,并非来自 GetFile 的每个附加连接都在制作 6GB 的副本。


推荐阅读