Reduce to a single column in Spark Scala

Problem Description

I have the DataFrame below. Its columns are created dynamically based on slots (1, 2, 3, 4, and so on).

scala> df1.show
+---+-------+-------+-------+-------+-----+-----+-----+-----+
| ID|1_count|2_count|3_count|4_count|1_per|2_per|3_per|4_per|
+---+-------+-------+-------+-------+-----+-----+-----+-----+
|  1|      3|     11|     15|      3|   70|   80|  150|   20|
|  2|     19|      5|     15|      3|  150|   80|  200|   43|
|  3|     30|     15|     15|     39|   55|   80|  150|  200|
|  4|      8|     65|      3|      3|   70|   80|  150|   55|
+---+-------+-------+-------+-------+-----+-----+-----+-----+

It was created with:

val df1 = Seq(
  (1, 3, 11, 15, 3, 70, 80, 150, 20),
  (2, 19, 5, 15, 3, 150, 80, 200, 43),
  (3, 30, 15, 15, 39, 55, 80, 150, 200),
  (4, 8, 65, 3, 3, 70, 80, 150, 55)
).toDF("ID", "1_count", "2_count", "3_count", "4_count", "1_per", "2_per", "3_per", "4_per")

I need to select the first occurrence of per and count where per < 100 and count > 10. This should be a row-level operation, i.e. evaluated per ID.

Expected output

+---+-------+----+----+
| ID|  count| per|slot|
+---+-------+----+----+
|  1|     11|  80|   2|
|  2|      0|   0|   0|
|  3|     30|  55|   1|
|  4|     65|  80|   2|
+---+-------+----+----+

The logic for each output row: find the first column pair (x_count, x_per) that satisfies the condition (per < 100 and count > 10) and report that slot x; an ID with no matching slot falls back to (0, 0, 0), as for ID 2 above.
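
To make the rule concrete, here is a minimal plain-Scala sketch (standalone, not part of the Spark job; the names counts, pers and firstMatch are illustrative) that applies it to the values of ID 1:

// Slot values for ID 1, taken from the table above.
val counts = Seq(3, 11, 15, 3)
val pers   = Seq(70, 80, 150, 20)

// Pair each (count, per) with its 1-based slot and keep the first match.
val firstMatch = counts.zip(pers).zipWithIndex
  .collectFirst { case ((c, p), i) if c > 10 && p < 100 => (c, p, i + 1) }
  .getOrElse((0, 0, 0))
// firstMatch == (11, 80, 2): slot 1 fails (count 3 <= 10), slot 2 is the first hit.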

Tags: scala, apache-spark

Solution


  1. Load your DataFrame.
  2. Map the count columns into an array column countArr, and the per columns into perArr.
  3. Add a row mapper that walks the arrays and finds the first matching entry.
  4. Map each row to the matched values, or to the default (rowid, 0, 0, 0) when nothing matches.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

import scala.collection.mutable

object PerCount {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("PerCount")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val df = List(
      (1, 3, 11, 15, 3, 70, 80, 150, 20),
      (2, 19, 5, 15, 3, 150, 80, 200, 43),
      (3, 30, 15, 15, 39, 55, 80, 150, 200),
      (4, 8, 65, 3, 3, 70, 80, 150, 55)
    ).toDF("ID", "1_count", "2_count", "3_count", "4_count", "1_per", "2_per", "3_per", "4_per")

    val countArrayColumns = List("1_count", "2_count", "3_count", "4_count")
    val perArrayColumns = List("1_per", "2_per", "3_per", "4_per")

    // Collapse the slot columns into two parallel arrays, one for counts and one for pers.
    df.withColumn("countArr", array(countArrayColumns.map(col): _*))
      .withColumn("perArr", array(perArrayColumns.map(col): _*))
      .map(row => {
        val countArr = row.getAs[mutable.WrappedArray[Int]]("countArr")
        val perArr = row.getAs[mutable.WrappedArray[Int]]("perArr")

        // Walk the slots in order and keep the first (slot, count, per) triple
        // with count > 10 and per < 100; default to (0, 0, 0) when none matches.
        val (position, count, per) = countArr.zipWithIndex
          .filter { case (c, i) => c > 10 && perArr(i) < 100 }
          .map { case (c, i) => (i + 1, c, perArr(i)) }
          .headOption.getOrElse((0, 0, 0))

        (row.getInt(0), count, per, position)
      }).toDF("ID", "count", "per", "slot")
      .show()
  }
}
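
As a side note, on Spark 3.0+ the same selection can be expressed without a typed map, using the built-in higher-order functions filter and element_at on an array of structs. A minimal sketch, assuming the df and spark session from the answer above (the pairs and match names are illustrative):

import org.apache.spark.sql.functions._

val slots = Seq(1, 2, 3, 4)

// One struct per slot, carrying its count, per and 1-based slot number.
val pairs = array(slots.map(s =>
  struct(col(s"${s}_count").as("count"), col(s"${s}_per").as("per"), lit(s).as("slot"))
): _*)

df.withColumn("match",
    coalesce(
      // element_at is 1-based and yields null when the filtered array is empty,
      // so coalesce falls back to the (0, 0, 0) default struct.
      element_at(filter(pairs, p => p("count") > 10 && p("per") < 100), 1),
      struct(lit(0).as("count"), lit(0).as("per"), lit(0).as("slot"))
    ))
  .select(col("ID"), col("match.count"), col("match.per"), col("match.slot"))
  .show()

This keeps the whole computation inside the Catalyst optimizer instead of deserializing rows into Scala objects, which is generally preferable when the slot list is known up front.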
