apache-kafka - 如果通过 stream.store() 或在处理器中通过 context.getStateStore 检索 Kafka Streams 存储中的近似条目数量的差异
问题描述
我们正在访问由流 DSL 中的 aggregate() 调用使用/填充的状态存储,并且还在我们的 Kafka 流应用程序的其他两个区域中访问。
一个仅用于计划监视条目,由以下人员创建和访问:
val value: StoreQueryParameters[ReadOnlyKeyValueStore[String, Aggregated]] =
StoreQueryParameters.fromNameAndType(config.stream.stateStoreName, QueryableStoreTypes.keyValueStore())
val store = streams.store(value)
// then later in a scheduled TimerThread
logger.info(store.approximateNumEntries())
Kafka 流应用程序中的另一个位置是在 DSL 的 transfrom() 调用中使用的处理器中:
[...].transform(
ourTransformerSupplier(config.stream.stateStoreName),
Named.as("ourTransformer"),
config.stream.stateStoreName
)
// Transformer code
def init(context: ProcessorContext): Unit = {
this.context = context
this.stateStore = context.getStateStore(stateStoreName).asInstanceOf[TimestampedKeyValueStore[String, Aggregated]]
[...]
// inside the transformers scheduled (WALL_CLOCK_TIME) punctuate:
logger.info(stateStore.approximateNumEntries())
这些数字相差 10 倍到 20 倍,而在同一实例上执行的时间几乎相同。当然,我们使用相同的状态存储名称配置(即它实际上是同一个存储),并且在处理器中采取的操作也会影响监控代码的数量。集群中每个实例的差异幅度大致相同。
我想知道这是否与 API 如何在处理器中返回状态存储有关(TimestampedKeyValueStore 而不是 ReadOnlyKeyValueStore),但是在查看库代码之后,这似乎只是一个 API 包装器。
解决方案
推荐阅读
- r - 将下三角矩阵转换为向量,从下到上
- node.js - 项目编译后找不到模块
- javascript - 如何在 vue.js 中创建一个使用全局列表和 foreach 的当前项的计算函数?
- java - 使用 SmbFileInputStream 获取 NullPointerException
- fpga - 关于在 Quartus II 中编程 Altera Cyclone II 的新手问题
- android - 无法使用 ADB Espresso 清除应用数据
- java - 为什么 IntelliJ IDEA 无法确定 java 版本?
- reactjs - React - 在渲染之前等待复杂方法完成
- python - 如何将 JSON 结构化数据写入 Python 中的文本文件?
- image - 使用 FFMPEG 将带有缩放 + 旋转的图像添加到视频叠加层?