java - Kafka Stream 和 KGlobalTable Join 问题
问题描述
我在使用 GlobalKTable 加入 KStream 时遇到问题,希望您能提供帮助。
给定两个 Kafka 主题orders
和customers
:
订单
"1" {"ID":"1","Name":"Myorder1","CustID":"100"}
"2" {"ID":"2","Name":"MyOrder2","CustID":"200"}
顾客
"100" {"CustID":"100","CustName":"Customer1"}
"200" {"CustID":"200","CustName":"Customer2"}
要求是用客户名称丰富订单流
"1" {"ID":"1","Name":"Myorder1","CustID":"100","CustName":"Customer1"}
"2" {"ID":"2","Name":"MyOrder2","CustID":"200","CustName":"Customer2"}}
我正在尝试以下操作:
orders
从主题构建 KStreamcustomers
从主题构建 GlobalKTable- 构建另一个连接订单和客户的流(在客户表中查找 Order.CustID)
KStream<String, EnrichedOrder> enrichedstreams = orders.join(
customers,
new KeyValueMapper<String, Order, String>() {
@Override
public String apply(String key, Order value) {
return value.CustID;
}
},
new ValueJoiner<Order,Customer, EnrichedOrder>() {
@Override
public EnrichedOrder apply(Order order, Customer customer) {
EnrichedOrder eorder = new EnrichedOrder();
eorder.CustID = order.CustID;
eorder.CustName = customer.CustName;
eorder.ID = order.ID;
eorder.Name = order.Name;
return eorder;
}
}
);
但它没有给出任何结果,也没有抛出任何异常。
使用leftJoin
时,我得到了客户的 NullPointer 异常。
如果您遇到类似问题,请告诉我并建议如何解决此问题。
解决方案
让我们仔细看看你复制粘贴的内容:
在customers
主题中:
"100" {"CustID":"100","CustName":"Customer1"}
您可以注意到键是一个字符串,并且这个字符串包含双引号: "100"
。通常,字符串键打印时不带双引号。我宁愿期待看到:
100 {"CustID":"100","CustName":"Customer1"}
换句话说,您的键的 Java 字符串表示是""100""
(或"\"100\""
) 而不是"100"
我们所期望的。
另一方面,orders
主题中的值是 Json {"ID":"1","Name":"Myorder1","CustID":"100"}
,属性CustID
是 String ,这次用 Java 表示"100"
。
当您加入orders
andcustomers
时,您尝试将订单 CustID100
与 Customer 键相匹配"100"
。由于 CustID 中缺少密钥中的双引号,这将失败。
推荐阅读
- c# - 如何在 automapper 中为 aftermap 上的目标属性赋值
- wordpress - ACF 中继器 - 显示前 10 行并将其他行放在另一个 div
- python - 如何在本地使用 tf-hub 模型
- r - 用值列表替换条形标签
- discord.py - (discord.py 重写)如何断开每个人与不和谐语音频道的连接?
- php - Laravel 7 与 Vue js 执行没有 php artisan serve 命令
- java - 如何使用 JDBC Thin Driver 和 Oracle Wallet 连接到 Oracle Cloud DDBB
- actionscript-3 - 如何让自上而下的射击子弹与敌人相撞?AS3
- python - 使用 DDG 获取响应 200 而不是 <418 I'm a Teapot>
- php - 使用 PHP 将受密码保护的 Excel 文件数据导入 MySQL 数据库