apache-spark - 使用 pyspark 的 Spark RDD 窗口化
问题描述
有一个 Spark RDD,称为rdd1
. 它有(key, value)
对,我有一个列表,其元素是 a tuple(key1,key2)
。
我想得到一个rdd2
,行`((key1,key2),(rdd1中key1的值,rdd1中key2的值))。
有人可以帮助我吗?
rdd1:
key1, value1,
key2, value2,
key3, value3
大批:[(key1,key2),(key2,key3)]
结果:
(key1,key2),value1,value2
(key2,key3),value2,value3
我努力了
spark.parallize(array).map(lambda x:)
解决方案
使用 SCALA 滑动与 mllib 滑动 - 两种实现,有点繁琐,但这里是:
import org.apache.spark.mllib.rdd.RDDFunctions._
val rdd1 = sc.parallelize(Seq(
( "key1", "value1"),
( "key2", "value2"),
( "key3", "value3"),
( "key4", "value4"),
( "key5", "value5")
))
val rdd2 = rdd1.sliding(2)
val rdd3 = rdd2.map(x => (x(0), x(1)))
val rdd4 = rdd3.map(x => ((x._1._1, x._2._1),x._1._2, x._2._2))
rdd4.collect
此外,以下内容当然更好……:
val rdd5 = rdd2.map{case Array(x,y) => ((x._1, y._1), x._2, y._2)}
rdd5.collect
在两种情况下都返回:
res70: Array[((String, String), String, String)] = Array(((key1,key2),value1,value2), ((key2,key3),value2,value3), ((key3,key4),value3,value4), ((key4,key5),value4,value5))
我相信这可以满足您的需求,但不能满足您的需求。
在 Stack Overflow 上,您可以找到 pyspark 没有 RDD 等效项的声明,除非您“自己动手”。您可以在 Pyspark 中查看如何在时间序列数据上使用滑动窗口转换数据。但是,我建议使用 pyspark.sql.functions.lead() 和 pyspark.sql.functions.lag() 来构建数据框。容易一些。
推荐阅读
- go - Google oauth2 端点不返回用户个人资料信息(名称等)
- node.js - 前端和后端有两个不同的端口?(使用 Strapi) - nginx 的配置
- c# - 如何在 C# 中忽略特定代码行的声纳规则?
- php - Themosis 框架 wordpress - 默认页面提供 404 状态
- android - 在显示区域外加载谷歌地图
- microsoft-graph-api - Microsoft Graph:向用户收件箱插入消息(邮件)
- javascript - 如何将数据从子组件(子组件有自己的状态)传递给父组件?
- java - 像素周围的点云(不是点或平面)上的 HitTest-ing
- vba - 添加行,然后用集合中的数据填充这些行
- laravel - 带有 csrf 的 laravel GuzzleHttp 帖子