首页 > 解决方案 > Kafka Stream BoundedMemoryRocksDBConfig

问题描述

我试图了解 Kafka Streams 的内部如何在缓存和 RocksDB(状态存储)方面发挥作用。

        KTable<Windowed<EligibilityKey>, String> kTable = kStreamMapValues
                .groupByKey(Grouped.with(keySpecificAvroSerde, Serdes.String())).windowedBy(timeWindows)
                .reduce((a, b) -> b, materialized.withLoggingDisabled().withRetention(Duration.ofSeconds(retention)))
                .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(timeToWaitForMoreEvents),
                        Suppressed.BufferConfig.unbounded().withLoggingDisabled()));

使用我的拓扑的上述部分,我正在使用具有 300 个分区的 Kafka 主题。该应用程序部署在 OpenShift 上,内存分配为 4GB。我注意到应用程序的内存不断增加,直到最终发生 OOMKILLED。经过一些研究,我了解到我应该实现自定义 RocksDB 配置,因为默认大小对于我的应用程序来说太大了。记录首先进入缓存(由 CACHE_MAX_BYTES_BUFFERING_CONFIG 和 COMMIT_INTERVAL_MS_CONFIG 配置),然后进入状态存储。

public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

  private static org.rocksdb.Cache cache = new org.rocksdb.LRUCache(1 * 1024 * 1024L, -1, false, 0);
  private static org.rocksdb.WriteBufferManager writeBufferManager = new org.rocksdb.WriteBufferManager(1 * 1024 * 1024L, cache);

  @Override
  public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {

    BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();

    // These three options in combination will limit the memory used by RocksDB to the size passed to the block cache (TOTAL_OFF_HEAP_MEMORY)
    tableConfig.setBlockCache(cache);
    tableConfig.setCacheIndexAndFilterBlocks(true);
    options.setWriteBufferManager(writeBufferManager);

    // These options are recommended to be set when bounding the total memory
    tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
    tableConfig.setPinTopLevelIndexAndFilter(true);
    tableConfig.setBlockSize(2048L);
    options.setMaxWriteBufferNumber(2);
    options.setWriteBufferSize(1 * 1024 * 1024L);

    options.setTableFormatConfig(tableConfig);
  }

  @Override
  public void close(final String storeName, final Options options) {
    // Cache and WriteBufferManager should not be closed here, as the same objects are shared by every store instance.
  }
}

对于每个时间窗口段,默认情况下会创建三个段。如果我从 300 个分区中消费,由于将为每个分区创建 3 个时间窗口段,因此会创建 900 个 RocksDB 实例。我的理解是否正确,以下内容是正确的?

 Memory allocated in OpenShift / RocksDB instances => 4096MB / 900 => 4.55 MB
 
 (WriteBufferSize * MaxWriteBufferNumber) + BlockCache + WriteBufferManager => (1MB * 2) + 1MB + 1MB => 4MB

BoundedMemoryRocksDBConfig.java 是针对 RocksDB 的每个实例,还是全部?

标签: apache-kafka-streams

解决方案


如果您从具有 300 个分区的主题中消费,并且您使用分段状态存储,即,您在 DSL 中使用时间窗口,您最终将获得 900 个 RocksDB 实例。如果只使用一个 Kafka Streams 客户端,即不横向扩展,所有 900 个 RocksDB 实例最终都会在同一个计算节点上。

BoundedMemoryRocksDBConfigRocksDB 每个 Kafka Streams 客户端使用的内存限制。这意味着,如果您只使用一个 Kafka Streams 客户端,则会BoundedMemoryRocksDBConfig限制所有 900 个实例的内存。

我的理解是否正确,以下内容是正确的?

在 OpenShift / RocksDB 实例中分配的内存 => 4096MB / 900 => 4.55 MB

(WriteBufferSize * MaxWriteBufferNumber) + BlockCache + WriteBufferManager => (1MB * 2) + 1MB + 1MB => 4MB

不,这是不正确的。

如果您传递Cacheto WriteBufferManager,memtables 所需的大小也会计入缓存(参见BoundedMemoryRocksDBConfigRocksDB 文档中的脚注 1 )。因此,传递给缓存的大小是内存表和块缓存的限制。由于您将缓存和写入缓冲区管理器传递给同一计算节点上的所有实例,因此所有 900 个实例都受您传递给缓存的大小的限制。例如,如果您指定大小为 4 GB,则所有 900 个实例(假设一个 Kafka Streams 客户端)使用的总内存限制为 4 GB。

请注意,传递给缓存的大小不是严格限制。尽管缓存构造函数中的布尔参数为您提供了强制执行严格限制的选项,但如果由于Kafka Streams 使用的 RocksDB 版本中的错误,写入缓冲区内存也计入缓存,则强制执行不起作用。

使用 Kafka Streams 2.7.0,您将可以使用通过 JMX 公开的指标来监控 RocksDB 内存消耗。查看KIP-607了解更多详情。


推荐阅读