首页 > 解决方案 > 如何在kafka中加入2个流?

问题描述

学习 Kafka Streams,尝试在 5 分钟的窗口中加入两个流(Json 值)。我的理解是对值具有相同的键来匹配连接条件。如果我的理解是正确的,就像钥匙只能加入,对吧?如果是这样,我如何加入 json 值。IEStream1: Key=a, value={a,b,c}. Stream2: Key=a, value={x} and key=a, value={y}. Expected o/p: {a,b,c,x} and {a,b,c,y}.

为了实现这一点,我的 ValueJoiner 应该是什么样子。帮我解决这个问题。我的示例代码:

KStream<String, JsonNode> resultStream = stream1.leftJoin(stream2,
                new ValueJoiner<JsonNode, JsonNode, JsonNode>() {
                    @Override
                    public JsonNode apply(JsonNode value1, JsonNode value2) {
                        if (value1 != null && value2 != null) {


                            return value1;
                        }
                        return null;
                    }
                }, JoinWindows.of(TimeUnit.SECONDS.toMillis(20)), Joined.with(Serdes.String(), /* key */
                        jsonSerde, /* left value */
                        jsonSerde) /* right value */
        );

标签: javaapache-kafkaapache-kafka-streams

解决方案


您对连接如何工作的理解是正确的(假设记录时间戳不同小于连接窗口大小)。

要操作 JsonNodes,只需搜索互联网:How to modify JsonNode in Java?


推荐阅读