scala - keyBy 和 sum 两次是什么问题
问题描述
贝娄是我写的简单代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val list = new ListBuffer[Tuple3[String,Int,Int]]
val random = new Random()
for(x <- 0 to 4){
if(random.nextBoolean()){
list.append(("INSERT",2,1))
} else {
list.append(("UPDATE",2,1))
}
}
val data = env.fromElements(list).flatMap(_.toList)
val keyed = data.keyBy(0).sum(1)
keyed.print()
val reKeyed = keyed.keyBy(0).sum(2)
reKeyed.print()
env.execute()
dataStream reKeyed应将keyed视为输入数据源。但是,打印的结果显示它们来自原始数据源。如果第二次只调用KeyBy而不调用sum方法,打印的结果是正确的。所以有什么问题?
解决方案
我找不到给定代码段的任何问题,并怀疑您的期望与 API 不匹配。
我在源代码和第一个和第二个分组求和中添加了一些打印语句。
source:1> (UPDATE,2,1)
source:1> (INSERT,2,1)
source:1> (UPDATE,2,1)
source:1> (UPDATE,2,1)
source:1> (INSERT,2,1)
first:3> (UPDATE,2,1)
first:2> (INSERT,2,1)
first:3> (UPDATE,4,1)
first:2> (INSERT,4,1)
first:3> (UPDATE,6,1)
second:2> (INSERT,2,1)
second:3> (UPDATE,2,1)
second:2> (INSERT,2,2)
second:3> (UPDATE,2,2)
second:3> (UPDATE,2,3)
如您所见,随机输入由 3 个更新和 2 个插入语句组成。所以第一个的结果keyBy
正确显示update,6,1
和insert,4,1
。
现在该结果用作 second 的输入keyBy
,但由于您在第二列上求和,所以您的第一个操作的结果将被丢弃。您可能会期望将第一个的“最终”总和keyBy
作为对第二列求和的基本记录。但它实际上总是以第一条记录为基础,这是流媒体设置中唯一合理的选择。
您真正想要的是同一组中两个字段的总和。不幸的是,流 API 没有捷径,但你自己很容易实现。
val keyed = data.keyBy(0)
.reduce((tuple1, tuple2) => (tuple1._1, tuple1._2 + tuple2._2, tuple1._3 + tuple2._3))
keyed.print("first")
那产生
source:4> (INSERT,2,1)
source:4> (INSERT,2,1)
source:4> (INSERT,2,1)
source:4> (UPDATE,2,1)
source:4> (INSERT,2,1)
first:3> (UPDATE,2,1)
first:2> (INSERT,2,1)
first:2> (INSERT,4,2)
first:2> (INSERT,6,3)
first:2> (INSERT,8,4)
此解决方案也更有效,因为分组数据非常昂贵。
推荐阅读
- python - 如何在此数据框中重复地将平均值链接到另一列的中间值?
- git - 无需提交的版本控制的好处
- javascript - 嵌套数组的循环脚本 - JS
- django - 如何在不在代理后面的 Django 应用程序上安全地重定向到 HTTPS?
- r - R 和 Matlab 中的拟合分布给出了非常不同的结果
- powershell - 在 Powershell 中遇到条件 if 语句的问题
- angular - Cloud Firestore - 如何使用 RXJS 对数据进行分页
- python - Django - 创建和搜索自定义用户模型
- c++ - 声明但不定义未使用的函数是否合法?
- php - 此代码是否有错误,因为它正确输出成功消息但数据库中没有任何更新