scala - 时间序列数据火花的数据处理
问题描述
鉴于以下样本数据,
t- timeseries datetime sample, lat-latitude, long-longitude
t lat long
0 27 28
5 27 28
10 27 28
15 29 49
20 29 49
25 27 28
30 27 28
我想获得与此类似的输出,我想以这样一种方式处理时间序列数据,即对 lat long 进行分组,我能够获得该对的不同时间序列间隔。我正在火花中进行处理
Lat-long interval
(27,28) (0,10)
(29,49) (15,20)
(27,28) (25,30)
解决方案
如果您的数据很大,我不会建议您使用此解决方案,但是既然您发表了评论
我正在处理存储在 cassandara 中的每日数据,大小为 5-6k 行记录/秒
以下解决方案建议应该没问题
查看您给定的数据框,架构应为
root
|-- t: integer (nullable = false)
|-- lat: integer (nullable = false)
|-- long: integer (nullable = false)
您的预期输出表明您需要一个额外的列来对数据框进行分组,这需要您在一个执行程序上收集数据
val collectedRDD = df.collect()
var varianceCount, lattitude, longitude = 0
val groupedData = new ArrayBuffer[(Int, Int, Int, Int)]()
for(rdd <- collectedRDD) {
val t = rdd.getAs[Int]("t")
val lat = rdd.getAs[Int]("lat")
val long = rdd.getAs[Int]("long")
if (lat != lattitude || long != longitude) {
varianceCount = varianceCount + 1
lattitude = lat
longitude = long
groupedData.append((t, lat, long, varianceCount))
}
else {
groupedData.append((t, lat, long, varianceCount))
}
}
然后将 ArrayBuffer 转换为数据帧并使用groupBy
andaggregation
作为
val finalDF = groupedData
.toDF("t", "lat", "long", "grouped")
.groupBy(struct("lat", "long").as("lat-long"), col("grouped"))
.agg(struct(min("t"), max("t")).as("interval"))
.drop("grouped")
finalDF
应该
+--------+--------+
|lat-long|interval|
+--------+--------+
|[29,49] |[15,20] |
|[27,28] |[0,10] |
|[27,28] |[25,30] |
+--------+--------+
我希望答案有帮助
推荐阅读
- c - 如何获取命令行参数以从同一 c 程序文件中的另一个方法读取输出?
- unity3d - 为什么 Unity 在构建中包含编辑器目录
- html - 如何使用纯 CSS 在一个 div 中订购(两列)长文本?
- javascript - 无法使用 MySQL 在 node.js 中执行 Sql 查询
- powerbi - 日期PowerBI的累计发生次数?
- oracle - 如何解决这个关于 Oracle 触发器的练习
- javascript - Javascript 从此数组中追加对象并创建带有对象键值对的新数组
- javascript - 无法提交包含多个文件和输入的表单
- android - 我正在尝试为我的应用程序创建启动画面,但我收到 AAPT: error: not well-formed error
- node.js - 如何从 GET 请求中获取 pdf 文件并从中获取文本?