loops - PySpark 地理位置将重复行排序到列枢轴
问题描述
我有一个名为“df”的 spark 数据框/表,其中包含城市和邮政编码组合以及每个城市/邮政编码的地理坐标。该表还包含几个商店的地理坐标,例如剧院、酒吧、汽车店。该表对于每个商店的每个城市和邮政编码组合都有数百个重复行,我使用haversine公式计算了每个城市和邮政编码与每个商店位置的距离(以公里为单位)。这是表格的摘录(我将示例限制为仅 3 个城市和邮政编码):
CITY_POSTAL latitude_CITY_POSTAL longitude_CITY_POSTAL location latitude_location longitude_location distance
0 1 7.1899 52.208 theater 36.8381 -2.4597 6416.753469
1 1 7.1899 52.208 bar 41.6561 -0.8773 6460.611645
2 1 7.1899 52.208 car_shop 37.2829 -5.9209 6725.829125
3 2 -5.9209 37.4827 theater 36.8381 -2.4597 6308.847913
4 2 -5.9209 37.4827 bar 41.6561 -0.8773 6566.958894
5 2 -5.9209 37.4827 car_shop 37.2829 -5.9209 6579.375371
6 3 83.1072 54.849 theater 36.8381 -2.4597 5523.801936
7 3 83.1072 54.849 bar 41.6561 -0.8773 4974.492016
8 3 83.1072 54.849 car_shop 37.2829 -5.9209 5516.266902
我现在想对表格进行分组并旋转表格,以便每个唯一的城市和邮政编码都有 n 个从最近到最远的商店排列的列,还包括每个商店距离的列。我使用 pandas 语法完成了这项工作,如下所示:
# Use a Pivot Table to go from long to wide format
df = (
df.pivot_table(index='CITY_POSTAL',
columns=(
# Create Groups based on Sorted Distance
df3.sort_values('distance', ascending=True)
.groupby('CITY_POSTAL').cumcount() + 1
),
values=['location', 'distance'],
aggfunc='first')
.sort_index(level=[1, 0], axis=1, ascending=(True, False))
)
# Collapse MultiIndex
df.columns = df.columns.map(lambda x: '_'.join(map(str, x)))
df = df.reset_index()
这产生了我感兴趣的表格:
CITY_POSTAL location_1 distance_1 location_2 distance_2 location_3 distance_3
0 1 theater 6416.753469 bar 6460.611645 car_shop 6725.829125
1 2 theater 6308.847913 bar 6566.958894 car_shop 6579.375371
2 3 bar 4974.492016 car_shop 5516.266902 theater 5523.801936
我不知道如何在 PySpark 中写这个。有人可以帮忙翻译一下吗?
解决方案
您可以按 对数据框进行分组CITY_POSTAL
,将所有位置和距离收集到一个数组中,按距离对数组进行排序,然后选择数组的前 N 个元素:
df.groupBy("CITY_POSTAL").agg(F.collect_list(F.struct("location", "distance")).alias("dist")) \
.withColumn("dist", F.expr("array_sort(dist, (l,r) -> if(l['distance'] < r['distance'],-1,1))")) \
.selectExpr("CITY_POSTAL",
"dist[0]['location'] as locaction_1",
"dist[0]['distance'] as distance_1",
"dist[1]['location'] as locaction_2",
"dist[1]['distance'] as distance_2",
"dist[2]['location'] as locaction_3",
"dist[2]['distance'] as distance_3") \
.show(truncate=False)
输出:
+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|CITY_POSTAL|locaction_1|distance_1 |locaction_2|distance_2 |locaction_3|distance_3 |
+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|1 |theater |6416.753469|bar |6460.611645|car_shop |6725.829125|
|3 |bar |4974.492016|car_shop |5516.266902|theater |5523.801936|
|2 |theater |6308.847913|bar |6566.958894|car_shop |6579.375371|
+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
推荐阅读
- sql - 如何在 WHERE 子句中使用别名?
- c# - asp.net 连接到外部数据库
- javascript - 无法将 CanvasJS 导入 Java 项目,无法解析符号 canvasjs
- python - 如何使用 Kivy 制作的应用程序无线控制 LED?
- php - Eloquent 多对多关系上的总和数据透视表字段
- java - MultiValueMap 获取值
- android-studio - 神器发布不适用于 android studio 3.4.1
- python - 使用外壳扩展将图标插入上下文菜单
- java - 如何使用 REST Api,然后将热门故事显示为 JSON
- tooltip - 如何在滚动时设置工具提示隐藏