apache-kafka-streams - 在创建它的同一个应用程序中查询 KTable
问题描述
我有一个 Kafka 流应用程序,我在其中读取一个主题,进行聚合并在 KTable 中实现。然后我创建一个 Stream 并在流上运行一些逻辑。现在在流处理中,我想使用前面提到的 KTable 中的一些数据。启动流应用程序后,如何再次访问 KTable 流?我不想将 KTable 推送到新主题。
KStream<String, MyClass> source = builder.stream("my-topic");
KTable<Windowed<String>, Long> kTable =
source.groupBy((key, value) -> value.getKey(),
Grouped.<String, MyClass >as("repartition-1")
.withKeySerde(new Serdes.String())
.withValueSerde(new MyClassSerDes()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("test-store")
.withKeySerde(new Serdes.String())
.withValueSerde(Serdes.Long()));
在这里,我想使用 kTable 中的数据。
inputstream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
.count(Materialized.<myKey, Long, WindowStore<Bytes, byte[]>>as("str")
.withRetention(Duration.ofMinutes(30)))
.toStream()
.filter((k, v) -> {
// Here get the count for the previous Window.
// Use that count for some computation here.
}
解决方案
您可以将KTable
商店添加到处理器/变压器。对于您的情况,您可以替换filter
with flatTransform
(或任何类似的兄弟transform
等,具体取决于您是否需要访问密钥)并将商店连接到运营商:
inputstream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
.count(Materialized.<myKey, Long, WindowStore<Bytes, byte[]>>as("str")
.withRetention(Duration.ofMinutes(30))
)
.toStream()
// requires v2.2; otherwise use `transform()`
// if you don't need access to the key, consider to use `flatTransformValues` (v2.3)
.flatTransform(
() -> new Transformer<Windowed<myKey>,
Long,
List<KeyValue<Windowed<myKey>, Long>>() {
private ReadOnlyWindowStore<myKey, Long> store;
public void init(final ProcessorContext context) {
// get a handle on the store by its name
// as specified via `Materialized` above;
// should be read-only
store = (ReadOnlyWindowStore<myKey, Long>)context.getStateStore("str");
}
public List<KeyValue<Windowed<myKey>, Long>> transform(Windowed<myKey> key,
Long value) {
// access `store` as you wish to make a filtering decision
if ( ... ) {
// record passes
return Collection.singletonList(KeyValue.pair(key, value));
} else {
// drop record
return Collection.emptyList();
}
}
public void close() {} // nothing to do
},
"str" // connect the KTable store to the transformer using its name
// as specified via `Materialized` above
);
推荐阅读
- javascript - JavaScript 模拟鼠标点击不起作用(Chrome 扩展)
- java - 必需类型“贷款”,给定字符串
- python-3.x - 如何使用 matplotlib 绘制半圆
- python - 将点移动到最近的未占用网格位置
- python - 如何使用来自另一个表的数据在 Python 中更新 oracle 表
- python - 使用 Python 在 AWS-Lambda 内的 ReportLab 中创建带有图像的 PDF
- java - Spring Data JPA 原生 @Query,整个实体名为 @Param
- javascript - 从输入数据作为图像数组传递到 api
- javascript - 动态改变jquery contextmenu
- firebase - 如何将字段写入 Firestore 类型引用