apache-spark - Spark - 如何通过具有或关系的多个键对数据进行分组
问题描述
我有一个如下的RDD:
[
Row(device='1', token='aaa', other_value=1),
Row(device='1', token='bbb', other_value=1),
Row(device='2', token='bbb', other_value=1),
Row(device='4', token='ddd', other_value=1),
...
]
为了进行一些处理(而不是聚合),我需要将具有相同设备或令牌的行组合在一起。例如,上面的前 3 行应组合在一起以进行进一步处理。(第 1 行和第 2 行具有相同的设备 1,第 2 和第 3 行具有相同的令牌 bbb)。
本质上,我想做类似的事情
rdd.groupBy(same device or same token)
火花中可以吗?
更新:
对于上面的示例,预期的输出将是这样的 RDD:
[
[
Row(device='1', token='aaa', other_value=1),
Row(device='1', token='bbb', other_value=1),
Row(device='2', token='bbb', other_value=1)
],
[
Row(device='4', token='ddd', other_value=1)
]
]
解决方案
行所属的组的属性是可传递的。因此,简单的groupBy
行不通。一种选择是将数据解释为图形并使用GraphFrames来查找图形中的连接组件。输入数据的每一行都将是源 =device
和目标 =的图的一条边token
。
#setup GraphFrames
import os
os.environ["PYSPARK_SUBMIT_ARGS"] = (
"--packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 pyspark-shell"
)
spark = ...
#the input data
rdd = spark.sparkContext.parallelize([ Row(device='1', token='aaa', other_value=1),
Row(device='1', token='bbb', other_value=1),
Row(device='2', token='bbb', other_value=1),
Row(device='4', token='ddd', other_value=1)])
df = spark.createDataFrame(rdd)
edges = df \
.withColumnRenamed("device", "src") \
.withColumnRenamed("token", "dst")
vertices = edges.select("src").distinct() \
.union(edges.select("dst").distinct()) \
.withColumnRenamed("src", "id")
#create a graph and find all connected components
g = GraphFrame(vertices, edges)
cc = g.connectedComponents()
df.join(cc.distinct(), df.device == cc.id) \
.orderBy("component", "device", "token") \
.show()
输出:
+------+-----+-----------+---+------------+
|device|token|other_value| id| component|
+------+-----+-----------+---+------------+
| 4| ddd| 1| 4| 60129542144|
| 1| aaa| 1| 1|335007449088|
| 1| bbb| 1| 1|335007449088|
| 2| bbb| 1| 2|335007449088|
+------+-----+-----------+---+------------+
具有相同值的所有行component
属于同一组。
推荐阅读
- java - int low 和 int high 之间的数字之和;爪哇
- javascript - 是否可以使用 Google Apps 脚本制作一个简单的 Discord 机器人?
- wix - WiX 工具集 - 为什么在 InstallFinalize 之后立即执行 CustomAction 的 CustomActionData 集合为空?
- ios - 为 iPhone X 调整图像时,裁剪区域有多长?
- c# - 如何配置 WPF 项目以使用 BLE?
- sml - 标准 ML 展开列表
- python - How to spawn Python process from Heroku Node
- c - 如何将字符串数组的元素存储到C中的结构中?
- aws-api-gateway - API-Gateway Auth:AWS sigv4 与 Cognito 用户池 JWT
- java - Java swing:对话框内的格式内容