apache-spark - 如何根据键值对 RDD 中的值(元组)进行过滤
问题描述
示例 RDD 如下所示:
(key1,(111,222,1)
(key1,(113,224,1)
(key1,(114,225,0)
(key1,(115,226,0)
(key1,(113,226,0)
(key1,(116,227,1)
(key1,(117,228,1)
(key2,(118,229,1)
我目前正在做一个火花项目。'1'
我想根据键过滤元组值中第三个位置所在的第一个和最后一个元素'0'
。是否可以使用 reduceByKey 来做到这一点?但是经过我的研究,我没有找到一个很好的逻辑来达到我想要的。我希望我的结果与下面显示的输出顺序相同。
预期输出:
(key1,(111,222,1)
(key1,(114,225,0)
(key1,(113,226,0)
(key1,(116,227,1)
(key2,(118,229,1)
非常感激。
解决方案
如果我理解正确,您需要每个键的第一个“1”、第一个“0”、最后一个“1”和最后一个“0”,并保持顺序。如果我是你,我会使用 SparkSQL API 来做到这一点。
首先,让我们构建您的 RDD(顺便说一下,提供示例数据非常好,提供足够的代码以便我们可以更好地重现您所做的事情):
val seq = Seq(("key1",(111,222,1)),
("key1",(113,224,1)),
("key1",(114,225,0)),
("key1",(115,226,0)),
("key1",(113,226,0)),
("key1",(116,227,1)),
("key1",(117,228,1)),
("key2",(118,229,1)))
val rdd = sc.parallelize(seq)
// then I switch to dataframes, and add an id to be able to go back to
// the previous order
val df = rdd.toDF("key", "value").withColumn("id", monotonicallyIncreasingId)
df.show()
+----+-----------+------------+
| key| value| id|
+----+-----------+------------+
|key1|[111,222,1]| 8589934592|
|key1|[113,224,1]| 25769803776|
|key1|[114,225,0]| 42949672960|
|key1|[115,226,0]| 60129542144|
|key1|[113,226,0]| 77309411328|
|key1|[116,227,1]| 94489280512|
|key1|[117,228,1]|111669149696|
|key2|[118,229,1]|128849018880|
+----+-----------+------------+
现在,我们可以按“key”和“value._3”分组,保留 min(id) 和它的 max 并分解数据。然而,有了一个窗口,我们可以用更简单的方式来做到这一点。让我们定义以下窗口:
val win = Window.partitionBy("key", "value._3").orderBy("id")
// now we compute the previous and next element of each id using resp. lag and lead
val big_df = df
.withColumn("lag", lag('id, 1) over win)
.withColumn("lead", lead('id, 1) over win)
big_df.show
+----+-----------+------------+-----------+------------+
| key| value| id| lag| lead|
+----+-----------+------------+-----------+------------+
|key1|[111,222,1]| 8589934592| null| 25769803776|
|key1|[113,224,1]| 25769803776| 8589934592| 94489280512|
|key1|[116,227,1]| 94489280512|25769803776|111669149696|
|key1|[117,228,1]|111669149696|94489280512| null|
|key1|[114,225,0]| 42949672960| null| 60129542144|
|key1|[115,226,0]| 60129542144|42949672960| 77309411328|
|key1|[113,226,0]| 77309411328|60129542144| null|
|key2|[118,229,1]|128849018880| null| null|
+----+-----------+------------+-----------+------------+
现在我们看到您所追求的行是滞后等于 null(第一个元素)或前导等于 null(最后一个元素)的行。因此,让我们过滤,使用 id 排序回上一个顺序并选择您需要的列:
val result = big_df
.where(('lag isNull) || ('lead isNull))
.orderBy('id)
.select("key", "value")
result.show
+----+-----------+
| key| value|
+----+-----------+
|key1|[111,222,1]|
|key1|[114,225,0]|
|key1|[113,226,0]|
|key1|[117,228,1]|
|key2|[118,229,1]|
+----+-----------+
最后,如果你真的需要一个 RDD,你可以使用以下方式转换数据帧:
result.rdd.map(row => row.getAs[String](0) -> row.getAs[(Int, Int, Int)](1))
推荐阅读
- powershell - 在 Windows Docker 容器中运行 Visual Studio C++ 2017 编译的可执行文件
- python - KeyError: 'image_url' Traceback (last recent call last) 烧瓶 api 响应错误
- android - 如何检查和使用 Google Play 服务 更新的安全性 在反应原生应用程序中提供以保护应用程序免受 OpenSSL 漏洞的影响
- apache-kafka - Nest.JS CQRS:有没有办法通过在后台使用 Kafka 或 RabbitMQ 使命令总线和队列在外部?谢谢
- mdx - MDX - 显示此 MDX 查询的维度标签/值
- css - Bootstrap(在 Laravel 上) - 标题下拉列表在一页上变得透明
- c - 将 elf32_Word 打印为单词
- java - secLdap 插件发生内部错误。无效的
- angular - 带有管道的 ngFor 标签重复多少次
- vue.js - 使用 Axios 调用后 Cookie 未存储在浏览器中