apache-kafka-streams - RocksDB 的 Kafka Streams 内存分配问题
问题描述
我正在尝试制作一个简单的 Kafka Stream 应用程序(v2.3.1 最初是 2.3.0),它收集指定时间间隔的统计信息(例如每分钟即翻滚窗口)。因此,我遵循如下教科书实现
events
.groupByKey()
.windowedBy(
TimeWindows.of(Duration.ofMinutes(1).grace(Duration.ZERO))
aggregate(...),Materialized.as("agg-metric")).withRetention(Duration.ofMinutes(5))
.suppress(Suppressed.untilWindowClose(BufferConfig.unbounded()))
.toStream((key, value) -> key.key())
一切似乎都正常,除了我的内存占用不断增长。我可以看到我的堆内存是稳定的,所以我认为这个问题与创建的 RocksDB 实例有关。
我有 10 个分区,由于每个窗口(每个分区默认 3 个段)创建 3 个段,我预计总共有 30 个 RocksDB 实例。由于 RocksDB 的默认配置值对于我的应用程序来说相当大,我选择更改 RocksDB 的默认配置,并根据下面的代码实现 RocksDBConfigSetter,主要是试图限制堆外内存消耗。
private static final long BLOCK_CACHE_SIZE = 8 * 1024 * 1024L;
private static final long BLOCK_SIZE = 4096L;
private static final long WRITE_BUFFER_SIZE = 2 * 1024 * 1024L;
private static final int MAX_WRITE_BUFFERS = 2;
private org.rocksdb.Cache cache = new org.rocksdb.LRUCache(BLOCK_CACHE_SIZE); // 8MB
private org.rocksdb.Filter filter = new org.rocksdb.BloomFilter();
@Override
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
BlockBasedTableConfig tableConfig = new org.rocksdb.BlockBasedTableConfig();
tableConfig.setBlockCache(cache) // 8 MB
tableConfig.setBlockSize(BLOCK_SIZE); // 4 KB
tableConfig.setCacheIndexAndFilterBlocks(true);;
tableConfig.setPinTopLevelIndexAndFilter(true);
tableConfig.setFilter(new org.rocksdb.BloomFilter())
options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS); // 2 memtables
options.setWriteBufferSize(WRITE_BUFFER_SIZE); // 2 MB
options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
options.setTableFormatConfig(tableConfig);
}
根据上面的配置值和https://docs.confluent.io/current/streams/sizing.html我希望分配给 RocksDB 的总内存是
(write_buffer_size_mb * write_buffer_count) + block_cache_size_mb => 2*2 + 8 => 12MB
因此,对于 30 个实例,总分配的堆外内存将占 12 * 30 = 360 MB。
当我尝试在具有 2G 内存的 VM 上运行此应用程序时,我为 kafka 流应用程序的堆分配了 512MB,因此根据我的逻辑/理解,分配的总内存应稳定在低于 1 GB (512 + 360) 的值.
不幸的是,情况似乎并非如此,因为我的内存并没有停止增长,尽管在某个点之后缓慢增长,但稳定地接近每天约 2%,并且不可避免地会在某个时候消耗所有 VM 内存并最终终止该进程。更令人担忧的事实是,即使我的流量变得非常低,我也从未见过任何堆外内存的释放。
结果,我想知道在这样一个简单而常见的用例中我做错了什么。在计算我的应用程序的内存消耗时我是否遗漏了什么?是否可以限制我的 VM 上的内存消耗以及我需要更改我的配置的哪些设置以限制 Kafka 流应用程序和 RocksDB 实例的内存分配?
解决方案
推荐阅读
- ios - SwiftUI:如何在另一个视图中更新传递的数组项
- core-data - 使用 SceneDelegate 设置 CoreData - 未知标识符“窗口”错误 - iOS 13 及更高版本
- python - Fabric2 任务和依赖项
- excel - 我正在尝试使此代码正常工作,但由于 End With 而失败
- json - 使用 JSON 输入过滤器的 AWS Sagemaker 批量转换
- pandas - 将一个 CSV 作为数据框附加到另一个上
- javascript - 商店没有在 redux 中更新?
- azure-data-factory - 如何在我的 ADF 管道中查找哪个活动调用了另一个活动
- c# - 在 Windows 服务中使用 Gmail API
- javascript - 如何获得显示的单元格的精确副本?