python - PySpark:GroupByKey 并获取元组的总和
问题描述
我有这组数据:
[('Manhattan', ('East Village', 2)),
('Manhattan', ('Theater District', 2)),
('Queens', ('Sunnyside', 2)),
('Manhattan', ('Murray Hill', 2)),
('Manhattan', ('Battery Park City', 2)),
('Queens', ('John F. Kennedy International Airport', 2)),
('Queens', ('LaGuardia Airport', 2)),
('Manhattan', ('NoHo', 2)),
('Manhattan', ('Chinatown', 2)),
('Brooklyn', ('Brooklyn Heights', 2))]
我的目标是,对于每个行政区,我想获得总和最高的三个街区。
这个数据的格式是
X = (Borough, (Neighborhood, total))
我在这里的思考过程是:
我想在这个数据上做一个 groupbykey 我将首先得到所有三个行政区,然后是三个最高的社区,所以代码:
X.groupByKey().mapValues(sum).collect()
但是,据我了解,这会出错,因为第二个元素又是一个元组,我想访问第二个元组的第二个元素,我不知道该怎么做。
此外,通过这种方式,我只需汇总数据,因此我编写了这段代码,它将为我提供三个最高的邻域:
def findingLargest(item):
from heapq import nlargest
i, j = item
tops = nlargest(3, j,key=lambda x: x[1])
return (i, tops)
所以,我能想出的最终代码是:
X.groupByKey()\
.map(findingLargest)
预期输出:
Borough, Top_1 Neighborhood, Top_1_count, Top_2 Neighborhood, Top_2_count
关于如何进行此操作的任何建议?
解决方案
我有一个解决方案,但它需要一次从rdd
使用DataFrame
. 最直接的实现是直接使用DataFrame
data = sc.parallelize([('Manhattan', ('East Village', 2)),
('Manhattan', ('Theater District', 2)),
('Queens', ('Sunnyside', 2)),
('Manhattan', ('Murray Hill', 2)),
('Manhattan', ('Battery Park City', 2)),
('Queens', ('John F. Kennedy International Airport', 2)),
('Queens', ('LaGuardia Airport', 2)),
('Manhattan', ('NoHo', 2)),
('Manhattan', ('Chinatown', 2)),
('Brooklyn', ('Brooklyn Heights', 2))])
将您的 rdd 转换为 (key1_key2, value) 格式:
data = data.map(lambda l: (l[0] + "_" + l[1][0], l[1][1]))
data.take(2)
# [('Manhattan_East Village', 2), ('Manhattan_Theater District', 2)]
然后聚合:
data = data.reduceByKey(lambda x,y:x+y)
data.take(2)
# [('Manhattan_Theater District', 2), ('Queens_John F. Kennedy International Airport', 2)]
拆分得到 (key1, key2, value) 格式:
data2 = data.map(lambda l: (l[0].split("_"), l[1]))
data2 = data2.map(lambda l: (l[0][0], l[0][1], l[1]))
data2.take(2)
# [('Manhattan', 'Theater District', 2), ('Queens', 'John F. Kennedy International Airport', 2)]
使用 API 选择前 n 个特性会更容易DataFrame
(事实上,第一部分会更容易)。我使用一个window
函数:
df = data2.toDF(['district','neighbor','count'])
import pyspark.sql.functions as psf
import pyspark.sql.window as psw
w = psw.Window.partitionBy('district').orderBy(psf.desc('count'))
df = (df.select(psf.col('*'), psf.row_number().over(w).alias('row_number'))
.where(psf.col('row_number') <= 3)
)
df.show(10)
+---------+--------------------+-----+----------+
| district| neighbor|count|row_number|
+---------+--------------------+-----+----------+
| Queens|John F. Kennedy I...| 2| 1|
| Queens| LaGuardia Airport| 2| 2|
| Queens| Sunnyside| 2| 3|
| Brooklyn| Brooklyn Heights| 2| 1|
|Manhattan| Theater District| 2| 1|
|Manhattan| Chinatown| 2| 2|
|Manhattan| Murray Hill| 2| 3|
+---------+--------------------+-----+----------+
要最终获得所需的输出,一种方法是切换回rdd
:
df.rdd.map(lambda l: (l[0], (l[1], l[2]))).reduceByKey(lambda x,y: x + y).take(2)
# [('Manhattan', ('Theater District', 2, 'Chinatown', 2, 'Murray Hill', 2)),
('Brooklyn', ('Brooklyn Heights', 2))]
推荐阅读
- jekyll - Jekyll + Jemoji:如何配置表情符号大小?
- css - 如何在 Flutter 中使用一些 CSS 元素
- cryptography - 对于不同的密钥和消息对,HMAC 输出是否相同
- apache-spark - 在通过 spark 结构化流写入文件时读取文件
- json - jq - 如何在单个数组中选择嵌套对象
- azure - Azure 事件网格:查明是否正在覆盖 blob
- jquery - XMLHTTP 请求不解析 XML 文件
- assembly - 为 8086 组装游戏添加背景音乐
- postgresql - 无法从另一个容器连接到 Postgres 容器
- javascript - 拖放后无法访问元素