首页 > 解决方案 > 如何创建异步 Camel-kafka 消费者?

问题描述

我有一条从卡夫卡到消费者的骆驼路线。如果传入的消息是 18000 TPS,它正在以 2000 的 TPS 消费和生产。所以消费者话题有消费者滞后。如果我保持 max.poll.recors = 500 我能够达到 2000 TPS。如果我保持生产者设置 requestRequiredAcks=0 我可以达到 4000 TPS。但仍然存在消费滞后。我们知道,当 from->to 完成时,骆驼路线就完成了。从 2 个消费者计数为 2 的分区消费的消费者一直很忙,直到路由完成。

有没有办法让骆驼卡夫卡消费者异步。任何代码示例?


        from("kafka:{{consumer.topic}}?brokers={{kafka_dev.host}}"
                    + "&maxPollRecords={{consumer.maxPollRecords}}" + "&consumersCount=2"
                    + "&seekTo=latest" + "&groupId={{consumer.group}}" + "&keyDeserializer="
                    + KEYDESERIALIZER + "&valueDeserializer=" + VALUEDESERIALIZER + SSL).doTry()
                            .routeId("route1")
                            .process(new CamelProcessor())
                            .to("kafka:{{producer.topic}}?brokers={{kafka_dev.host}}" +"&requestRequiredAcks=1" )

                            .doCatch(Exception.class));

我们还观察到,在此路由中引入线程会重新读取相同的已处理和发送的消息。这个链接是不是说骆驼卡夫卡https://stackoverflow.com/questions/56716812/how-to-commit-offsets-thread-safe-using-camel-kafka

标签: asynchronousapache-kafkaapache-camelkafka-consumer-apisendasynchronousrequest

解决方案


推荐阅读