apache-spark - 如何将行压缩为一个?
问题描述
环境:火花 2.4.5
来源:test.csv
id,date,item1,item2,item3
0,1,111,,
0,1,,222,
0,1,,,333
1,1,111,,
1,1,,222,
1,1,,,333
目标:test.csv
id,date,item1,item2,item3
0,1,111,222,333
1,1,111,222,333
如您所见,我想将具有相同id和date的行合并为一个。
我的解决方案:
我试过 arrays_zip 函数来处理它但失败了:
val soruce = spark.read("/home/user/test.csv").csv.options("header", "true")
spark.sql("SELECT id , date, arrays_zip( collect_list(item1), collect_list(item2), collect_list(item3)) FROM source GROUP BY id,date").show(false)
+---+----+-------------------------------------------------------------------------+
|id |date|arrays_zip(collect_list(item1), collect_list(item2), collect_list(item3))|
+---+----+-------------------------------------------------------------------------+
|0 |1 |[[111, 222, 333]] |
|1 |1 |[[111, 222, 333]] |
+---+----+-------------------------------------------------------------------------+
也许我应该把这个数组分解成列?
如果您能给我一些建议,我将不胜感激。
解决方案
使用flatten
andarray
而不是arrays_zip
then 使用element_at
函数从每个元素中获取项目。
val df = spark.read("/home/user/test.csv").csv.options("header", "true")
df.groupBy(col("id"),col("date")).
agg(flatten(array(collect_list(col("item1")),collect_list(col("item2")),collect_list(col("item3")))).alias("it")).
withColumn("item1",element_at(col("it"),1)).
withColumn("item2",element_at(col("it"),2)).
withColumn("item3",element_at(col("it"),3)).
drop("it").
show()
//+---+----+-----+-----+-----+
//| id|date|item1|item2|item3|
//+---+----+-----+-----+-----+
//| 0| 1| 111| 222| 333|
//| 1| 1| 111| 222| 333|
//+---+----+-----+-----+-----+
2.Using groupBy and first(col,ignoreNulls=true)
df.groupBy(col("id"),col("date")).
agg(first(col("item1")).alias("item1"),first(col("item2"),true).alias("item2"),first(col("item3"),true).alias("item3")).
show()
//+---+----+-----+-----+-----+
//| id|date|item1|item2|item3|
//+---+----+-----+-----+-----+
//| 0| 1| 111| 222| 333|
//| 1| 1| 111| 222| 333|
//+---+----+-----+-----+-----+
SQL:
df.createOrReplaceTempView("tmp")
//using first
spark.sql("select id,date,first(item1,true) as item1,first(item2,true) as item2,first(item3,true) as item3 from tmp group by id,date").show()
//using max
spark.sql("select id,date,max(item1) as item1,max(item2) as item2,max(item3) as item3 from tmp group by id,date").show()
//using flatten array
spark.sql("select id,date, element_at(tmp,1)item1, element_at(tmp,2)item2, element_at(tmp,3)item3 from (select id,date,flatten(array(collect_list(item1),collect_list(item2),collect_list(item3))) as tmp from tmp group by id,date)t").show()
//+---+----+-----+-----+-----+
//| id|date|item1|item2|item3|
//+---+----+-----+-----+-----+
//| 0| 1| 111| 222| 333|
//| 1| 1| 111| 222| 333|
//+---+----+-----+-----+-----+
Dynamic way:
val df = spark.read("/home/user/test.csv").csv.options("header", "true")
val df1=df.groupBy(col("id"),col("date")).agg(flatten(array(collect_list(col("item1")),collect_list(col("item2")),collect_list(col("item3")))).alias("it"))
val len=df1.agg(max(size(col("it")))).collect()(0)(0).toString.toInt
spark.range(len).collect().foldLeft(df1)((df,len) => df.withColumn(s"item${len+1}",col("it")(len))).
drop("it").
show()
//+---+----+-----+-----+-----+
//| id|date|item1|item2|item3|
//+---+----+-----+-----+-----+
//| 0| 1| 111| 222| 333|
//| 1| 1| 111| 222| 333|
//+---+----+-----+-----+-----+
推荐阅读
- c# - 如何关闭下拉列表所需的消息?
- javascript - javascript:array.length 评估为未定义
- html - 参数化 HTML 报告中的弹出窗口 RMarkdown
- node.js - 响应中的不稳定行为 express + next + 对并发请求做出反应
- reactjs - 如果我不使用它们,为什么它会在反应生命周期方法中出错?
- entity-framework - 更新 EF Core 上下文中的特定实体条目
- python - 错误实用程序:python 的线程标准输出编写器中未捕获的异常
- dynamic - 我可以在 robots.txt 中引用动态生成的站点地图吗?
- elixir - 虽然宏 bind_quoted 在第一次迭代时中断
- javascript - 为什么我使用 componentWillReceiveProps 收到此错误?