PySpark geolocation: sort duplicate rows into pivoted columns

Problem description

I have a Spark dataframe/table called "df" that contains city and postal-code combinations together with the geographic coordinates of each city/postal code. The table also contains the geographic coordinates of several venues, e.g. a theater, a bar, a car shop. The table has hundreds of duplicated rows of each city and postal-code combination, one for every venue, and I used the haversine formula to calculate the distance (in kilometres) from each city/postal code to each venue location. Here is an excerpt of the table (I limited the example to just 3 cities and postal codes):

CITY_POSTAL latitude_CITY_POSTAL    longitude_CITY_POSTAL   location    latitude_location   longitude_location      distance
0         1               7.1899                   52.208    theater              36.8381              -2.4597   6416.753469
1         1               7.1899                   52.208        bar              41.6561              -0.8773   6460.611645
2         1               7.1899                   52.208   car_shop              37.2829              -5.9209   6725.829125
3         2              -5.9209                  37.4827    theater              36.8381              -2.4597   6308.847913
4         2              -5.9209                  37.4827        bar              41.6561              -0.8773   6566.958894
5         2              -5.9209                  37.4827   car_shop              37.2829              -5.9209   6579.375371
6         3              83.1072                   54.849    theater              36.8381              -2.4597   5523.801936
7         3              83.1072                   54.849        bar              41.6561              -0.8773   4974.492016
8         3              83.1072                   54.849   car_shop              37.2829              -5.9209   5516.266902
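
The haversine step itself is not shown in the question; a minimal sketch of how such a distance column could be computed directly in PySpark (assuming Spark 3.x, the column names from the excerpt above, the standard haversine formula, and a mean Earth radius of 6371 km) might look like this:

from pyspark.sql import functions as F

R_KM = 6371.0  # assumed mean Earth radius in kilometres

def haversine_km(lat1, lon1, lat2, lon2):
    # Great-circle distance in kilometres between two points given as degree columns.
    dlat = F.radians(lat2 - lat1)
    dlon = F.radians(lon2 - lon1)
    a = (F.sin(dlat / 2) ** 2
         + F.cos(F.radians(lat1)) * F.cos(F.radians(lat2)) * F.sin(dlon / 2) ** 2)
    return 2 * R_KM * F.asin(F.sqrt(a))

df = df.withColumn(
    "distance",
    haversine_km(F.col("latitude_CITY_POSTAL"), F.col("longitude_CITY_POSTAL"),
                 F.col("latitude_location"), F.col("longitude_location")),
)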

I now want to group and pivot the table so that each unique city/postal code has n columns with the venues ordered from nearest to farthest, plus a column with the distance to each venue. I did this with pandas syntax as follows:

# Use a Pivot Table to go from long to wide format
df = (
    df.pivot_table(index='CITY_POSTAL',
                    columns=(
                        # Create Groups based on Sorted Distance
                            df.sort_values('distance', ascending=True)
                            .groupby('CITY_POSTAL').cumcount() + 1
                    ),
                    values=['location', 'distance'],
                    aggfunc='first')
        .sort_index(level=[1, 0], axis=1, ascending=(True, False))
)

# Collapse MultiIndex
df.columns = df.columns.map(lambda x: '_'.join(map(str, x)))
df = df.reset_index()

This produces the table I am interested in:

   CITY_POSTAL location_1   distance_1 location_2   distance_2 location_3   distance_3
0            1    theater  6416.753469        bar  6460.611645   car_shop  6725.829125
1            2    theater  6308.847913        bar  6566.958894   car_shop  6579.375371
2            3        bar  4974.492016   car_shop  5516.266902    theater  5523.801936

I don't know how to write this in PySpark. Could someone help translate it?

Tags: loops, pyspark, pivot, pandas-groupby, rank

Solution


You can group the dataframe by CITY_POSTAL, collect all locations and distances into an array, sort the array by distance, and then select the first N elements of the array:

from pyspark.sql import functions as F

# Collect (location, distance) pairs per CITY_POSTAL, sort the array by distance
# (the comparator form of array_sort requires Spark 3.0+), then pick the first
# three elements of the sorted array as separate columns.
df.groupBy("CITY_POSTAL").agg(F.collect_list(F.struct("location", "distance")).alias("dist")) \
        .withColumn("dist", F.expr("array_sort(dist, (l,r) -> if(l['distance'] < r['distance'],-1,1))")) \
        .selectExpr("CITY_POSTAL",
                "dist[0]['location'] as location_1",
                "dist[0]['distance'] as distance_1",
                "dist[1]['location'] as location_2",
                "dist[1]['distance'] as distance_2",
                "dist[2]['location'] as location_3",
                "dist[2]['distance'] as distance_3") \
        .show(truncate=False)

Output:

+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|CITY_POSTAL|location_1 |distance_1 |location_2 |distance_2 |location_3 |distance_3 |
+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|1          |theater    |6416.753469|bar        |6460.611645|car_shop   |6725.829125|
|3          |bar        |4974.492016|car_shop   |5516.266902|theater    |5523.801936|
|2          |theater    |6308.847913|bar        |6566.958894|car_shop   |6579.375371|
+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
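
If the number of nearest venues should not be hard-coded to three, the select expressions can also be generated in a loop. A minimal sketch, reusing the same sorted "dist" array and a hypothetical n_nearest parameter:

from pyspark.sql import functions as F

n_nearest = 3  # hypothetical parameter: how many nearest venues to keep per CITY_POSTAL

sorted_df = (
    df.groupBy("CITY_POSTAL")
      .agg(F.collect_list(F.struct("location", "distance")).alias("dist"))
      # the comparator form of array_sort requires Spark 3.0+
      .withColumn("dist", F.expr("array_sort(dist, (l,r) -> if(l['distance'] < r['distance'],-1,1))"))
)

exprs = ["CITY_POSTAL"]
for i in range(n_nearest):
    exprs.append(f"dist[{i}]['location'] as location_{i + 1}")
    exprs.append(f"dist[{i}]['distance'] as distance_{i + 1}")

sorted_df.selectExpr(*exprs).show(truncate=False)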
