首页 > 解决方案 > Kafka流状态存储rocksdb文件大小在手动删除消息时不会减少

问题描述

我正在使用处理器 api 从状态存储中删除消息。删除工作成功,我通过 kafka 密钥在状态存储上使用交互式查询调用确认,但它不会减少目录 tmp/kafka-streams 下本地磁盘上的 kafka 流文件大小。

@Override
    public void init(ProcessorContext processorContext) {
        this.processorContext = processorContext;
        processorContext.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, new Punctuator() {
            @Override
            public void punctuate(long l) {
                processorContext.commit();
            }
        }); //invoke punctuate every 12 seconds
        this.statestore = (KeyValueStore<String, GenericRecord>) processorContext.getStateStore(StateStoreEnum.HEADER.getStateStore());
        log.info("Processor initialized");
    }

    @Override
    public void process(String key, GenericRecord value) {
        statestore.all().forEachRemaining(keyValue -> {
            statestore.delete(keyValue.key);
        });
    }

kafka 流目录大小

2.3M    /private/tmp/kafka-streams
3.3M    /private/tmp/kafka-streams

我是否需要任何特定配置才能控制文件大小?如果这样不行,删除kafka-streams目录可以吗?我认为它应该是安全的,因为这样的删除将从状态存储和更改日志主题中删除记录。

标签: apache-kafka-streamsrocksdbspring-cloud-stream-binder-kafka

解决方案


RocksDB 在后台进行文件压缩。因此,如果您需要更积极的压缩,您应该RocksDBConfigSetter通过 Streams 配置参数传入自定义rocksdb.config.setter。有关 RockDB 的更多详细信息,请查看 RocksDB 文档。

https://docs.confluent.io/current/streams/developer-guide/config-streams.html#rocksdb-config-setter

但是,只要没有真正的问题,我不建议更改 RocksDB 配置——弊大于利。似乎您的存储空间很小,因此,我没有看到真正的问题。

顺便说一句:如果你去生产,你应该将state.dir配置更改为适当的目录,即使重新启动机器后状态也不会丢失。如果您将状态放入默认/tmp位置,则在重新启动机器后状态很可能会消失,并且会触发从更改日志主题中进行的昂贵恢复。


推荐阅读