asynchronous - 如何创建异步 Camel-kafka 消费者?
问题描述
我有一条从卡夫卡到消费者的骆驼路线。如果传入的消息是 18000 TPS,它正在以 2000 的 TPS 消费和生产。所以消费者话题有消费者滞后。如果我保持 max.poll.recors = 500 我能够达到 2000 TPS。如果我保持生产者设置 requestRequiredAcks=0 我可以达到 4000 TPS。但仍然存在消费滞后。我们知道,当 from->to 完成时,骆驼路线就完成了。从 2 个消费者计数为 2 的分区消费的消费者一直很忙,直到路由完成。
有没有办法让骆驼卡夫卡消费者异步。任何代码示例?
from("kafka:{{consumer.topic}}?brokers={{kafka_dev.host}}"
+ "&maxPollRecords={{consumer.maxPollRecords}}" + "&consumersCount=2"
+ "&seekTo=latest" + "&groupId={{consumer.group}}" + "&keyDeserializer="
+ KEYDESERIALIZER + "&valueDeserializer=" + VALUEDESERIALIZER + SSL).doTry()
.routeId("route1")
.process(new CamelProcessor())
.to("kafka:{{producer.topic}}?brokers={{kafka_dev.host}}" +"&requestRequiredAcks=1" )
.doCatch(Exception.class));
我们还观察到,在此路由中引入线程会重新读取相同的已处理和发送的消息。这个链接是不是说骆驼卡夫卡https://stackoverflow.com/questions/56716812/how-to-commit-offsets-thread-safe-using-camel-kafka
解决方案
推荐阅读
- artificial-intelligence - 蒙特卡洛树搜索 - 处理游戏结束节点
- tcp - 如果我在以太网 II、巨型帧和 DIX 的数据中间和数据包末尾看到以太网 FCS,该怎么办
- rest - CodenameOne:使用 REST PATCH 调用的问题
- java - 如何在 JUnit 测试用例中使用 BeanPostProcessor
- python-3.x - 在 Windows 10 64 位版本中找不到名为 tensorflow 的模块
- android - 手机喜欢(oppo)fcm通知声音来但通知不可见
- c# - Application Insights 不包括由自定义 ITelemetryInitializer 设置的属性
- extjs - 访问 Ext.grid.Panel 的列 ComboBox Selection 上的底层数据行
- python - 使用多个 JPEG 文件的 HTTP 上的 MJPEG 无损流
- python - 元素的大小和百分比