首页 > 解决方案 > 实现生产者和批量消费者模型的最简单java方法

问题描述

我需要生产者一个一个地生产数据,消费者将批量消费数据(除非队列有足够的元素,否则将阻塞)。但是java BlockingQueue似乎只支持Consumer一个一个消费数据。

卡夫卡似乎是一个解决方案,还有其他更简单的解决方案吗?

标签: javaapache-kafkaproducer-consumer

解决方案


如果您需要纯 Java 解决方案,我最近开发了这个实用程序,如果队列元素未达到批处理大小,则使用刷新超时批处理 BlockingQueue 元素。它还支持使用多个实例来处理同一组数据的扇出模式(使用更多的 cpu-core)。

// Instantiate the registry
FQueueRegistry registry = new FQueueRegistry();

// Build FQueue consumer
registry.buildFQueue(String.class)
                .batch()
                .withChunkSize(5)
                .withFlushTimeout(1)
                .withFlushTimeUnit(TimeUnit.SECONDS)
                .done()
                .consume(() -> (broadcaster, elms) -> System.out.println("elms batched are: "+elms.size()));

// Push data into queue
for(int i = 0; i < 10; i++){
        registry.sendBroadcast("Sample"+i);
}

更多信息在这里!

https://github.com/fulmicotone/io.fulmicotone.fqueue


推荐阅读