apache-kafka - 如何使用自定义值(和自定义 serdes)加入两个 ktable
问题描述
我想加入两个具有自定义值的 ktable。该文档清楚地说明了默认类型(使用默认 serdes) - https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#ktable-ktable-join
KTable<String, Long> left = ...;
KTable<String, Double> right = ...;
// Java 8+ example, using lambda expressions
KTable<String, String> joined = left.leftJoin(right,
(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
);
但是当我使用自定义值时,我得到一个序列化错误,并且没有用于传递自定义 serdes 的重载。我怎样才能做到这一点?
KTable<String, ModelA> left = ...;
KTable<String, ModelB> right = ...;
// Java 8+ example, using lambda expressions
KTable<String, ModelC> joined = left.leftJoin(right,
(leftValue, rightValue) -> new ModelC("left=" + leftValue.Name + ", right=" + rightValue.Name /* ValueJoiner */
);
解决方案
我终于明白我做错了什么。错误消息有点误导:
更改 StreamConfig 中的默认 Serdes 或通过方法参数提供正确的 Serdes
但是我不想更改默认的 serde,并且 ktables join 没有过载来传递 serdes。
问题实际上在于我使用 stream.toTable 方法创建了 ktable,该方法也没有重载来传递 serdes。我所做的是在之前(使用 serdes)声明 ktable,然后使用 stream.to 方法。
可能是新手的错误,但在这里。
推荐阅读
- python - Python 代码在 CS50x 的 DNA 问题中运行时间过长
- javascript - 为什么 getElementById 等 DOM 方法找不到元素?
- user-interface - 如何在选项卡控制区域内添加控件?
- arrays - 无法从 ruby 获得价值#
- google-cloud-platform - 无法在 GCP 上打开 vm 实例的 ssh:无法连接到后端(代码:4003)
- python - 尝试使用 spyder 4 绘图时出现“Matplotlib 当前正在使用 agg”错误
- javascript - 如何将 fileInput 元素的结果作为任何文件类型的文件下载?
- angular - 从 UI 或后端形成聚合动态 mongo 查询的最佳方法?
- .net - Nuget 包不会将内容复制到输出目录
- javascript - 将光标移动到反应组件时,React DevTools 显示非常大的背景