java - NiFi 自定义处理器 - 读取数据库视图
问题描述
我是 NiFi 的新手,我正在开发一个自定义处理器来从 psql 数据库视图中提取最新数据。当自定义处理器初始化时,我可以使用下面的代码检索数据库视图。
private void GetData(){
Connection connection = DriverManager.getConnection("jdbc:postgresql://example:5432/example", "user", "pass");
Statement statement = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
ResultSet rs = statement.executeQuery("SELECT * FROM Example_Table");
while(rs.next()){
//Get data from database
}
connection.close();
}
但是,我正在努力从数据库视图中获取最近的更新。主要问题是何时将新条目添加到数据库中。由于在初始化处理器时会查询数据库,因此自定义处理器不会有新条目。
我试图在 public void onTrigger() 函数中实现查询;但是,这将导致管道备份,因为它将查询每个流文件的数据库(如果每秒有数千个流文件进入,这并不理想)。
有没有在处理器启动时查询数据库的方法?无需在每个流文件上查询数据库?或者,是否可以检测数据库是否已被修改并在修改时提取数据?或者甚至设置一个计时器来拉取自定义处理器中的数据库?
非常感谢您的任何帮助,在此先感谢您。
解决方案
我认为,如果您可以对更高级别的用例进行更多解释,它可能会帮助您获得解决方案,因为这似乎是一种不常见的方法。通常每个处理器都有一个单一的职责,因此一些处理器与数据库交互,然后输出必要的信息供其他处理器使用。
有一些LookupService
s 可能是值得研究的好例子,例如MongoDBLookupService
.
如果您的用例实际上是“我有一个自定义处理器,它摄取包含任意数据的流文件,并且需要使用此数据库表中的最新数据对它们执行一些操作”,您有几个选择:
- 以上述方法执行数据库查询,并在此期间调用该方法一次
onEnabled()
以从表中获取大部分数据,然后使用线程定期调用它以保持更新并将结果本地存储在字段中。当onTrigger()
方法运行时,使用本地缓存结果而不是进行数据库调用。这将减少延迟并为您提供近乎实时的数据。@OnStopped
请务必通过带有注释的方法清理线程运行器和本地状态。 onTrigger()
执行与流文件处理(即)内联的数据库查询。这可能导致高延迟和吞吐量阻塞。如果能够通过 using 对流文件进行批处理,则可以潜在地增加每个执行周期中处理的流文件List<FlowFile> flowfiles = session.get(1000);
的数量(数量是可配置的)。- 如果没有 upserts/in-place 修改(即对数据库表的任何更改将导致新行),您可以使用哨兵查询(
SELECT COUNT(*) FROM table;
)返回行数,将其与先前返回的行数进行比较,如果这些数字不同,则仅执行检索所有数据的“昂贵”查询。在这种情况下,您可以通过记录先前获得的行的最大 ID 或时间戳来仅检索增量行。如果 upserts 是可能的,类似的东西SELECT MAX(lastModified) AS mostRecentTimeModified FROM table;
可能会有所帮助。
推荐阅读
- influxdb - InfluxDB:从零开始cumulative_sum()/对cumulative_sum和non_negative_difference所需的聚合分组
- jquery - Asp.net Mvc ajax 动作提交两次
- android - 改造 json 数据传递
- c# - textarea asp-for 不显示属性
- ios - 仅适合标签的高度
- mysql - 在一组天(在特定日期和前 2 天)获取不同的 id 计数
- wordpress - 古腾堡附加块属性
- sql - SQL UPDATE用空字符串替换括号内的所有文本
- arrays - Volley Json 解析
- cuda - cudaMemPrefetchAsync 上的设备序号无效