java - 在java中实现微批处理
问题描述
我正在开发一个基于 kafka 的应用程序,其中 kafka 侦听器将侦听记录;一旦 kafka 收到记录,我可能需要将记录写入文件。这里要将记录写入文件,我们要使用带有批处理大小和超时设置的微批处理。例如,batchsize 为 10,超时设置为 1000 ms,这意味着在写入文件之前等待 10 条记录,等待时间为 1000 毫秒。如果在任何情况下 Kafka 在 1000 毫秒内仅收到 5 条记录,则在该批次中只写入 5 条记录。
我在 Java 中做到这一点的效率有多高。
解决方案
在这种情况下,一种常见的方法是将所有记录放入队列中。当您的队列达到 10 或 1000 毫秒后,有一个线程将记录这些记录,具体取决于先出现的内容。
消费者代码:
CountDownLatch countDownLatch = new CountDownLatch(10);
countDownLatch.await(1000, TimeUnit.MILLISECONDS);
int queueSize = queue.size();
for(int i = 0; i < queueSize; ++i) {
... do your work here or put in a batch a do it right after loop
}
生产者代码:
Record record = ...receive new record...
queue.put(record);
consumer.getCountDownLatch().countDown();
作为队列,我建议使用未绑定的队列,例如LinkedTransferQueue
,因为您不想在达到 10 个任务时停止生产者,您仍然需要使用来自 kafka 的结果。
另一种选择是反应流。
推荐阅读
- c - cmake头文件包含来自静态库
- java - 在 java 中测试 zip 文件的上传会引发 EOFException
- ldap - openldap:使用 ldapadd 添加用户失败
- android - Android Studio如何正确地将listview项目插入sqlite数据库
- vue.js - Vuetify Autocomplete, item-slot , 保持字符高亮
- python - 在 APScheduler 中超出计划的手动作业执行
- python - SQLite 按行/列获取项目
- android - WhatsApp 网页参数
- javascript - 如何显示两个日期的所有星期(不是数字)
- android - 无论设备方向如何,都修复设备坐标系?