首页 > 解决方案 > 在java中实现微批处理

问题描述

我正在开发一个基于 kafka 的应用程序,其中 kafka 侦听器将侦听记录;一旦 kafka 收到记录,我可能需要将记录写入文件。这里要将记录写入文件,我们要使用带有批处理大小和超时设置的微批处理。例如,batchsize 为 10,超时设置为 1000 ms,这意味着在写入文件之前等待 10 条记录,等待时间为 1000 毫秒。如果在任何情况下 Kafka 在 1000 毫秒内仅收到 5 条记录,则在该批次中只写入 5 条记录。

我在 Java 中做到这一点的效率有多高。

标签: javaapache-kafkakafka-consumer-api

解决方案


在这种情况下,一种常见的方法是将所有记录放入队列中。当您的队列达到 10 或 1000 毫秒后,有一个线程将记录这些记录,具体取决于先出现的内容。

消费者代码:

 CountDownLatch countDownLatch = new CountDownLatch(10);
 countDownLatch.await(1000, TimeUnit.MILLISECONDS);
 int queueSize = queue.size();
 for(int i = 0; i < queueSize; ++i) {
     ... do your work here or put in a batch a do it right after loop
 }

生产者代码:

 Record record = ...receive new record...
 queue.put(record);
 consumer.getCountDownLatch().countDown();

作为队列,我建议使用未绑定的队列,例如LinkedTransferQueue,因为您不想在达到 10 个任务时停止生产者,您仍然需要使用来自 kafka 的结果。

另一种选择是反应流


推荐阅读