Transpose a table with a Spark DataFrame

Problem description

Here is my input DataFrame:

+------+-------+---+------+-----+-----+-----+-----+-----+-----+
|number|word   |ID |Name  |prd_1|prd_2|prd_3|prd_4|prd_5|prd_6|
+------+-------+---+------+-----+-----+-----+-----+-----+-----+
|1     |bat    |101|Naman |2    |3    |8    |4    |5    |10   |
|2     |abc    |102|Bhagat|3    |8    |7    |9    |8    |11   |
|3     |abcd   |103|Anchal|1    |9    |2    |3    |6    |12   |
|4     |abcde  |104|Dev   |8    |6    |9    |4    |5    |13   |
|3     |abcdef |105|PArul |2    |7    |8    |7    |3    |14   |
|1     |abcdefg|106|Vipul |3    |4    |2    |8    |4    |15   |
+------+-------+---+------+-----+-----+-----+-----+-----+-----+

Now I want to collapse prd_1, prd_2, prd_3 into a single column col1, and prd_4, prd_5, prd_6 into a single column col2. Here is the expected output DataFrame I am looking for:

+------+-------+---+------+----+----+
|number|word   |ID |Name  |col1|col2|
+------+-------+---+------+----+----+
|1     |bat    |101|Naman |2   |4   |
|1     |bat    |101|Naman |3   |5   |
|1     |bat    |101|Naman |8   |10  |
|2     |abc    |102|Bhagat|3   |9   |
|2     |abc    |102|Bhagat|8   |8   |
|2     |abc    |102|Bhagat|7   |11  |
|3     |abcd   |103|Anchal|1   |3   |
|3     |abcd   |103|Anchal|9   |6   |
|3     |abcd   |103|Anchal|2   |12  |
+------+-------+---+------+----+----+

Here is what I have tried:

import org.apache.spark.sql.functions._
import spark.implicits._

val df2 = Seq(
  (1, "bat", 101, "Naman", 2, 3, 8, 4, 5, 10),
  (2, "abc", 102, "Bhagat", 3, 8, 7, 9, 8, 11),
  (3, "abcd", 103, "Anchal", 1, 9, 2, 3, 6, 12),
  (4, "abcde", 104, "Dev", 8, 6, 9, 4, 5, 13),
  (3, "abcdef", 105, "PArul", 2, 7, 8, 7, 3, 14),
  (1, "abcdefg", 106, "Vipul", 3, 4, 2, 8, 4, 15)
).toDF("number", "word", "ID", "Name", "prd_1", "prd_2", "prd_3", "prd_4", "prd_5", "prd_6")

val myArray1 = Array("prd_1", "prd_2", "prd_3")
val myArray2 = Array("prd_4", "prd_5", "prd_6")

val testDf = df2.select(
  col("number"), col("word"), col("ID"), col("Name"),
  explode(array(myArray1.head, myArray1.tail: _*)).as("col1"),
  col("prd_6"), col("prd_4"), col("prd_5")
)

The explode function only works once per select statement, and chaining it across successive select statements multiplies the rows and creates many unwanted combinations. I want to use explode 7-8 times inside a single select statement. Also, the number of columns to be merged into each single column will always stay fixed.
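One way to avoid repeated explodes entirely (a sketch, not the answer given below, assuming the two column groups always line up positionally and Spark 2.4+ for `arrays_zip`): pack each group into an array, zip the arrays element by element, and explode the zipped structs exactly once.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("zip-explode").getOrCreate()
import spark.implicits._

val df2 = Seq(
  (1, "bat", 101, "Naman", 2, 3, 8, 4, 5, 10),
  (2, "abc", 102, "Bhagat", 3, 8, 7, 9, 8, 11)
).toDF("number", "word", "ID", "Name", "prd_1", "prd_2", "prd_3", "prd_4", "prd_5", "prd_6")

// Pack each fixed column group into an array; arrays_zip pairs the
// arrays element by element, and a single explode then emits one
// output row per zipped pair.
val result = df2
  .withColumn("g1", array($"prd_1", $"prd_2", $"prd_3"))
  .withColumn("g2", array($"prd_4", $"prd_5", $"prd_6"))
  .select($"number", $"word", $"ID", $"Name",
          explode(arrays_zip($"g1", $"g2")).as("z"))
  .select($"number", $"word", $"ID", $"Name",
          $"z.g1".as("col1"), $"z.g2".as("col2"))

result.show(false)
```

With more than two groups, each extra group is just one more array argument to `arrays_zip`; no additional explode is needed, which sidesteps the one-explode-per-select limitation.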

Tags: apache-spark, dataframe

Solution


An alternative way to implement this:

// Read the source data (any DataFrame with the same schema works).
val src_df = spark.read.option("header", "true").csv("src file")

// Pair each prd_1..3 value with its prd_4..6 counterpart as a
// single-entry map, collected per group.
val mapped_df = src_df
  .groupBy(col("number"), col("word"), col("ID"), col("Name"))
  .agg(
    collect_list(map($"prd_1", $"prd_4")) as "map_1",
    collect_list(map($"prd_2", $"prd_5")) as "map_2",
    collect_list(map($"prd_3", $"prd_6")) as "map_3"
  )

// Merge the three lists of single-entry maps into one map per row.
def mergeUdf = udf((map1: Seq[Map[String, String]],
                    map2: Seq[Map[String, String]],
                    map3: Seq[Map[String, String]]) =>
  map1.toList.flatten.toMap ++ map2.toList.flatten.toMap ++ map3.toList.flatten.toMap)

val new_df = mapped_df
  .withColumn("merged", mergeUdf(col("map_1"), col("map_2"), col("map_3")))
  .drop("map_1", "map_2", "map_3")

// Exploding a map column produces one row per (key, value) entry.
new_df.select(col("number"), col("word"), col("ID"), col("Name"), explode($"merged")).show(false)
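Note that exploding a MapType column yields two columns named `key` and `value`, not `col1`/`col2`. A minimal self-contained sketch of the rename step (the toy `merged` map here stands in for the merged column built above):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("map-explode").getOrCreate()
import spark.implicits._

// explode on a MapType column emits `key` and `value` columns;
// rename them to match the expected output schema.
val demo = Seq((1, Map(2 -> 4, 3 -> 5))).toDF("number", "merged")
val out = demo
  .select($"number", explode($"merged"))
  .withColumnRenamed("key", "col1")
  .withColumnRenamed("value", "col2")

out.show(false)
```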
