首页 > 解决方案 > 带有阻塞队列的 Spring Boot @kafkaListner

问题描述

我是新手Spring Boot @kafkaListener。我的应用程序每秒收到近 20 万条关于主题的消息。我想将消息侦听器和消息处理分开。我该如何java.util.concurrent.BlockingQueue使用@kafkaListener?我可以使用它来使用它CompletableFuture吗?

任何示例代码都会有更多帮助。

标签: spring-bootjava-8spring-kafka

解决方案


我相信你想让你的消费者使用流水线实现。在像您这样的场景中实现这一点并不少见。为什么?好吧,如果KafkaConsumer不考虑进行处理所需的时间,解压缩/反序列化可能会很耗时。由于这些操作堆叠在一个线程后面,因此最好将轮询与处理分开,这是通过几个缓冲区实现的。

一种方法是:您EventReceiver为轮询启动一个线程。该线程会做与你经常做的事情相同的事情,但不是为每个事件触发侦听器,而是将事件传递给一个receivedEvents缓冲区,该缓冲区可能是BlockingQueue<RecieveEvent>. 因此,在 for 循环中,您将每条记录传递给阻塞队列。一旦 for 循环结束,该线程将利用另一个缓冲区,例如Queue<Map<TopicPartition, OffsetAndMetadata>>-- 并且它将提交 processingThread 已成功处理的偏移量。

接下来,您EventReceiver启动另一个线程 - processingThread。这将处理从缓冲区中提取记录,向该接收器的所有侦听器触发事件​​,然后更新队列状态以供 pollingThread 提交。

为什么 processingThread 不只是提交事件而不是将其传递回 pollingThread?这是 bcKafkaConsumer要求调用的同一线程.poll()应该是调用的线程,consumer.commitAsync(...)否则您将获得并发异常。

此方法不适用于启用自动提交。

至于如何使用 Spring Kafka 做到这一点,我并不完全确定。但是,我确实知道 Spring Kafka 将低级 kafka 工作与业务逻辑分开EventReceiverEventListener (@KafkaListener)理论上,你必须调整它们的实现,但我认为在没有 Spring Kafka 库的情况下实现这个会更容易。


推荐阅读