首页 > 解决方案 > 读取日志压缩主题的所有记录

问题描述

我正在尝试使用 java 使用者每 5 秒读取一次日志压缩 kafka 主题的所有记录。我的问题是没有显示最新的(有时不仅仅是一个)记录。当我订阅一个未压缩日志的主题时,一切正常。

conduktor 能够读取所有消息,所以我猜我的消费者配置有问题。

我使用的主题配置:

config.put("cleanup.policy","compact");
config.put("delete.retention.ms","10000");
config.put("min.cleanable.dirty.ratio","0.05");
config.put("segment.ms","10000");
config.put("max.compaction.lag.ms","10000");

消费者配置即时使用:

props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER);
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "example-consumers");
pops.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

我的源代码的一部分:

while(true){
   consumer.seekToBeginning(consumer.assignment());
   records = consumer.poll(Duration.ofMillis(5000));

   //print all records and wait 5 seconds
}

标签: javaapache-kafka

解决方案


推荐阅读