pyspark - 如何在 Pyspark 中实现对速度的有效时间长度转换?
问题描述
一台机器提供数千个传感器的数据。机器在第一时间展开金属条。下一次加热金属带,第三次冷却金属带。通过时间戳、测得的速度和触发器(例如输入/输出烤箱),将在 ETL 步骤中生成带变量。
+----------------+----------+-----------+---------+-----+
|time |input_oven|output_oven|temp_oven|speed|
+----------------+----------+-----------+---------+-----+
|2017-01-01-01-20|0 |0 |450 |3 |
|2017-01-01-01-21|0 |0 |450 |3 |
|2017-01-01-01-22|1 |0 |450 |3 |
|2017-01-01-01-23|0 |0 |450 |4 |
|2017-01-01-01-24|0 |0 |451 |4 |
|2017-01-01-01-25|0 |1 |450 |4 |
|2017-01-01-01-26|0 |0 |450 |3 |
+----------------+----------+-----------+---------+-----+
如您所见,速度可能会有所不同。我试过下面的代码,但这太不准确了,比如机器可以停止。
from scipy import integrate
s = lambda s: col_speed*col_time
integrate.quad(s, time_1, time_2)
因此,必须通过速度变量进行积分,以便生成新的仪表变量。一个文件包含 5000 个传感器的 30k 个条目。
结果需要是一个与所有传感器数据平行的表格,以便我可以看到:金属条纹计已经经历了炉温和冷却速度。
非常欢迎任何帮助,我提前感谢您。
编辑
为了提供进一步的见解,我添加了以下图片。
一条生产线的多个传感器信号的时间序列。绿线代表当前时间。黄线代表不同时间戳的相同长度位置。
ETL 作业的目标应是所有传感器信号相对于长度位置的对齐。因此,我有了使用以下等式的想法:
length = speed * time
time = time_delta(output_oven-input_oven)
speed = avg(speed)
对于给定的示例数据,对于完整的 DataFrame,应该像这样求解方程
length = avg(speed) * time_delta(output_oven-input_oven)
length = 4 m/min * 2017-01-01-01-25-2017-01-01-01-22
length = 4 m/min * 3 min = 12 m
现在我知道我的金属条的哪一部分穿过了烤箱。假设我的金属乐队有 12 米长。我现在想根据长度滞后所有其他传感器信号。
解决方案
这是我的尝试,这接近你想要的吗?
from pyspark.sql import functions as f
from pyspark.sql import Row
Columns = Row('time','input_oven','output_oven','temp_oven','speed')
x=[Columns(20,0,0 ,450,3),
Columns(21,0,0 ,450,3),
Columns(22,1,0 ,450,3),
Columns(23,0,0 ,450,4),
Columns(24,0,0 ,451,4),
Columns(25,0,1 ,450,4),
Columns(26,0,0 ,450,3)]
df = spark.createDataFrame(x).withColumn('id', f.lit(1))
df.printSchema()
df1 = df.withColumn('oven', df['input_oven']+df['output_oven'])
from pyspark.sql.window import Window
w = Window.partitionBy(df['id']).orderBy(df['time'])
cum_oven = f.sum(df1['oven']).over(w)
df2 = df1.select(df1['time'],df1['speed'], df1['output_oven'],cum_oven.alias('cum_oven'))
df3 = df2.withColumn('cum_oven', df2['cum_oven']-df2['output_oven']).drop(df2['output_oven'])
ws = Window.partitionBy(df3['cum_oven']).orderBy(df3['time'])
metal_length = (f.max(df3['time']).over(ws)-f.min(df3['time']).over(ws))*df3['speed']
df4 = df3.select(df3['time'], df3['cum_oven'], metal_length.alias('metal_length'))
fdf = df.join(df4, ['time'])
fdf.drop('id').sort('time').show()
+----+----------+-----------+---------+-----+--------+------------+
|time|input_oven|output_oven|temp_oven|speed|cum_oven|metal_length|
+----+----------+-----------+---------+-----+--------+------------+
| 20| 0| 0| 450| 3| 0| 0|
| 21| 0| 0| 450| 3| 0| 3|
| 22| 1| 0| 450| 3| 1| 0|
| 23| 0| 0| 450| 4| 1| 4|
| 24| 0| 0| 451| 4| 1| 8|
| 25| 0| 1| 450| 4| 1| 12|
| 26| 0| 0| 450| 3| 2| 0|
+----+----------+-----------+---------+-----+--------+------------+
最终积分只是 groupBy、max 和 sum?
推荐阅读
- telephony - Patch() 到 Kazoo 后帐户被禁用
- blockchain - 错误:处理事务时出现 VM 异常:气体不足,设置映射键值错误
- flutter - 如何更改 Flutter 中提升按钮的选定文本的颜色?
- python - 如何从 Pandas 中的一行中删除数据?
- typescript - 类型区分在函数模板参数中不起作用
- java - 如何在 Java 中对 HashMap 的 ArrayList 使用包含方法?
- tcp - Unity TCP 请求到达服务器客户端的数量与执行的客户端数量一样多
- python - TypeError:DriverAction() 得到了一个意外的关键字参数“nargs”
- fonts - 从代码发送电子邮件:在 Gmail 的电子邮件中使用 Google 字体
- postgresql - 无法使用 CMD 或入口点覆盖命令