scala - 使用 Spark 根据以前的列值设置列值,而不重复分组属性
问题描述
给定 DataFrame :
+------------+---------+
|variableName|dataValue|
+------------+---------+
| IDKey| I1|
| b| y|
| a| x|
| IDKey| I2|
| a| z|
| b| w|
| c| q|
+------------+---------+
我想创建一个具有相应 IDKey 值的新列,每当 IDKey 的 dataValue 更改时,每个值都会更改,这是预期的输出:
+------------+---------+----------+
|variableName|dataValue|idkeyValue|
+------------+---------+----------+
| IDKey| I1| I1|
| b| y| I1|
| a| x| I1|
| IDKey| I2| I2|
| a| z| I2|
| b| w| I2|
| c| q| I2|
+------------+---------+----------+
我尝试通过执行以下代码来使用mapPartitions()
和全局变量
var currentVarValue = ""
frame
.mapPartitions{ partition =>
partition.map { row =>
val (varName, dataValue) = (row.getString(0), row.getString(1))
val idKeyValue = if (currentVarValue != dataValue && varName == "IDKey") {
currentVarValue = dataValue
dataValue
} else {
currentVarValue
}
ExtendedData(varName, dataValue, currentVarValue)
}
}
但这不起作用,因为有两个基本的事情:Spark 不处理全局变量,而且这不符合函数式编程风格
我将很高兴在此感谢您的任何帮助!
解决方案
您无法以 Spark 方式优雅且高效地解决此问题,因为没有为 Spark 提供足够的初始信息来处理保证在同一分区中的所有数据。如果我们在同一个分区中进行所有处理,那么这不是 Spark 的真正意图。
事实上,不能发出合理的 partitionBy(通过 Window 函数)。这里的问题是数据代表一长串连续的此类数据,如果前一个分区中的数据与当前分区相关,则需要跨分区查找。这是可以做到的,但这是一项相当艰巨的工作。zero323 在这里的某个地方有一个试图解决这个问题的答案,但如果我没记错的话,它很麻烦。
这样做的逻辑很简单,但是使用 Spark 是有问题的。
如果没有 partitionBy 数据,所有数据都会被洗牌到单个分区,并可能导致 OOM 和空间问题。
对不起。
推荐阅读
- python - 如何通过 QstandardItem 设置原始图标?
- dataframe - 为什么我得到这个 dtype Pycaret 回归错误?
- r - Tidymodels:在日期列中估算缺失值?
- python - Python Altair 在选择时生成表格
- python - Pandas 滚动梯度 - 改进/减少计算时间
- reactjs - 反应故事书。预期声明或声明
- amazon-web-services - 将设备连接到 AWS IOT 时出错/失败
- c# - 将 .NET Core 从 2.2 迁移到 3.1
- c# - 我的 web.config 值中有自定义会话状态,因为我已经加密了 sqlConnectionstring,但是在运行我的 Web 应用程序时出现错误
- node.js - 如何在nest.js DTO中同时允许大写和小写