首页 > 解决方案 > RocksDB 的 Kafka Streams 内存分配问题

问题描述

我正在尝试制作一个简单的 Kafka Stream 应用程序(v2.3.1 最初是 2.3.0),它收集指定时间间隔的统计信息(例如每分钟即翻滚窗口)。因此,我遵循如下教科书实现

events
  .groupByKey()
  .windowedBy(
    TimeWindows.of(Duration.ofMinutes(1).grace(Duration.ZERO))
    aggregate(...),Materialized.as("agg-metric")).withRetention(Duration.ofMinutes(5))

  .suppress(Suppressed.untilWindowClose(BufferConfig.unbounded()))
  .toStream((key, value) -> key.key())

一切似乎都正常,除了我的内存占用不断增长。我可以看到我的堆内存是稳定的,所以我认为这个问题与创建的 RocksDB 实例有关。

我有 10 个分区,由于每个窗口(每个分区默认 3 个段)创建 3 个段,我预计总共有 30 个 RocksDB 实例。由于 RocksDB 的默认配置值对于我的应用程序来说相当大,我选择更改 RocksDB 的默认配置,并根据下面的代码实现 RocksDBConfigSetter,主要是试图限制堆外内存消耗。

private static final long BLOCK_CACHE_SIZE = 8 * 1024 * 1024L;
private static final long BLOCK_SIZE = 4096L;
private static final long WRITE_BUFFER_SIZE = 2 * 1024 * 1024L;
private static final int MAX_WRITE_BUFFERS = 2;

private org.rocksdb.Cache cache = new org.rocksdb.LRUCache(BLOCK_CACHE_SIZE); // 8MB
private org.rocksdb.Filter filter = new org.rocksdb.BloomFilter();

@Override
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
     BlockBasedTableConfig tableConfig = new org.rocksdb.BlockBasedTableConfig();
    tableConfig.setBlockCache(cache) // 8 MB
    tableConfig.setBlockSize(BLOCK_SIZE); // 4 KB
    tableConfig.setCacheIndexAndFilterBlocks(true);;
    tableConfig.setPinTopLevelIndexAndFilter(true);

    tableConfig.setFilter(new org.rocksdb.BloomFilter())
    options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS); // 2 memtables
    options.setWriteBufferSize(WRITE_BUFFER_SIZE);  // 2 MB
    options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);

    options.setTableFormatConfig(tableConfig);
}

根据上面的配置值和https://docs.confluent.io/current/streams/sizing.html我希望分配给 RocksDB 的总内存是

 (write_buffer_size_mb * write_buffer_count) + block_cache_size_mb => 2*2 + 8 => 12MB

因此,对于 30 个实例,总分配的堆外内存将占 12 * 30 = 360 MB

当我尝试在具有 2G 内存的 VM 上运行此应用程序时,我为 kafka 流应用程序的堆分配了 512MB,因此根据我的逻辑/理解,分配的总内存应稳定在低于 1 GB (512 + 360) 的值.

不幸的是,情况似乎并非如此,因为我的内存并没有停止增长,尽管在某个点之后缓慢增长,但稳定地接近每天约 2%,并且不可避免地会在某个时候消耗所有 VM 内存并最终终止该进程。更令人担忧的事实是,即使我的流量变得非常低,我也从未见过任何堆外内存的释放。

结果,我想知道在这样一个简单而常见的用例中我做错了什么。在计算我的应用程序的内存消耗时我是否遗漏了什么?是否可以限制我的 VM 上的内存消耗以及我需要更改我的配置的哪些设置以限制 Kafka 流应用程序和 RocksDB 实例的内存分配?

标签: apache-kafka-streamsrocksdb

解决方案


推荐阅读