apache-kafka-streams - Kafka流状态存储rocksdb文件大小在手动删除消息时不会减少
问题描述
我正在使用处理器 api 从状态存储中删除消息。删除工作成功,我通过 kafka 密钥在状态存储上使用交互式查询调用确认,但它不会减少目录 tmp/kafka-streams 下本地磁盘上的 kafka 流文件大小。
@Override
public void init(ProcessorContext processorContext) {
this.processorContext = processorContext;
processorContext.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, new Punctuator() {
@Override
public void punctuate(long l) {
processorContext.commit();
}
}); //invoke punctuate every 12 seconds
this.statestore = (KeyValueStore<String, GenericRecord>) processorContext.getStateStore(StateStoreEnum.HEADER.getStateStore());
log.info("Processor initialized");
}
@Override
public void process(String key, GenericRecord value) {
statestore.all().forEachRemaining(keyValue -> {
statestore.delete(keyValue.key);
});
}
kafka 流目录大小
2.3M /private/tmp/kafka-streams
3.3M /private/tmp/kafka-streams
我是否需要任何特定配置才能控制文件大小?如果这样不行,删除kafka-streams目录可以吗?我认为它应该是安全的,因为这样的删除将从状态存储和更改日志主题中删除记录。
解决方案
RocksDB 在后台进行文件压缩。因此,如果您需要更积极的压缩,您应该RocksDBConfigSetter
通过 Streams 配置参数传入自定义rocksdb.config.setter
。有关 RockDB 的更多详细信息,请查看 RocksDB 文档。
https://docs.confluent.io/current/streams/developer-guide/config-streams.html#rocksdb-config-setter
但是,只要没有真正的问题,我不建议更改 RocksDB 配置——弊大于利。似乎您的存储空间很小,因此,我没有看到真正的问题。
顺便说一句:如果你去生产,你应该将state.dir
配置更改为适当的目录,即使重新启动机器后状态也不会丢失。如果您将状态放入默认/tmp
位置,则在重新启动机器后状态很可能会消失,并且会触发从更改日志主题中进行的昂贵恢复。
推荐阅读
- android - 任务':app:kaptDebugKotlin'的java.lang.reflect.InvocationTargetException(无错误消息)
- r - R中的主成分分析图
- google-apps-script - Google Apps 脚本触发器 - 将新文件添加到文件夹时运行
- python - 两个pip安装的模块同名,如何选择加载哪一个?
- c++ - 我可以使用“距离”作为我的类成员变量名吗?
- php - 按存储为 Varchar 的日期时间过滤 MySQL 查询
- javascript - 当元素超过某个 div 时更改 CSS
- docker - Kafka 连接 docker 映像 - 找不到任何实现连接器且名称与 ElasticsearchSinkConnector 匹配的类
- amazon-web-services - 尝试从私有 Codecommit 存储库中提取时,在 CodeBuild 上安装 NPM 失败
- excel - 如何在 Excel 中自动更改列宽