lambda - Kafka-stream:如何通过从值列表中选择键来重新设置流的键
问题描述
我有一个对象 A :
public class A {
String id;
List<String> otherIds;
SomeOtherObject object;
}
我有一个 kafka 流,看起来像:
KStream<Integer, A> inputStream
我需要重新键入这个inputStream流,以便它现在应该是:
KStream<String, A> newStream
newStream 的关键部分是 来自A.otherIds的otherId。
举个例子
Let's say, A is like : { id:1, otherIds:[ "ab","bc","ca"],OtherObject: obj1}.
And inputStream if like <1,A>,
Then the newStream should have:
<"ab",A>
<"bc", A>
<"ca",A>
粗略地说,要了解我正在尝试的是:
KStream<String, A> newStream =
inputStream
.map((key,val) ->
val.getOtherIds().stream().forEach(e->
KeyValue.pair(e,val))
);
有没有办法做到这一点(通过从值列表中选择键来重新键入)?
解决方案
由于要将单个记录拆分为多个,因此应使用flatMap()
而不是map()
. map()
是 1:1 操作,而flatMap()
是 1:n。
要返回多KeyValue
对,您flatMap()
可以return
使用List<KevValue>
例如(任何其他Collection<KeyValue>
类型也可以)。
推荐阅读
- r - 我将如何使用 FFT 分析 R、Rstudio 中的音频波
- r - 提取整个流域多边形的网格化 (netcdf) 气候数据
- r - html_nodes 返回复杂表的空列表
- c++ - 强制 OpenSSL 以流模式加密/解密(如 AES-CFG、AES-OFB 等)填充/取消填充数据
- amazon-web-services - Windows Server 2012 的 EC2 实例状态检查失败
- rename - 在 AIX 中重命名包含垃圾字符的目录名称
- node.js - Discord.js“类型错误:无法读取未定义的属性‘发送’”
- anylogic - Anylogic构造函数PalletRack不工作
- visual-studio-2019 - 如何在运行 .net core web 应用程序时解决这个问题
- c++ - 将延迟编写的代码转换为millis()