python - 从聚合数据的主要 DCT 系数创建特征的最有效方法 - PySpark
问题描述
我目前正在为一个数据集创建特征,该数据集包含一些设备的各种传感器读数的时间序列数据,这些数据可能与相同的故障事件有关。该数据的基本结构是我们有一个表,它结合了设备 ID、时间戳和传感器读数。
| ID | Cycle_ID | Timestamp | sensor_1 | sensor_2 |
|----|----------|------------|----------|----------|
| 1 | 1 | 1547142555 | 123 | 641 |
| 1 | 1 | 1547142556 | 123 | 644 |
| 1 | 2 | 1547142557 | 124 | 643 |
现在的想法是根据循环聚合数据以创建与这些循环相对应的序列(和相应的特征)。原始数据量很大,需要使用 spark,但是聚合后的结果数据集足够小,可以将其存储在 Pandas DF 中并使用 keras 构建模型。除其他外,一个想法是为某些传感器收集领先的 DCT 组件,以便将它们用作一项功能。为了做到这一点,我们(除其他外)进行以下聚合:
from pyspark.sql import Row, window
import pyspark.sql.functions as func
W = window.Window.partitionBy('ID', 'Cycle_ID').orderBy('Timestamp')
df_collect = pfr_flight_match.withColumn('sensor_1_coll',
func.collect_list('sensor_1').over(W)) \
.groupBy('ID', 'Cycle_ID') \
.agg(func.max("sensor_1_coll").alias('sensor_1_coll'))
这给了我,对于每个设备的每个周期,传感器时间序列分别作为一个数组。现在的想法是对其执行 DCT,只保留前导n
系数,并将它们分别添加为新的特征列。我想出了一种方法来做到这一点,但是,性能似乎很糟糕,这就是我寻求帮助的原因。
由于不幸的是无法在数组上使用 Pyspark 的 DCT(根据文档,该功能必须是 DenseVector 类型),我们需要将收集的数组转换为 DenseVector。在我看来,没有有效的方法,所以我使用 UDF 来做到这一点:
import pyspark.ml
to_vec = func.udf(lambda x: pyspark.ml.linalg.DenseVector(x),
pyspark.ml.linalg.VectorUDT())
下一步是执行 DCT 本身,使用如下所示:
# Determine which column is the target of DCT
col_to_transform = 'sensor_1_coll'
df = df_collect.withColumn('vec', to_vec(col_to_transform))
# After switching the column type to DenseVector, we can apply DCT
dct = pyspark.ml.feature.DCT(inverse=False, inputCol='vec', outputCol='vec_dct')
df_dct = dct.transform(df)
# Drop intermediate columns
df_dct = df_dct.drop('vec', col_to_transform)
现在到了我担心陷阱的地方:我们需要将 DCT 向量截断为一定数量的系数,然后将这些系数分解为单独的列,以便稍后将它们传递到 Pandas DF/Numpy 数组中。
我担心使用 UDF 在性能方面并不好;无论如何, DenseVector 不表示为数组类型。所以这在这里不起作用:
import pyspark.ml
trunc_vec = func.udf(lambda x: x[0:n],
pyspark.ml.linalg.VectorUDT())
所以我最后做的是将一个合适的函数映射到上述 DF 的 RDD 版本,并将其作为数据帧返回。这就是我现在正在使用的:
# State columns used for grouping
idx = ['ID', 'Cycle_ID']
keep_coeffs = 30 # How many of the leading coefficients shall be kept?
from functools import partial
# To be mapped onto rdd: Return auxillary columns plus the DCT coeffs as
# individual columns, which are named serially
def truncate_dct_vec(vec, coeffs):
return tuple(vec[i] for i in idx) + tuple(vec.vec_dct.toArray()[0:coeffs+1].tolist())
truncate_dct_vec = partial(truncate_dct_vec, coeffs=keep_coeffs)
# Perform the mapping to get the truncated DCT coefficients, each in an individual column
df_dct = df_dct.rdd.map(truncate_dct_vec).toDF(idx)
问题是这似乎运行起来非常慢(可能是由于 JVM 和 python 之间的序列化和转换在执行所有这些步骤时发生),这几乎是令人望而却步的。我主要是在寻找更快的替代品。对此的任何帮助表示赞赏。
解决方案
这是一个旧线程,但是,我希望这对将来的其他人有所帮助。VectorAssembler 会将一列或多列编码为密集向量表示。如果您需要稀疏表示,请查看 FeatureHasher。它也支持分类和布尔值。
无论如何,这应该可以解决问题:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.ml.feature import VectorAssembler, DCT
rows = [Row(id=1, cycle_id=1, sensor_1=123, sensor_2=641),
Row(id=1, cycle_id=1, sensor_1=123, sensor_2=644),
Row(id=1, cycle_id=2, sensor_1=124, sensor_2=643)]
data_schema = StructType([StructField("id", IntegerType(), True),
StructField("cycle_id", IntegerType(), True),
StructField("sensor_1", IntegerType(), True),
StructField("sensor_2", IntegerType(), True)])
df = spark.createDataFrame(rows, data_schema)
cols = ["id", "cycle_id", "sensor_1", "sensor_2"]
assembler = VectorAssembler(inputCols=cols, outputCol="features")
df = assembler.transform(df)
dct = DCT(inverse=False, inputCol="features", outputCol="features_dct")
dct_df = dct.transform(df)
dct_df.select("features_dct").show(truncate=False)
以下将 DCT 反转为原始信号:
dct_inv = DCT(inverse=True, inputCol="features_dct", outputCol="features_dct_inverse")
dct_df_inv = dct_inv.transform(dct_df)
dct_df_inv.select("features_dct_inverse").show(truncate=False)
推荐阅读
- django - 序列化drf中多个表的数据
- java - 设置 OCL 上下文
- sql-server - AppMaker 可以与 SQL Server 一起使用吗
- python - How to Import Multiple excel file in PandasDataframe
- android - Xamarin Android Nunit 测试可在测试资源管理器中发现,但在运行测试时未发现。(VS2017)
- sql-server - 链接维度性能问题
- node.js - 如何重命名上传到 Cloudinary 的 64base 图像的名称
- c# - C# 和 C/C++ 中二进制到浮点数转换的区别
- android - 由于错误 cat proc cpuinfo,无法在远程设备中运行 ionic 应用程序
- mongodb - 通过条件查找优化 Mongo 聚合查询