首页 > 解决方案 > 使用 Nifi 调度批量数据插入 Hive

问题描述

我正在使用Nifi来管理我的数据流。我首先使用ConsumeKafka处理器接收数据,然后使用EvaluateJsonPathReplaceText处理器来获取此请求

insert into my_table values('x1','x2','x3');

最后,这个请求被送到PuthiveQl处理器。通过这种方式,我将数据逐行插入到我的 Hive 表中,工作正常,但这是一种非常糟糕的方法。

我只想知道如何获得这样的请求

insert into my_table values('x1','x2','x3'),('x11','x22','x33'),('x111','x222','x333');

通过插入一个; 到一天结束时的查询,因此每 24 小时获取一次批量数据插入到 hive 中。

请在 Nifi 中找到我的数据流。

在此处输入图像描述

标签: hivehiveqlapache-nifi

解决方案


我找到了解决方案,但仅限于第一部分。使用此数据流

在此处输入图像描述

我终于可以得到这样的查询

插入 my_table 值('x1','x2','x3'),('x11','x22','x33'),('x111','x222','x333');

我使用第一个 ReplaceText 处理器来获取最后用逗号插入的值 在此处输入图像描述

然后我合并内容(要设置的最大和最小条目数) 第二个 ReplaceText 处理器是在合并内容的开头 将句子insert 添加到 my_table 值中在此处输入图像描述

最后,我使用第三个 ReplaceText 处理器将最后一个替换为; 在查询结束时。 在此处输入图像描述

对于调度任务,我还没有找到解决方案。


推荐阅读