java - 如何在kafka中加入2个流?
问题描述
学习 Kafka Streams,尝试在 5 分钟的窗口中加入两个流(Json 值)。我的理解是对值具有相同的键来匹配连接条件。如果我的理解是正确的,就像钥匙只能加入,对吧?如果是这样,我如何加入 json 值。IEStream1: Key=a, value={a,b,c}. Stream2: Key=a, value={x} and key=a, value={y}. Expected o/p: {a,b,c,x} and {a,b,c,y}.
为了实现这一点,我的 ValueJoiner 应该是什么样子。帮我解决这个问题。我的示例代码:
KStream<String, JsonNode> resultStream = stream1.leftJoin(stream2,
new ValueJoiner<JsonNode, JsonNode, JsonNode>() {
@Override
public JsonNode apply(JsonNode value1, JsonNode value2) {
if (value1 != null && value2 != null) {
return value1;
}
return null;
}
}, JoinWindows.of(TimeUnit.SECONDS.toMillis(20)), Joined.with(Serdes.String(), /* key */
jsonSerde, /* left value */
jsonSerde) /* right value */
);
解决方案
您对连接如何工作的理解是正确的(假设记录时间戳不同小于连接窗口大小)。
要操作 JsonNodes,只需搜索互联网:How to modify JsonNode in Java?
推荐阅读
- c# - 要解决此问题,请在至少一个关系上显式配置外键属性
- asp.net - 显示使用 ViewBag 传递的多个表中的值
- git - Git Log - 自上周以来合并
- amazon-web-services - 查询全局二级索引:'MissingRequiredParameter: Missing required key
- javascript - 如何将 onclick 添加到 Selz 按钮?
- docker - 已加密且主机无法访问的卷?
- c++ - anaconda 重新安装后 OpenCV C++ 二进制文件中的链接器错误
- php - php null 合并运算符与串联运算符结合使用无法正常工作
- c# - Specflow 3 是否从链接的功能文件生成测试?
- amazon-web-services - 使用 AWS SDK 通过 Direct Connect、VPC 和 VPC 终端节点从本地世界访问 AWS S3