首页 > 解决方案 > 如果通过 stream.store() 或在处理器中通过 context.getStateStore 检索 Kafka Streams 存储中的近似条目数量的差异

问题描述

我们正在访问由流 DSL 中的 aggregate() 调用使用/填充的状态存储,并且还在我们的 Kafka 流应用程序的其他两个区域中访问。

一个仅用于计划监视条目,由以下人员创建和访问:

val value: StoreQueryParameters[ReadOnlyKeyValueStore[String, Aggregated]] =
  StoreQueryParameters.fromNameAndType(config.stream.stateStoreName, QueryableStoreTypes.keyValueStore())

val store = streams.store(value)

// then later in a scheduled TimerThread
logger.info(store.approximateNumEntries())

Kafka 流应用程序中的另一个位置是在 DSL 的 transfrom() 调用中使用的处理器中:

[...].transform(
    ourTransformerSupplier(config.stream.stateStoreName),
    Named.as("ourTransformer"),
    config.stream.stateStoreName
  )

// Transformer code
def init(context: ProcessorContext): Unit = {
  this.context = context
  this.stateStore = context.getStateStore(stateStoreName).asInstanceOf[TimestampedKeyValueStore[String, Aggregated]]
[...]
// inside the transformers scheduled (WALL_CLOCK_TIME) punctuate:
logger.info(stateStore.approximateNumEntries())

这些数字相差 10 倍到 20 倍,而在同一实例上执行的时间几乎相同。当然,我们使用相同的状态存储名称配置(即它实际上是同一个存储),并且在处理器中采取的操作也会影响监控代码的数量。集群中每个实例的差异幅度大致相同。

我想知道这是否与 API 如何在处理器中返回状态存储有关(TimestampedKeyValueStore 而不是 ReadOnlyKeyValueStore),但是在查看库代码之后,这似乎只是一个 API 包装器。

标签: apache-kafkaapache-kafka-streams

解决方案


推荐阅读