spring-cloud-stream - 从消费者创建materializeView(函数式编程模型)
问题描述
是否可以从消费者创建物化视图,而无需真正为其编写逻辑?因为我在 kafka 主题中的数据可以在没有任何转换的情况下实现
1. Using KTable
@Bean
public Consumer<KTable<String, Pojo>> process() {
return ktable -> <what should i do here>??;
}
我在文档中读到,传入的 KTable 可以直接通过 spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.materializedAs 实现:incoming-store https://docs.spring.io/spring- cloud-stream-binder-kafka/docs/3.1.3/reference/html/spring-cloud-stream-binder-kafka.html#_state_store
2. How about when using GlobalKTable ?
@Bean
public Consumer<GlobalTable<String, Pojo>> process() {
return ktable -> <what can I do here>???;
}
如果我想要来自所有分区的 MV 怎么样,上面的代码是一个有效的构造吗?
解决方案
您可以编写一个空的消费者函数,如下所示。
@Bean
public Consumer<KTable<String, Pojo>> process() {
return ktable -> {};
}
然后定义以下配置:
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.materializedAs: my-store
然后成为消费数据my-store
的底层 Kafka 主题的物化视图。KTable
您可以对这个物化视图使用交互式查询。
也是如此GlobalKTable
。
例如,
@Bean
public Consumer<GlobalKTable<String, Pojo>> globalProcess() {
return gktable -> {};
}
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.materializedAs: my-global-store
在像 Kotlin 这样的语言中,您可以将上述函数作为单行代码,如下所示:
@Bean
fun storeBuilder() = Consumer<GlobalKTable<String, Pojo>> {}
推荐阅读
- javascript - 更改 JSON 数组中每个元素的颜色
- r - 有条件地将数据框中的值替换为另一个数据框中的值
- c++ - 如何使用 C++ 中的结构制作地图?
- javascript - 更改全局状态更改的输入占位符
- python - 我收到错误消息 ValueError: cannot insert Cluster_Label, already exists。我希望有人能帮我一把
- javascript - 单击按钮时电子可以执行节点吗?
- grpc - 简单的 gRPC 特使配置
- python - INSERT INTO 语句 - Python 正则表达式和 Sqlite
- javascript - 选择具有相同类名的多个跨度元素
- android - 如何在android中找到两个日期选择器之间的天数?