apache-spark - Pivot an ArrayType column in pyspark
Question
I have a pyspark DataFrame with the following schema:
+----------+-------------------+-----------------------------------+------------------+
| date| numeric_id| feature_column| city|
+----------+-------------------+-----------------------------------+------------------+
|2017-08-01| 2343434545| [0.0, 0.0, 0.0, 0...| Berlin|
|2017-08-01| 2343434545| [0.0, 0.0, 0.0, 0...| Rome|
|2017-08-01| 2343434545| [0.0, 0.0, 0.0, 0...| NewYork|
|2017-08-01| 2343434545| [0.0, 0.0, 0.0, 0...| Beijing|
|2019-12-01| 6455534545| [0.0, 0.0, 0.0, 0...| Berlin|
|2019-12-01| 6455534545| [0.0, 0.0, 0.0, 0...| Rome|
|2019-12-01| 6455534545| [0.0, 0.0, 0.0, 0...| NewYork|
|2019-12-01| 6455534545| [0.0, 0.0, 0.0, 0...| Beijing|
+----------+-------------------+-----------------------------------+------------------+
I want to pivot the DataFrame so that each feature_column x city combination becomes a new column, grouped by date and numeric_id. The output DataFrame should look like this:
+----------+-------------+----------------------+--------------------+-----------------------+----------------------+
| date| numeric_id| feature_column_Berlin| feature_column_Rome| feature_column_NewYork|feature_column_Beijing|
+----------+-------------+----------------------+--------------------+-----------------------+----------------------+
|2017-08-01| 2343434545| [0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0... |[0.0, 0.0, 0.0, 0... |
|2019-12-01| 6455534545| [0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0... |[0.0, 0.0, 0.0, 0... |
+----------+-------------+----------------------+--------------------+-----------------------+----------------------+
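This is different from the question "Pivot String column on Pyspark Dataframe", because I am dealing with an ArrayType column. I think it would be easier to implement in Pandas (although handling ArrayType columns there would be tricky), so I am curious how to do it with Spark SQL. Any suggestions?
Solution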
// Scala, run in spark-shell (where spark.implicits._ is already imported). First, create sample data.
import org.apache.spark.sql.functions._
val zeros = Array("0.0", "0.0", "0.0", "0.0")
val df = Seq(
  ("2017-08-01", "2343434545", zeros, "Berlin"),
  ("2017-08-01", "2343434545", zeros, "Rome"),
  ("2017-08-01", "2343434545", zeros, "NewYork"),
  ("2017-08-01", "2343434545", zeros, "Beijing"),
  ("2019-12-01", "6455534545", zeros, "Berlin"),
  ("2019-12-01", "6455534545", zeros, "Rome"),
  ("2019-12-01", "6455534545", zeros, "NewYork"),
  ("2019-12-01", "6455534545", zeros, "Beijing")
).toDF("date", "numeric_id", "feature_column", "city")
// Group by date and numeric_id, pivot on city, then rename the pivoted columns.
// Note: collect_list wraps each cell in an outer array; first("feature_column") keeps the array as-is.
df.groupBy("date", "numeric_id").pivot("city")
  .agg(collect_list("feature_column"))
  .withColumnRenamed("Beijing", "feature_column_Beijing")
  .withColumnRenamed("Berlin", "feature_column_Berlin")
  .withColumnRenamed("NewYork", "feature_column_NewYork")
  .withColumnRenamed("Rome", "feature_column_Rome").show()
The result has one row per (date, numeric_id) pair, with one feature_column_<city> column for each city.