首页 > 解决方案 > 如何通过列上的组合并行化 apache spark 中数据帧的处理

问题描述

我正在寻找一种解决方案来构建具有所有列组合的聚合。例如,我有一个数据框如下:

val df = Seq(("A", 1),  ("B", 2),  ("C", 3),  ("A", 4),  ("B", 5)).toDF("id", "value")

+---+-----+
| id|value|
+---+-----+
|  A|    1|
|  B|    2|
|  C|    3|
|  A|    4|
|  B|    5|
+---+-----+

并在“id”列上查找所有组合的聚合。在下面我找到了一个解决方案,但这不能使用 Spark 的并行性,仅适用于驱动程序节点或仅适用于单个执行程序。有没有更好的解决方案来摆脱 for 循环?

import spark.implicits._;

val list =df.select($"id").distinct().orderBy($"id").as[String].collect();
val combinations = (1 to list.length flatMap (x => list.combinations(x))) filter(_.length >1)

val schema = StructType(
  StructField("indexvalue", IntegerType, true) ::
  StructField("segment", StringType, true) :: Nil)
var initialDF = spark.createDataFrame(sc.emptyRDD[Row], schema)

for (x <- combinations) {
       initialDF = initialDF.union(df.filter($"id".isin(x: _*))
          .agg(expr("sum(value)").as("indexvalue"))
          .withColumn("segment",lit(x.mkString("+"))))

 }

 initialDF.show()

+----------+-------+
|indexvalue|segment|
+----------+-------+
|        12|    A+B|
|         8|    A+C|
|        10|    B+C|
|        15|  A+B+C|
+----------+-------+

标签: scalaapache-spark

解决方案


推荐阅读