首页 > 解决方案 > Kafka Consumer 需要较长的轮询持续时间

问题描述

使用具有以下配置的 Kafka/Java:

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.maxPollRecords);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass.getName());
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));

我有一个简单的轮询循环,例如:

consumer.poll(Duration.ofMillis(200));

我注意到一些奇怪的行为。持续时间为 0,它不返回任何结果。在本地,持续时间为 200 毫秒,我得到了一些结果,但在另一个生产环境中它永远不会返回结果,它至少需要 1 秒。

在我的理解中, poll 方法会等到至少找到一个结果。持续时间为 0 时,它至少应该返回已经到达的结果,而不应该总是不返回任何结果。

解释是什么?

标签: javaapache-kafkakafka-consumer-api

解决方案


根据文档:

public ConsumerRecords<K,V> poll​(long timeout)
timeout - The time, in milliseconds, spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. Must not be negative.

因此,基本上作为轮询请求阻塞正在调用它的线程,轮询持续时间是它可以阻塞线程的最长时间。因此,如果超时时间为零或小于发出请求并获得消费者响应所用的时间,则不会返回任何记录。

仅供参考,如果我们将此超时设置为高并将消费者的 max.poll.records 属性设置为我们想要的值,假设max.poll.records : "10",那么轮询将在获取 10 条记录后自行结束(即使超时很大)。所以理想情况下需要知道网络延迟,否则我上面提到的技巧可以正常工作。


推荐阅读