首页 > 解决方案 > Kafka Stream - 按 client_id 过滤

问题描述

我正在使用 Kafka Stream 创建一个仅包含特定于 client_id 的数据的 ktable,这不是主题键。我是 Kafka Streams 的新手,这看起来很简单,但我对社区中可用的多个示例感到有些困惑,这些示例非常好。

我正在尝试获取具有 client_id=0123456 的 inputTopic 数据。在下面的 KSQL 中将类似于命令:

CREATE STREAM TOPIC1_CLIENT1 AS
SELECT * FROM TOPIC1
WHERE client_id= '0123456'
EMIT CHANGES;

下面我试图重现相同的行为。有人可以告诉我在下面做错了什么吗?它没有像我预期的那样过滤。

        final KStream<String, String> stream = builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde));
        final KTable<String, String> convertedTable = stream.filter((client_id,v) -> v.equals("0123456")).toTable(Materialized.as("stream-converted-to-table"));
        stream.to(streamsOutputTopic, Produced.with(stringSerde, stringSerde));
        convertedTable.toStream().to(tableOutputTopic, Produced.with(stringSerde, stringSerde));

标签: apache-kafkastreamksqldbktable

解决方案


v是消息的全部值。要在 KSQL 中命名字段,流上有一个关联的模式,例如数据是 JSON 还是 Avro,这意味着 clientid 只是值的一部分


推荐阅读