首页 > 解决方案 > 优化旋转和填充

问题描述

他们给了我一个存储传感器读数的表格,其中包含一个模式[TimeStamp, SensorKey, SensorValue]

TimeStamp             Id           Value
2019-01-01 00:00:47   1            66.6
2019-01-01 00:00:47   2            0.66
2019-01-01 00:00:57   1            66.7
2019-01-01 00:00:57   2            0.68
2019-01-01 00:00:57   3            166.6
2019-01-01 00:01:07   3            146.6

请注意,它只存储传感器读数的变化,精度和采样率有限,如果没有变化,则在最后一次变化后每小时重复一个值。

他们的查询意味着当传感器 Z 值通过此条件时检查传感器 A(和 B、C 和 D...)的值。他们使用 Python 和 Spark。

因此,为了比较不同传感器的值,我获取了这些传感器键的行并将结果转换为模式 [TimeStamp, ValueOfA, ..., Value of Z]。

df1 = df0.groupBy("TS").pivot("Id", listOfIds).agg(F.last("Value"))

TimeStamp             Sensor1      Sensor2     Sensor3
2019-01-01 00:00:47   66.6         0.66        Null
2019-01-01 00:00:57   66.7         0.68        166.6
2019-01-01 00:01:07   Null         Null        146.6

然后我填补空白(总是向前,如果我没有旧数据来填补第一行,我会丢弃它们)。

window1hour = Window.orderBy('TS').rowsBetween(-360, 0)
# 360 = 1 hour / 0.1 Hz sampling rate.
df2 = df1
for sid in sensorIds:
    df2 = df2\
        .withColumn(sid, F.last(F.column(sid), ignorenulls=True).over(window1hour))\
        .filter(F.column(sid).isNotNull())

现在,逐列的比较是微不足道的。

但是与做同样的事情相比,pandas它的速度更慢,以至于感觉就像我做错了什么。至少对于小查询。

发生了什么?当它是一个大查询时会发生什么

关于大小:我每年有超过数千个不同的传感器和大约十亿条记录。因此,数据绝对适合一台服务器,但不适合 RAM。事实上,他们只会从一台服务器开始处理数据,可能需要一秒钟来处理第二个 Spark 实例(多处理器和大量内存),如果他们看到回报,他们希望他们会投资更多的硬件。他们将开始每天进行小查询,并且希望他们快速完成。但是以后他们会想要在几年内进行查询,并且一定不能爆炸。

想法/疑问:预处理是否在单个线程中完成?我应该自己建立并行化,还是让 Spark 处理它?我是否应该在跨越多天的查询中打破跨年查询(但那我为什么要使用 Spark)?我是否解决了 pandas 中的小查询和 Spark 中的大查询(我可以预先设置阈值)吗?

我还可以应用哪些其他改进?

标签: pythonpandasapache-sparkpyspark

解决方案


“小”数据在 Spark 以外的工具中速度更快的情况并不少见。Spark 的并行功能具有相当大的开销(当然,与旧的 map-reduce 范例相比,这些开销非常小)。

Spark 的亮点在于它能够通过添加服务器来线性扩展“大”数据。在这一点上,开销变得值得,因为它会自动在所有可用的执行器之间分解工作。

我相信让 spark 处理并行化是理想的,即使只是为了简单起见。是否在另一个框架中实现“小”查询完全取决于您是否要维护两个代码路径,以及您的客户是否对它们的速度感到满意。


推荐阅读