首页 > 解决方案 > Kafka Stream 和 KGlobalTable Join 问题

问题描述

我在使用 GlobalKTable 加入 KStream 时遇到问题,希望您能提供帮助。

给定两个 Kafka 主题orderscustomers

订单

"1"     {"ID":"1","Name":"Myorder1","CustID":"100"}

"2"     {"ID":"2","Name":"MyOrder2","CustID":"200"}

顾客

"100"   {"CustID":"100","CustName":"Customer1"}

"200"   {"CustID":"200","CustName":"Customer2"}

要求是用客户名称丰富订单流

"1"     {"ID":"1","Name":"Myorder1","CustID":"100","CustName":"Customer1"}

"2"     {"ID":"2","Name":"MyOrder2","CustID":"200","CustName":"Customer2"}}

我正在尝试以下操作:

  1. orders从主题构建 KStream
  2. customers从主题构建 GlobalKTable
  3. 构建另一个连接订单和客户的流(在客户表中查找 Order.CustID)
KStream<String, EnrichedOrder> enrichedstreams = orders.join(
    customers,
    new KeyValueMapper<String, Order, String>() {            
        @Override
        public String apply(String key, Order value) {
           return value.CustID;
        }
    },
    new ValueJoiner<Order,Customer, EnrichedOrder>() {
        @Override
        public EnrichedOrder apply(Order order, Customer customer) {
            EnrichedOrder eorder = new EnrichedOrder();
            eorder.CustID = order.CustID;
            eorder.CustName = customer.CustName;
            eorder.ID = order.ID;
            eorder.Name = order.Name;           
            return eorder;
        }
    }
);

但它没有给出任何结果,也没有抛出任何异常。

使用leftJoin时,我得到了客户的 NullPointer 异常。

如果您遇到类似问题,请告诉我并建议如何解决此问题。

标签: javaapache-kafkaapache-kafka-streams

解决方案


让我们仔细看看你复制粘贴的内容:

customers主题中:

"100"   {"CustID":"100","CustName":"Customer1"}

您可以注意到键是一个字符串,并且这个字符串包含双引号: "100"。通常,字符串键打印时不带双引号。我宁愿期待看到:

 100    {"CustID":"100","CustName":"Customer1"}

换句话说,您的键的 Java 字符串表示是""100""(或"\"100\"") 而不是"100"我们所期望的。

另一方面,orders主题中的值是 Json {"ID":"1","Name":"Myorder1","CustID":"100"},属性CustID是 String ,这次用 Java 表示"100"

当您加入ordersandcustomers时,您尝试将订单 CustID100与 Customer 键相匹配"100"。由于 CustID 中缺少密钥中的双引号,这将失败。


推荐阅读