apache-spark - 使用 Spark DataFrame 转置表
问题描述
以下是我的输入DataFrame
:
+------+-------+---+------+-----+-----+-----+-----+-----+-----+
|number|word |ID |Name |prd_1|prd_2|prd_3|prd_4|prd_5|prd_6|
+------+-------+---+------+-----+-----+-----+-----+-----+-----+
|1 |bat |101|Naman |2 |3 |8 |4 |5 |10 |
|2 |abc |102|Bhagat|3 |8 |7 |9 |8 |11 |
|3 |abcd |103|Anchal|1 |9 |2 |3 |6 |12 |
|4 |abcde |104|Dev |8 |6 |9 |4 |5 |13 |
|3 |abcdef |105|PArul |2 |7 |8 |7 |3 |14 |
|1 |abcdefg|106|Vipul |3 |4 |2 |8 |4 |15 |
+------+-------+---+------+-----+-----+-----+-----+-----+-----+
现在我想将prd_1
, prd_2
,prd_3
转换成单列col1
, prd_4
, prd_5
,prd_6
到col2
. 以下是我正在寻找
的预期输出:DataFrame
number|word |ID |Name |col1|col2|
+------+-------+---+------+----+----+
|1 |bat |101|Naman |2 |4 |
|1 |bat |101|Naman |3 |5 |
|1 |bat |101|Naman |8 |10 |
|2 |abc |102|Bhagat|3 |9 |
|2 |abc |102|Bhagat|8 |8 |
|2 |abc |102|Bhagat|7 |11 |
|3 |abcd |103|Anchal|1 |3 |
|3 |abcd |103|Anchal|9 |6 |
|3 |abcd |103|Anchal|2 |12 |
这是我尝试过的:
val df2 = Seq(
(1, "bat", 101, "Naman", 2, 3, 8, 4, 5,10),
(2, "abc", 102, "Bhagat", 3, 8, 7, 9, 8,11),
(3, "abcd", 103, "Anchal", 1, 9, 2, 3, 6,12),
(4, "abcde", 104, "Dev", 8, 6, 9, 4, 5,13),
(3, "abcdef", 105, "PArul", 2, 7, 8, 7, 3,14),
(1, "abcdefg", 106, "Vipul", 3, 4, 2, 8, 4,15)
).toDF("number", "word", "ID", "Name", "prd_1", "prd_2", "prd_3", "prd_4", "prd_5","prd_6")
val myArray1 = Array("prd_1","prd_2","prd_3")
val myArray2 = Array("prd_4", "prd_5","prd_6")
val testDf = df2
.select(
col("number"), col("word"), col("ID"), col("Name"),
explode(array(myArray1.head,myArray1.tail:_*)).as("col1"),
col("prd_6"),col("prd_4"),col("prd_5")
)
)
Explode 函数在 select 语句中只工作一次,在连续的 select 语句中使用它会创建很多不必要的行,我想在单个 select 语句中使用 7-8 次explode。此外,我想合并到单个列中的列数将始终保持不变。
解决方案
另一种实现方式
val src_df=spark.read.option("header","true").csv("src file")
val mapped_df = src_df.groupBy(col("number"),col("word"),col("ID"),col("Name")).agg(collect_list(map($"prd_1",$"prd_4")) as "map_1",collect_list(map($"prd_2",$"prd_5")) as "map_2",collect_list(map($"prd_3",$"prd_6")) as "map_3")
def mergeUdf = udf((map1: Seq[Map[String, String]], map2: Seq[Map[String, String]],map3: Seq[Map[String, String]])=> map1.toList.flatten.toMap ++ map2.toList.flatten.toMap ++ map3.toList.flatten.toMap)
val new_df= mapped_df.withColumn("merged", mergeUdf(col("map_1"), col("map_2"),col("map_3"))).drop("map_1", "map_2","map_3")
new_df.select(col("number"),col("word"),col("ID"),col("Name"),explode($"merged")).show(false)
推荐阅读
- authentication - 代表用户从客户端应用程序的 Azure 函数与 Azure 服务交互(用户模拟)
- sql - SQL - 创建一个查询,显示在特定时间范围内登录最多的用户
- python - 熊猫在入口附近合并
- json - 在嵌套数组中查询具有重复 id 的对象的 mongodb 文档
- firefox - 如何解决这个firefox问题:没有连接?
- javascript - 从循环中设置状态
- reactjs - 在 Karma 测试中找不到函数组件中的 ReactJS 静态道具
- statistics - 在 python 中约束 VAR 模型
- python - 将字典列表的 Python 数据框列转换为具有单个元素的列
- docker - Docker compose:[Errno 111] 连接被拒绝 - 多容器应用程序中的网络失败