scala - 在 spark scala 中减少为单列
问题描述
我有下面的 DF。这里的列是基于插槽动态创建的。(1、2、3、4...等)
scala> df1.show
+---+-------+-------+-------+-------+-----+-----+-----+-----+
| ID|1_count|2_count|3_count|4_count|1_per|2_per|3_per|4_per|
+---+-------+-------+-------+-------+-----+-----+-----+-----+
| 1| 3| 11| 15| 3| 70| 80| 150| 20|
| 2| 19| 5| 15| 3| 150| 80| 200| 43|
| 3| 30| 15| 15| 39| 55| 80| 150| 200|
| 4| 8| 65| 3| 3| 70| 80| 150| 55|
+---+-------+-------+-------+-------+-----+-----+-----+-----+
由...制作 -
val df1=Seq(
(1,3,11,15,3,70,80,150,20),
(2,19,5,15,3,150,80,200,43),
(3,30,15,15,39,55,80,150,200),
(4,8,65,3,3,70,80,150,55)
)toDF("ID","1_count","2_count","3_count","4_count","1_per","2_per","3_per","4_per")
我需要选择第一次出现的 per 和 count where per <100 where count >10。这应该是行级操作,即针对每个 ID。
预期产出
+---+-------+----+----+
| ID| count|per |slot|
+---+-------+----+----+
| 1| 11| 80| 2 |
| 2| 0| 0 | 0 |
| 3| 30| 55 | 1 |
| 4| 65| 80| 2 |
+---+-------+----+----+
输出 ID 的逻辑是找到满足条件的第一列值(x_count ,x_per)(其中 per <100 和 count >10 )
解决方案
- 加载您的数据框
- 将列映射到数组列 count 到 countArr , perArr
- 添加行映射器以遍历列并查找第一个匹配条目
- 将行映射到新匹配的列或默认值 (rowid,0,0,0)
import org.apache.spark.sql.functions._
import scala.collection.mutable
object PerCount {
def main(args: Array[String]): Unit = {
val spark = Constant.getSparkSess
import spark.implicits._
val df = List((1, 3, 11, 15, 3, 70, 80, 150, 20),
(2, 19, 5, 15, 3, 150, 80, 200, 43),
(3, 30, 15, 15, 39, 55, 80, 150, 200),
(4, 8, 65, 3, 3, 70, 80, 150, 55)
).toDF("ID", "1_count", "2_count", "3_count", "4_count", "1_per", "2_per", "3_per", "4_per")
val countArrayColumns = List("1_count", "2_count", "3_count", "4_count")
val perArrayColumns = List("1_per", "2_per", "3_per", "4_per")
df.withColumn("countArr", array(countArrayColumns.map(col): _*))
.withColumn("perArr", array(perArrayColumns.map(col): _*))
.map(row => {
val countArr = row.getAs[mutable.WrappedArray[Int]]("countArr")
val perArr = row.getAs[mutable.WrappedArray[Int]]("perArr")
val (position, count, per) = countArr.zipWithIndex
.filter(row => row._1 > 10 && perArr(row._2) < 100)
.map(row => (row._2 + 1, row._1, perArr(row._2)))
.headOption.getOrElse((0, 0, 0))
(row.getInt(0), count, per, position)
}).toDF("ID", "count", "per", "slot")
.show()
}
}
推荐阅读
- python - 字母计数功能
- bluetooth - Is there any bluetooth module supports both Bluetooth/BLE 5.0 and 4.x at the same time?
- json - How to decode this JSON with the new decodable protocol?
- android - 1. 如何从另一个活动片段中打开一个活动片段?
- python - 从文件中读取中文文本并打印到shell
- r - R:在合并问题之前删除行的问题
- apache - 通过 nodejs 和 jsmpeg 在 uberspace 之间传输视频数据
- csv - 由于转换的特殊字符,Amazon Redshift 副本上出现“字符串长度超过 DDL 长度”错误
- linux-kernel - 我无法加载 Raspberry Pi 的内核并随后使用 modprobe 以使用 i2c_stub
- html - 编译 sass 文件“错误:预期的空格,是制表符。”