apache-kafka - 卡夫卡消费者长时间闲置后不消费
问题描述
嗨,我有 kafka 消费者消费数据。下面的命令给了我消费者组命令超时。
kafka-consumer-groups.sh --bootstrap-server b1:9092,b2:9092,b3:9092,b4:9092,b5:9092,b6:9092,b7:9092,b8:9092,b9:9092,b10:9092,b11:9092,b12:9092,b13:9092 --describe --group testgroup
错误:执行消费者组命令失败,原因是消费者组命令在等待组初始化时超时:
所有消费者已经使用数据超过 26 小时。由于生产者在这 6 小时内不再生成数据,因此存在超过 6 小时的差距。
我怀疑有一些空闲时间可能断开了消费者组与消费者之间的连接。所有消费者都在以 100ms 的轮询间隔进行消费poll(100)
。
这种情况已经被观察了 3 次以上。感谢 Kafka 专家的任何帮助。谢谢。
代码:
@Service
public class DedupeConsumerService {
final Logger logger = LoggerFactory.getLogger(DedupeConsumerService.class);
@Autowired
private TaskExecutor taskExecutor;
@Autowired
private PropertyConfig config;
@Autowired
private ApplicationContext applicationContext;
public void consume() {
String topic = config.getDedupServiceConsumerTopic();
String consGroup = config.getDedupServiceConsGroup();
Properties props = new Properties();
props.put("enable.auto.commit", "false");
props.put("session.timeout.ms", "20000");
props.put("max.poll.records", "10000");
KafkaConsumer<String, AvroSyslogMessage> consumer = new GenericConsumer<String, AvroSyslogMessage>().initialize(topic, consGroup, STREAMSERDE.STRINGDESER, STREAMSERDE.AVRODESER, props);
logger.info("Dedupe Kafka Consumer Initialized......");
try {
while (true) {
ConsumerRecords<String, AvroSyslogMessage> records = consumer.poll(100);
if (records.count() > 0) {
}
logger.info("Number of Records:: " + records.count() + " Time took to process poll :: " + durationInMilliSec);
}
}
} catch (Throwable e) {
logger.error("Error occured while processing message", e);
e.printStackTrace();
} finally {
logger.debug("dedupe kafka consume is closing");
consumer.close();
}
}
}
我试着安排投票时间,因为Integer.MAX_VALUE
这没有帮助。
解决方案
考虑为您的消费者配置心跳设置:https ://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_heartbeat.interval.ms
Heartbeat 将确保连接不会自动关闭(很像保持活动状态)。
推荐阅读
- sql - 如何在大查询中使用 WEEKNUM?
- amazon-web-services - 在 AWS 中存储/读取 1 位数据的最便宜/最简单的方法
- java - HTTP 请求在不同的环境中获得授权和未经授权,并且设置似乎相同
- python - 将真实值的索引过滤到列表中
- java - 在 spark Dataset s 可以作为输入 args 传递给函数以输出函数的 args 吗?
- javascript - 将可配置的 appInsights 密钥传递给 javascript (react/angular) 应用程序
- 3d - 使用低质量 GPU 的 Maya 3d 模型
- shell - 如何删除在unix文件中的两个位置具有两个模式的行
- html - 两个 CSS 网格未在同一个父容器内对齐
- r - R:R中的世界银行数据与导出到csv之间的数据不一致