apache-kafka - Kafka Stream - 按 client_id 过滤
问题描述
我正在使用 Kafka Stream 创建一个仅包含特定于 client_id 的数据的 ktable,这不是主题键。我是 Kafka Streams 的新手,这看起来很简单,但我对社区中可用的多个示例感到有些困惑,这些示例非常好。
我正在尝试获取具有 client_id=0123456 的 inputTopic 数据。在下面的 KSQL 中将类似于命令:
CREATE STREAM TOPIC1_CLIENT1 AS
SELECT * FROM TOPIC1
WHERE client_id= '0123456'
EMIT CHANGES;
下面我试图重现相同的行为。有人可以告诉我在下面做错了什么吗?它没有像我预期的那样过滤。
final KStream<String, String> stream = builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde));
final KTable<String, String> convertedTable = stream.filter((client_id,v) -> v.equals("0123456")).toTable(Materialized.as("stream-converted-to-table"));
stream.to(streamsOutputTopic, Produced.with(stringSerde, stringSerde));
convertedTable.toStream().to(tableOutputTopic, Produced.with(stringSerde, stringSerde));
解决方案
v
是消息的全部值。要在 KSQL 中命名字段,流上有一个关联的模式,例如数据是 JSON 还是 Avro,这意味着 clientid 只是值的一部分
推荐阅读
- mysql - MySQL 操作视图数据
- android - 在 Android 应用程序中集成 PayPal Checkout
- android - 将结果和夹具数据拉入 android 应用程序
- ios - 在 UIScrollView 中绘制 UIBezierPath
- javascript - Vuejs 路由器转换不会发生
- python - keras模型权重无法加载成功
- java - 如何转换/解码字符串?
- angular - Angular 应用程序 - 来自 firestore 数据库的两个可观察对象的 GroupBy
- c - 如何抑制反汇编输出中的函数名称?
- java - Android:为什么聊天列表视图适配器不能正常工作?