python - 优化旋转和填充
问题描述
他们给了我一个存储传感器读数的表格,其中包含一个模式[TimeStamp, SensorKey, SensorValue]
。
TimeStamp Id Value
2019-01-01 00:00:47 1 66.6
2019-01-01 00:00:47 2 0.66
2019-01-01 00:00:57 1 66.7
2019-01-01 00:00:57 2 0.68
2019-01-01 00:00:57 3 166.6
2019-01-01 00:01:07 3 146.6
请注意,它只存储传感器读数的变化,精度和采样率有限,如果没有变化,则在最后一次变化后每小时重复一个值。
他们的查询意味着当传感器 Z 值通过此条件时检查传感器 A(和 B、C 和 D...)的值。他们想使用 Python 和 Spark。
因此,为了比较不同传感器的值,我获取了这些传感器键的行并将结果转换为模式 [TimeStamp, ValueOfA, ..., Value of Z]。
df1 = df0.groupBy("TS").pivot("Id", listOfIds).agg(F.last("Value"))
TimeStamp Sensor1 Sensor2 Sensor3
2019-01-01 00:00:47 66.6 0.66 Null
2019-01-01 00:00:57 66.7 0.68 166.6
2019-01-01 00:01:07 Null Null 146.6
然后我填补空白(总是向前,如果我没有旧数据来填补第一行,我会丢弃它们)。
window1hour = Window.orderBy('TS').rowsBetween(-360, 0)
# 360 = 1 hour / 0.1 Hz sampling rate.
df2 = df1
for sid in sensorIds:
df2 = df2\
.withColumn(sid, F.last(F.column(sid), ignorenulls=True).over(window1hour))\
.filter(F.column(sid).isNotNull())
现在,逐列的比较是微不足道的。
但是与做同样的事情相比,pandas
它的速度更慢,以至于感觉就像我做错了什么。至少对于小查询。
发生了什么?当它是一个大查询时会发生什么?
关于大小:我每年有超过数千个不同的传感器和大约十亿条记录。因此,数据绝对适合一台服务器,但不适合 RAM。事实上,他们只会从一台服务器开始处理数据,可能需要一秒钟来处理第二个 Spark 实例(多处理器和大量内存),如果他们看到回报,他们希望他们会投资更多的硬件。他们将开始每天进行小查询,并且希望他们快速完成。但是以后他们会想要在几年内进行查询,并且一定不能爆炸。
想法/疑问:预处理是否在单个线程中完成?我应该自己建立并行化,还是让 Spark 处理它?我是否应该在跨越多天的查询中打破跨年查询(但那我为什么要使用 Spark)?我是否解决了 pandas 中的小查询和 Spark 中的大查询(我可以预先设置阈值)吗?
我还可以应用哪些其他改进?
解决方案
“小”数据在 Spark 以外的工具中速度更快的情况并不少见。Spark 的并行功能具有相当大的开销(当然,与旧的 map-reduce 范例相比,这些开销非常小)。
Spark 的亮点在于它能够通过添加服务器来线性扩展“大”数据。在这一点上,开销变得值得,因为它会自动在所有可用的执行器之间分解工作。
我相信让 spark 处理并行化是理想的,即使只是为了简单起见。是否在另一个框架中实现“小”查询完全取决于您是否要维护两个代码路径,以及您的客户是否对它们的速度感到满意。
推荐阅读
- ios - 在现有键值中添加新数组对象而不更改 - Objective C
- openmdao - 我无法记录/访问衍生品
- amazon-web-services - 使用 Aurora Serverless 作为副本
- angular - MVC 5 问题中的 Angular 7 安装
- python - 使用 pywinauto 自动化点击开始按钮
- android - 杂项通知未从设置中启用
- node.js - push 或 addToSet 不适用于 express、node 和 mongoose(得到响应 { ok: 0, n: 0, nModified: 0 })
- css - Safari div 高度与 line-height 不匹配
- angular7 - 无法从后端获取数据得到 404
- php - 如何在laravel中获取用户使用总平均时间