spring-boot - 带有阻塞队列的 Spring Boot @kafkaListner
问题描述
我是新手Spring Boot @kafkaListener
。我的应用程序每秒收到近 20 万条关于主题的消息。我想将消息侦听器和消息处理分开。我该如何java.util.concurrent.BlockingQueue
使用@kafkaListener
?我可以使用它来使用它CompletableFuture
吗?
任何示例代码都会有更多帮助。
解决方案
我相信你想让你的消费者使用流水线实现。在像您这样的场景中实现这一点并不少见。为什么?好吧,如果KafkaConsumer
不考虑进行处理所需的时间,解压缩/反序列化可能会很耗时。由于这些操作堆叠在一个线程后面,因此最好将轮询与处理分开,这是通过几个缓冲区实现的。
一种方法是:您EventReceiver
为轮询启动一个线程。该线程会做与你经常做的事情相同的事情,但不是为每个事件触发侦听器,而是将事件传递给一个receivedEvents
缓冲区,该缓冲区可能是BlockingQueue<RecieveEvent>
. 因此,在 for 循环中,您将每条记录传递给阻塞队列。一旦 for 循环结束,该线程将利用另一个缓冲区,例如Queue<Map<TopicPartition, OffsetAndMetadata>>
-- 并且它将提交 processingThread 已成功处理的偏移量。
接下来,您EventReceiver
启动另一个线程 - processingThread。这将处理从缓冲区中提取记录,向该接收器的所有侦听器触发事件,然后更新队列状态以供 pollingThread 提交。
为什么 processingThread 不只是提交事件而不是将其传递回 pollingThread?这是 bcKafkaConsumer
要求调用的同一线程.poll()
应该是调用的线程,consumer.commitAsync(...)
否则您将获得并发异常。
此方法不适用于启用自动提交。
至于如何使用 Spring Kafka 做到这一点,我并不完全确定。但是,我确实知道 Spring Kafka 将低级 kafka 工作与业务逻辑分开EventReceiver
。EventListener (@KafkaListener)
理论上,你必须调整它们的实现,但我认为在没有 Spring Kafka 库的情况下实现这个会更容易。
推荐阅读
- excel - 根据公式打开 Excel 工作簿
- excel - 如何将“If and vlookup”组合成1个公式
- node.js - 使用 typescript 编译问题 node.js,类型不匹配。heartBeatIntervalId 是数字类型,但 clearInterval 需要节点超时类型
- react-native - 反应原生区间,无法正确处理
- php - 代码点火器 URL 更改
- javascript - 让相机在 Aframe 中的路径上设置动画以保持专注于主题
- extjs - 在 extjs 的网格上添加启用和禁用作为上下文菜单
- docker - Docker 守护进程已经运行但仍然得到:无法连接到 unix:///var/run/docker.sock 上的 Docker 守护进程。docker 守护进程是否正在运行?
- java - 如何在 jython 中使用 cPickle 阅读
- sql - 带通配符的 BIGQUERY CASE 语句