apache-nifi - 从平面文件中读取数据后,如何从 Nifi Flow File 中仅提取几列
问题描述
平面文件有以下数据,没有需要加载到 MySQL 表中的标题。
101,AAA,1000,10
102,BBB,5000,20
我使用 GetFile 或 GetSFTP 处理器来读取数据。读取数据后,流文件包含上述数据。我只想将第一列、第二列和第四列加载到 MySQL 表中。我期望在 MySQL 表中的输出如下。
101,AAA,10
102,BBB,20
您能帮我解决如何从 nifi 中的传入流文件中仅提取几列并将其加载到 MySQL 中吗?
解决方案
这只是一种方法,但还有其他几种方法。此方法使用记录,否则会避免修改基础数据 - 它只是忽略在insert
. 这在与更大的流集成时非常有用,其中数据由可能需要原始数据的其他处理器使用,或者您已经在使用记录。
假设您的表有列
id | name | value
你的数据看起来像
101,AAA,1000,10
102,BBB,5000,20
您可以使用带有and设置的PutDatabaseRecord
处理器并将 a 添加为.Unmatched Field Behavior
Unmatched Column Behavior
Ignore Unmatched...
CSVReader
Record Reader
在CSVReader
你可以设置Schema Access Strategy
为Use 'Schema Text' Property
. 然后将Schema Text
属性设置为以下内容:
{
"type": "record",
"namespace": "nifi",
"name": "db",
"fields": [
{ "name": "id", "type": "string" },
{ "name": "name", "type": "string" },
{ "name": "ignoredField", "type": "string" },
{ "name": "value", "type": "string" }
]
}
这会将 NiFi Record 字段与 DB Table 列进行匹配,这将匹配字段 1,2 和 4,同时忽略字段 3(因为它不匹配列名)。
显然,修改Schema Text
架构中的字段名称以匹配数据库表的列名称。您还可以在此处进行数据类型检查/转换。
推荐阅读
- curl - Google 无法抓取,网站未更改。想法?
- xcode - 使用 xcrun simctl status_bar 在 iOS 模拟器上更改或删除运营商名称
- nlp - 有没有办法确定句子数据集中的词性模式?
- teradata - Teradata SQL - 小时到分钟数
- python - 如何在pyside2中关闭另一个窗口?
- python - 等价于 Python 的 Lua 的 pairs()
- karate - 如何传递 karate.prevRequest 和 response 是从一个功能文件到另一个功能文件的参数
- sql-server - T-SQL 游标 + 动态 SQL(引用特定主键的行数)
- python - Python & PostgreSql:将值插入表中的列
- testing - 如何停止 Trogdor 测试 Agent + Coordinator 服务负责关闭 Kafka 和 Zookeeper 实例