java - Kafka 消费者轮询仅第二次获取消息
问题描述
我有配置的消费者:
public static Consumer<String, TransactionDataAvro> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleAvroConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new KafkaConsumer<>(props);
}
他订阅了主题
this.consumer.subscribe(Collections.singletonList("fss-fsstransdata"));
每次我尝试获取消息(this.consumer.poll(Duration.ofMillis(500))
)时,它只稳定返回到第二个方法调用
代码
log.info("COUNT:" + this.consumer.poll(Duration.ofMillis(1000)).count());
log.info("COUNT:" + this.consumer.poll(Duration.ofMillis(1000)).count());
log.info("COUNT:" + this.consumer.poll(Duration.ofMillis(1000)).count());
log.info("COUNT:" + this.consumer.poll(Duration.ofMillis(1000)).count());
log.info("COUNT:" + this.consumer.poll(Duration.ofMillis(1000)).count());
日志:
| 17:47:40.688 | main | INFO | IaStepsDefinitions | COUNT:0
| 17:47:40.689 | main | INFO | IaStepsDefinitions | COUNT:1
| 17:47:41.690 | main | INFO | IaStepsDefinitions | COUNT:0
| 17:47:42.691 | main | INFO | IaStepsDefinitions | COUNT:0
| 17:47:43.692 | main | INFO | IaStepsDefinitions | COUNT:0
请向我解释为什么会这样
解决方案
Consumer.poll()
除了实际轮询数据之外,还有很多事情在幕后发生。
- 试图找到小组协调员
- 连接到此组协调器节点
- 开始心跳线
- 加入组并被分区分配
- 如果未找到,则将偏移重置为已提交或最早/最新
- 获取记录
所有这些步骤都受您传递给方法poll()的Duration对象的约束,您可以看到如果 Duration = 1ms,事情会变得更糟。
在我看来,将这个逻辑放到 poll() 方法中,让 poll 进行轮询,在后台线程和/或 subscribe 方法中完成其余的操作是误导和不正确的。
当您进行民意调查时,您不希望系统执行以下操作:
if (!updateAssignmentMetadataIfNeeded(timer)) {
return ConsumerRecords.empty();
}
轮询是面向客户端的逻辑,如果它获得 0 条记录,则意味着代理为空。
如果您调用REST服务并获得空响应,则您知道服务器为空。如果你调用 PreparedStatement.execute() 你会得到正确的结果或异常。如果你调用 RabbitMQ.basicGet() 你得到一个空响应,这意味着队列是空的。
长话短说,在您的情况下,只需增加第一次投票的超时时间,您就应该立即获得更新。
推荐阅读
- python - Django:没有这样的列 ecomapp_toplist.desc - 任何人都可以找到错误
- c - 函数中的 add() 仅将两个元素添加到列表中
- reactjs - 我们什么时候使用 rerender 变量,什么时候在 React 中使用常规变量?
- php - PHP检查函数是否可调用但不可调用的对象?
- angularjs - 来自 Angular 的 MEAN 堆栈绑定不起作用
- opengl - OpenGL 3.3:如何在传递给顶点和片段着色器之前实现解压缩纹理数据
- javascript - 在我的 Discord Bot 中丢失 Quick.db 的数据
- python - 如何在 Python 中打开多个加密的 PDF 并在没有密码的情况下保存
- javascript - 如何从js中的文本字段中获取整数?
- arrays - 将 Matlab 网格转换为向量