java - 读取日志压缩主题的所有记录
问题描述
我正在尝试使用 java 使用者每 5 秒读取一次日志压缩 kafka 主题的所有记录。我的问题是没有显示最新的(有时不仅仅是一个)记录。当我订阅一个未压缩日志的主题时,一切正常。
conduktor 能够读取所有消息,所以我猜我的消费者配置有问题。
我使用的主题配置:
config.put("cleanup.policy","compact");
config.put("delete.retention.ms","10000");
config.put("min.cleanable.dirty.ratio","0.05");
config.put("segment.ms","10000");
config.put("max.compaction.lag.ms","10000");
消费者配置即时使用:
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER);
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "example-consumers");
pops.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
我的源代码的一部分:
while(true){
consumer.seekToBeginning(consumer.assignment());
records = consumer.poll(Duration.ofMillis(5000));
//print all records and wait 5 seconds
}
解决方案
推荐阅读
- javascript - 如何在 vue.js 中包含 3rd 方 JavaScript 文件?
- git - 带有多模块项目的 Maven 发布插件不会在 GIT 上推送提交
- javascript - Bootstrap 和 JS - 如何在有限的情况下适应高数字空间
- bash - 如何在bash脚本中将一个文件夹中的所有tif文件转换为pdf并保存在另一个位置而不循环?
- javascript - 如何将所有子元素包装在一个类中?
- android - 文本大小增量在 Android 中不起作用
- node.js - 运行 npm install @angular/pwa 时出现错误 Rolling back node-pre-gyp@0.14.0 failed
- mongodb - 猫鼬当我使用更新时,它不会更新状态为 200(成功)的任何内容
- php - GAE - 实例似乎死亡并返回 502,而不是优雅地部署
- deployment - 在 Azure DevOps 部署后批准邮件中获取更多详细信息