首页 > 解决方案 > keyBy 和 sum 两次是什么问题

问题描述

贝娄是我写的简单代码:

val env = StreamExecutionEnvironment.getExecutionEnvironment

val list = new ListBuffer[Tuple3[String,Int,Int]]

val random = new Random()

for(x <- 0 to 4){
  if(random.nextBoolean()){
    list.append(("INSERT",2,1))
  } else {
    list.append(("UPDATE",2,1))
  }
}


val data = env.fromElements(list).flatMap(_.toList)


val keyed = data.keyBy(0).sum(1)

keyed.print()

val reKeyed = keyed.keyBy(0).sum(2)
reKeyed.print()

env.execute()

dataStream reKeyed应将keyed视为输入数据源。但是,打印的结果显示它们来自原始数据源。如果第二次只调用KeyBy而不调用sum方法,打印的结果是正确的。所以有什么问题?

标签: scalaapache-flink

解决方案


我找不到给定代码段的任何问题,并怀疑您的期望与 API 不匹配。

我在源代码和第一个和第二个分组求和中添加了一些打印语句。

source:1> (UPDATE,2,1)
source:1> (INSERT,2,1)
source:1> (UPDATE,2,1)
source:1> (UPDATE,2,1)
source:1> (INSERT,2,1)
first:3> (UPDATE,2,1)
first:2> (INSERT,2,1)
first:3> (UPDATE,4,1)
first:2> (INSERT,4,1)
first:3> (UPDATE,6,1)
second:2> (INSERT,2,1)
second:3> (UPDATE,2,1)
second:2> (INSERT,2,2)
second:3> (UPDATE,2,2)
second:3> (UPDATE,2,3)

如您所见,随机输入由 3 个更新和 2 个插入语句组成。所以第一个的结果keyBy正确显示update,6,1insert,4,1

现在该结果用作 second 的输入keyBy,但由于您在第二列上求和,所以您的第一个操作的结果将被丢弃。您可能会期望将第一个的“最终”总和keyBy作为对第二列求和的基本记录。但它实际上总是以第一条记录为基础,这是流媒体设置中唯一合理的选择。

您真正想要的是同一组中两个字段的总和。不幸的是,流 API 没有捷径,但你自己很容易实现。

val keyed = data.keyBy(0)
    .reduce((tuple1, tuple2) => (tuple1._1, tuple1._2 + tuple2._2, tuple1._3 + tuple2._3))

keyed.print("first")

那产生

source:4> (INSERT,2,1)
source:4> (INSERT,2,1)
source:4> (INSERT,2,1)
source:4> (UPDATE,2,1)
source:4> (INSERT,2,1)
first:3> (UPDATE,2,1)
first:2> (INSERT,2,1)
first:2> (INSERT,4,2)
first:2> (INSERT,6,3)
first:2> (INSERT,8,4)

此解决方案也更有效,因为分组数据非常昂贵。


推荐阅读