java - Kafka Consumer 需要较长的轮询持续时间
问题描述
使用具有以下配置的 Kafka/Java:
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.maxPollRecords);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass.getName());
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
我有一个简单的轮询循环,例如:
consumer.poll(Duration.ofMillis(200));
我注意到一些奇怪的行为。持续时间为 0,它不返回任何结果。在本地,持续时间为 200 毫秒,我得到了一些结果,但在另一个生产环境中它永远不会返回结果,它至少需要 1 秒。
在我的理解中, poll 方法会等到至少找到一个结果。持续时间为 0 时,它至少应该返回已经到达的结果,而不应该总是不返回任何结果。
解释是什么?
解决方案
根据文档:
public ConsumerRecords<K,V> poll(long timeout)
timeout - The time, in milliseconds, spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. Must not be negative.
因此,基本上作为轮询请求阻塞正在调用它的线程,轮询持续时间是它可以阻塞线程的最长时间。因此,如果超时时间为零或小于发出请求并获得消费者响应所用的时间,则不会返回任何记录。
仅供参考,如果我们将此超时设置为高并将消费者的 max.poll.records 属性设置为我们想要的值,假设max.poll.records : "10"
,那么轮询将在获取 10 条记录后自行结束(即使超时很大)。所以理想情况下需要知道网络延迟,否则我上面提到的技巧可以正常工作。
推荐阅读
- c# - 从列表中提取列表
> 不创建副本? - sap - SAPNCo 应用程序中的非个性化 SAP 对话用户
- node.js - 聚合文档多级
- python - 如何在微分方程(SciPy)中使用 if 语句?
- php - 在 PHP foreach 循环中使用变量
- checkbox - 是否可以将复选框和文本(值)插入到同一个 wxGrid 单元格中?或者我可以将多个复选框插入到单个 wxGrid 单元格中吗?
- java - 将所有字段名称从firestore文档获取到arraylist
- data-structures - 实现索引队列或链表字典的数据结构的名称?
- redirect - 访问没有指向它的域的虚拟主机
- c# - 用于 URL 授权的 Mvc 区域-控制器-操作列表