首页 > 解决方案 > 时间序列数据火花的数据处理

问题描述

鉴于以下样本数据,

t- timeseries datetime sample,
lat-latitude,
long-longitude
t   lat long
0   27  28
5   27  28
10  27  28
15  29  49
20  29  49
25  27  28
30  27  28

我想获得与此类似的输出,我想以这样一种方式处理时间序列数据,即对 lat long 进行分组,我能够获得该对的不同时间序列间隔。我正在火花中进行处理

Lat-long    interval
(27,28) (0,10)
(29,49) (15,20)
(27,28) (25,30)

标签: scalaapache-sparktime-series

解决方案


如果您的数据很大,我不会建议您使用此解决方案,但是既然您发表了评论

我正在处理存储在 cassandara 中的每日数据,大小为 5-6k 行记录/秒

以下解决方案建议应该没问题

查看您给定的数据框架构应为

root
 |-- t: integer (nullable = false)
 |-- lat: integer (nullable = false)
 |-- long: integer (nullable = false)

您的预期输出表明您需要一个额外的列来对数据框进行分组,这需要您在一个执行程序上收集数据

val collectedRDD = df.collect()

var varianceCount, lattitude, longitude = 0
val groupedData = new ArrayBuffer[(Int, Int, Int, Int)]()
for(rdd <- collectedRDD) {
  val t = rdd.getAs[Int]("t")
  val lat = rdd.getAs[Int]("lat")
  val long = rdd.getAs[Int]("long")
  if (lat != lattitude || long != longitude) {
    varianceCount = varianceCount + 1
    lattitude = lat
    longitude = long
    groupedData.append((t, lat, long, varianceCount))
  }
  else {
    groupedData.append((t, lat, long, varianceCount))
  }
}

然后将 ArrayBuffer 转换为数据帧并使用groupByandaggregation作为

val finalDF = groupedData
  .toDF("t", "lat", "long", "grouped")
      .groupBy(struct("lat", "long").as("lat-long"), col("grouped"))
      .agg(struct(min("t"), max("t")).as("interval"))
    .drop("grouped")

finalDF应该

+--------+--------+
|lat-long|interval|
+--------+--------+
|[29,49] |[15,20] |
|[27,28] |[0,10]  |
|[27,28] |[25,30] |
+--------+--------+

我希望答案有帮助


推荐阅读