PySpark: groupByKey and get the sum of the tuples

Problem Description

I have this set of data:

[('Manhattan', ('East Village', 2)),
('Manhattan', ('Theater District', 2)),
('Queens', ('Sunnyside', 2)),
('Manhattan', ('Murray Hill', 2)),
('Manhattan', ('Battery Park City', 2)),
('Queens', ('John F. Kennedy International Airport', 2)),
('Queens', ('LaGuardia Airport', 2)),
('Manhattan', ('NoHo', 2)),
('Manhattan', ('Chinatown', 2)),
('Brooklyn', ('Brooklyn Heights', 2))]

My goal is, for each borough, to get the three neighborhoods with the highest totals.

The format of this data is

X = (Borough, (Neighborhood, total))

My thought process here is:

I want to do a groupByKey on this data, so that I first get all three boroughs and can then pick the three highest neighborhoods for each; hence the code:

X.groupByKey().mapValues(sum).collect()

However, as I understand it, this will throw an error because the second element is itself a tuple; I want to access the second element of that inner tuple, and I don't know how to do that.
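
For reference, a minimal sketch of summing the second element of each inner tuple inside mapValues, assuming X is the RDD shown above (note this only gives one total per borough, not the top three):

X.groupByKey() \
 .mapValues(lambda pairs: sum(count for _, count in pairs)) \
 .collect()
# e.g. [('Manhattan', 12), ('Queens', 6), ('Brooklyn', 2)]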

Also, that way I would only be aggregating the data, so I wrote this code, which gives me the three highest neighborhoods:

def findingLargest(item):
    # item is (borough, iterable of (neighborhood, count)) as produced by groupByKey
    from heapq import nlargest
    i, j = item
    # keep the three tuples with the largest count (the second element of each tuple)
    tops = nlargest(3, j, key=lambda x: x[1])
    return (i, tops)

So the final code I could come up with is:

X.groupByKey()\
 .map(findingLargest)

Expected output:

Borough, Top_1 Neighborhood, Top_1_count, Top_2 Neighborhood, Top_2_count
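
For clarity, a hypothetical helper (flattenRow is my own name, not existing code) that would turn each (borough, tops) record returned by findingLargest into that flat row shape could look like this:

def flattenRow(item):
    # item is (borough, [(neighborhood, count), ...]) as returned by findingLargest
    borough, tops = item
    row = [borough]
    for neighborhood, count in tops:
        row.extend([neighborhood, count])
    return tuple(row)

# X.groupByKey().map(findingLargest).map(flattenRow).collect()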

Any suggestions on how to go about this?

Tags: python, apache-spark, pyspark

Solution


I have a solution, but at one point it requires switching from the rdd to a DataFrame. The most straightforward implementation would be to use a DataFrame directly.

data = sc.parallelize([('Manhattan', ('East Village', 2)),
('Manhattan', ('Theater District', 2)),
('Queens', ('Sunnyside', 2)),
('Manhattan', ('Murray Hill', 2)),
('Manhattan', ('Battery Park City', 2)),
('Queens', ('John F. Kennedy International Airport', 2)),
('Queens', ('LaGuardia Airport', 2)),
('Manhattan', ('NoHo', 2)),
('Manhattan', ('Chinatown', 2)),
('Brooklyn', ('Brooklyn Heights', 2))])

Convert your rdd to a (key1_key2, value) format:

data = data.map(lambda l: (l[0] + "_" + l[1][0], l[1][1]))
data.take(2)
# [('Manhattan_East Village', 2), ('Manhattan_Theater District', 2)]

Then aggregate:

data = data.reduceByKey(lambda x,y:x+y)
data.take(2)
# [('Manhattan_Theater District', 2), ('Queens_John F. Kennedy International Airport', 2)]

Split to get a (key1, key2, value) format:

data2 = data.map(lambda l: (l[0].split("_"), l[1]))
data2 = data2.map(lambda l: (l[0][0], l[0][1], l[1]))
data2.take(2)
# [('Manhattan', 'Theater District', 2), ('Queens', 'John F. Kennedy International Airport', 2)]
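
As a side note (my addition, not part of the original answer), keying by a (borough, neighborhood) tuple avoids both the string concatenation and the later split; raw here is a hypothetical name for the RDD in its original (borough, (neighborhood, count)) shape:

data2_alt = (raw.map(lambda l: ((l[0], l[1][0]), l[1][1]))   # ((borough, neighborhood), count)
                .reduceByKey(lambda x, y: x + y)             # sum the counts per composite key
                .map(lambda l: (l[0][0], l[0][1], l[1])))    # (borough, neighborhood, total)
data2_alt.take(2)
# e.g. [('Manhattan', 'East Village', 2), ('Queens', 'Sunnyside', 2)]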

Selecting the top n is easier with the DataFrame API (in fact, the first part would have been easier too). I use a window function:

df = data2.toDF(['district','neighbor','count'])
import pyspark.sql.functions as psf
import pyspark.sql.window as psw

w = psw.Window.partitionBy('district').orderBy(psf.desc('count'))
df = (df.select(psf.col('*'), psf.row_number().over(w).alias('row_number'))
      .where(psf.col('row_number') <= 3)
     )
df.show(10)
+---------+--------------------+-----+----------+
| district|            neighbor|count|row_number|
+---------+--------------------+-----+----------+
|   Queens|John F. Kennedy I...|    2|         1|
|   Queens|   LaGuardia Airport|    2|         2|
|   Queens|           Sunnyside|    2|         3|
| Brooklyn|    Brooklyn Heights|    2|         1|
|Manhattan|    Theater District|    2|         1|
|Manhattan|           Chinatown|    2|         2|
|Manhattan|         Murray Hill|    2|         3|
+---------+--------------------+-----+----------+

To finally get the desired output, one way is to switch back to an rdd:

df.rdd.map(lambda l: (l[0], (l[1], l[2]))).reduceByKey(lambda x, y: x + y).take(2)
# [('Manhattan', ('Theater District', 2, 'Chinatown', 2, 'Murray Hill', 2)),
#  ('Brooklyn', ('Brooklyn Heights', 2))]
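
Alternatively, staying entirely in the RDD API, the findingLargest idea from the question works once the per-neighborhood totals in data2 are re-keyed by borough; a sketch:

from heapq import nlargest

top3 = (data2.map(lambda l: (l[0], (l[1], l[2])))    # (borough, (neighborhood, total))
             .groupByKey()                           # borough -> iterable of (neighborhood, total)
             .mapValues(lambda vs: nlargest(3, vs, key=lambda x: x[1])))
top3.take(1)
# e.g. [('Brooklyn', [('Brooklyn Heights', 2)])]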
