首页 > 解决方案 > 使用 Spark 根据以前的列值设置列值,而不重复分组属性

问题描述

给定 DataFrame :

+------------+---------+
|variableName|dataValue|
+------------+---------+
|       IDKey|       I1|
|           b|        y|
|           a|        x|
|       IDKey|       I2|
|           a|        z|
|           b|        w|
|           c|        q|
+------------+---------+

我想创建一个具有相应 IDKey 值的新列,每当 IDKey 的 dataValue 更改时,每个值都会更改,这是预期的输出:

+------------+---------+----------+
|variableName|dataValue|idkeyValue|
+------------+---------+----------+
|       IDKey|       I1|        I1|
|           b|        y|        I1|
|           a|        x|        I1|
|       IDKey|       I2|        I2|
|           a|        z|        I2|
|           b|        w|        I2|
|           c|        q|        I2|
+------------+---------+----------+

我尝试通过执行以下代码来使用mapPartitions()和全局变量

var currentVarValue = ""
frame
  .mapPartitions{ partition =>
    partition.map { row =>
      val (varName, dataValue) = (row.getString(0), row.getString(1))

      val idKeyValue = if (currentVarValue != dataValue && varName == "IDKey") {
        currentVarValue = dataValue
        dataValue
      } else {
        currentVarValue
      }

      ExtendedData(varName, dataValue, currentVarValue)
    }
  }

但这不起作用,因为有两个基本的事情:Spark 不处理全局变量,而且这不符合函数式编程风格

我将很高兴在此感谢您的任何帮助!

标签: scalaapache-spark

解决方案


您无法以 Spark 方式优雅且高效地解决此问题,因为没有为 Spark 提供足够的初始信息来处理保证在同一分区中的所有数据。如果我们在同一个分区中进行所有处理,那么这不是 Spark 的真正意图。

事实上,不能发出合理的 partitionBy(通过 Window 函数)。这里的问题是数据代表一长串连续的此类数据,如果前一个分区中的数据与当前分区相关,则需要跨分区查找。这是可以做到的,但这是一项相当艰巨的工作。zero323 在这里的某个地方有一个试图解决这个问题的答案,但如果我没记错的话,它很麻烦。

这样做的逻辑很简单,但是使用 Spark 是有问题的。

如果没有 partitionBy 数据,所有数据都会被洗牌到单个分区,并可能导致 OOM 和空间问题。

对不起。


推荐阅读