apache - 如何从 Nifi 中的单个 CSV 文件中路由/提取不同的列?
问题描述
我有一个 GetFile 处理器,它获取一个包含大约 100 列的大型 CSV 文件。我的目标是从这个 CSV 文件中提取特定的列子集,并将它们发送到 MySQL 中的各种不同的表。
当前方法使用GetFile -> Multiple ConvertRecord Processors,其中定义了不同的CSVReader和CSVRecordSetWriters,它们基于 SQL 表的模式遵守 AVRO 模式。
有没有办法只有一个 GetFile 并将子集路由到不同的处理器,而不是跨多个流复制大型 CSV 文件,然后由不同的 ConvertRecord 处理器拾取?
可以看出,CSV 文件跨多个路径复制,使事情变得非常低效。对于此示例,大小为 57 字节,但通常我会在 60-70 个此类ConvertRecord路径中获得 ~6 GB 文件
如果我知道需要从 CSV 文件中提取哪些列子集并将其发送到不同的表,如何有效地路由我的数据?例子:
A、B列转到一张表
A、C列转到第二个表
A、D、E列转到第三个表
A、D、F、G列转到第三张表....
解决方案
如果您使用 PutDatabaseRecord,那么您可以拥有多个 PutDatabaseRecord 处理器,每个处理器使用不同的读取模式来选择适当的列,类似于您使用 ConvertRecord 处理器所做的事情,但您实际上不需要写出转换后的数据。
此外,将同一个流文件分叉到 6 个不同的位置并没有什么低效的地方。在上面的示例中,如果 GetFile 选取一个 6GB 文件,则内容存储库中只有 1 个该 6GB 内容的副本,并且将有 3 个流文件指向相同的内容,因此每个 ConvertRecord 将读取相同的 6GB 内容。然后他们每个人都会写出一个新的内容,这将是数据的一个子集,并且在没有流文件引用它时,原始的 6GB 将从内容存储库中删除。因此,并非来自 GetFile 的每个附加连接都在制作 6GB 的副本。
推荐阅读
- c# - IIS Express over VPN 中的 Windows 身份验证
- terminal - 在当前目录中打开终端
- vue.js - 具有相同功能的相同组件列表 - 代码膨胀?
- git - 在我的存储库中使用另一个存储库的最佳方式是什么?
- spacy - 是否可以使用自定义命名实体改善 spaCy 的相似性结果?
- sql - 使用 from/to 表查询
- neo4j-apoc - 为什么使用 apoc 而不是 cypher 进行简单查询?
- angular - 如何在 Angular 的一个组件中处理多个查询路由
- javascript - 如何在 Node.js 中创建可取消的轮询功能?
- angular - 循环遍历数据数组并将 from 控件名称提供给输入字段无法正常工作?