首页 > 解决方案 > 如何使用自定义值(和自定义 serdes)加入两个 ktable

问题描述

我想加入两个具有自定义值的 ktable。该文档清楚地说明了默认类型(使用默认 serdes) - https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#ktable-ktable-join

KTable<String, Long> left = ...;
KTable<String, Double> right = ...;

// Java 8+ example, using lambda expressions
KTable<String, String> joined = left.leftJoin(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
  );

但是当我使用自定义值时,我得到一个序列化错误,并且没有用于传递自定义 serdes 的重载。我怎样才能做到这一点?

KTable<String, ModelA> left = ...;
KTable<String, ModelB> right = ...;

// Java 8+ example, using lambda expressions
KTable<String, ModelC> joined = left.leftJoin(right,
    (leftValue, rightValue) -> new ModelC("left=" + leftValue.Name + ", right=" + rightValue.Name /* ValueJoiner */
  );

标签: apache-kafkaapache-kafka-streams

解决方案


我终于明白我做错了什么。错误消息有点误导:

更改 StreamConfig 中的默认 Serdes 或通过方法参数提供正确的 Serdes

但是我不想更改默认的 serde,并且 ktables join 没有过载来传递 serdes。

问题实际上在于我使用 stream.toTable 方法创建了 ktable,该方法也没有重载来传递 serdes。我所做的是在之前(使用 serdes)声明 ktable,然后使用 stream.to 方法。

可能是新手的错误,但在这里。


推荐阅读