java - 如何从按特定字段过滤的 Kafka 表中读取?
问题描述
我有一个 kafka 表,我们将其命名为“MY_TABLE”,并且有一个结构......像这样:
{
"ROWTIME":123456,
"ROWKEY":"3_1234_all",
"id":1,
"provider_id":3,
"person_id":"1234"
}
在这个 kafka 表中,我有很多不同的数据,具有不同的 provider_id。我需要从这个 kafka 表中检索所有不同的 person_id,其中 provider=3。
我是 kafka 的新手,在这里找到了这种方法: https ://kafka-tutorials.confluent.io/filter-a-stream-of-events/kstreams.html#consume-filtered-events-from-the-output-话题
但我不确定我是否真的需要一个新主题来获得我将在应用程序中使用的过滤数据。我需要每隔几个小时阅读一次此结果,以便按 person_ids 创建查询过滤。
顺便说一句,这是一个 springboot 应用程序,所以我将在 java 上阅读它。
解决方案
不确定“Kafka Table”是什么意思。Kafka 只知道主题,KSQL 和 Kafka Streams 知道表。
假设您的意思是“Kafka 主题”,那么您提供的链接已经是一个很好的起点。
但是,如果您只是想访问应用程序中的数据,则不需要为输出设置不同的主题。
您可以在原始主题上定义一个 KTable,然后使用交互式查询来访问支持该 KTable 的状态存储:
这是关于如何定义 KTable 的草图:
KTable<String, Person> persons = Streamsbuilder.stream("my_topic").filter(p -> p.provider_id == 3).toTable()
您必须在上面的查询中配置正确的序列化器和反序列化器。所以你需要实现一个 PersonDeserializer 和一个 PersonSerializerClass 来将数据写入状态存储。
有关如何使用交互式查询访问状态存储的更多信息,请参见此处:
关于您每隔几个小时重新读取数据的计划,这对我来说似乎是一种反模式。Kafka 主题是从头到尾读取的,只有在极少数情况下,您应该使用同一个使用者多次读取数据。而不是重新读取数据,您应该在数据上构建一个物化视图(如上面的 KTable 所示),然后查询该物化视图。
希望这可以帮助。
推荐阅读
- c++ - 如何在不同的行上返回大写和小写字符串?
- react-leaflet - React-leaflet 集群在反应钩子中不起作用
- spring - 使用 Junit 和 Mockito 嵌套异常问题测试 POST Api
- python - Scrapy & Selenium:如何在循环中调用方法
- design-patterns - 如何使用可更新缓存实现存储库模式
- angular - 带有辅助路线的 Angular 6 材质选项卡导航
- mongodb - 如何在开始测试之前删除 mongodb 中的所有模型并注册用户?
- java - 什么是@constructorproperties?这是属于 J2SE 还是 Spring?
- java - HTTP 500 内部服务器错误:在容器化我的 Java Web 应用程序时
- templates - 更改 svelte 模板的默认位置