python - PySpark: reverse of VectorAssembler
Problem description
Note: this looks long, but almost half of the length is just showing output. It really isn't that long.
I have a dataset on which I am performing PCA. To do this, I first combine the required features into a single vector using VectorAssembler, pass that to StandardScaler, and then pass the result to PCA. Say the number of components (k) I want to extract is 5. The output of PCA will then have an additional column containing a feature vector with k=5 elements. I want to turn this output feature vector back into columns; basically, I want to do the reverse of VectorAssembler. Here is the code I use to perform center-scaling and fit the PCA. You can skip to the end of this code block and look at the output of train_w_pca.show(4).
# generate some random data
import random
n_cols = 5
n_samples = 1000
train = [[random.randint(random.randint(1,10), random.randint(11,20)) for j in range(n_cols)] for i in range(n_samples)]
col_names = ['col_'+str(i) for i in range(1, 1+n_cols)]
train = spark.createDataFrame(train, col_names)
train.show(4)
+-----+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|col_5|
+-----+-----+-----+-----+-----+
| 13| 15| 6| 13| 2|
| 11| 10| 10| 18| 12|
| 13| 11| 15| 16| 12|
| 6| 2| 9| 1| 14|
+-----+-----+-----+-----+-----+
from pyspark.ml.feature import PCA, StandardScaler, VectorAssembler
ss_inpt_features = 'ss_inpt_features'
pca_inpt_features = 'pca_inpt_features'
pca_otpt_features = 'pca_otpt_features'
# create feature vector
assembler = VectorAssembler(inputCols=col_names, outputCol=ss_inpt_features)
train_fv = assembler.transform(train)
# scale features
scaler = StandardScaler(inputCol=ss_inpt_features, outputCol=pca_inpt_features, withStd=True, withMean=True)
scaler_model = scaler.fit(train_fv)
train_w_scaling = scaler_model.transform(train_fv)
# fit pca
pca = PCA(k=5, inputCol=pca_inpt_features, outputCol=pca_otpt_features)
pca_model = pca.fit(train_w_scaling)
train_w_pca = pca_model.transform(train_w_scaling)
# remove columns created by vector assembler and standard scaler
drop_cols = [ss_inpt_features, pca_inpt_features]
train_w_pca = train_w_pca.drop(*drop_cols)
train_w_pca.show(4)
+-----+-----+-----+-----+-----+--------------------+
|col_1|col_2|col_3|col_4|col_5| pca_otpt_features|
+-----+-----+-----+-----+-----+--------------------+
| 13| 15| 6| 13| 2|[-1.4493719150189...|
| 11| 10| 10| 18| 12|[-1.1865568406658...|
| 13| 11| 15| 16| 12|[-0.7908207273087...|
| 6| 2| 9| 1| 14|[3.09817312370792...|
+-----+-----+-----+-----+-----+--------------------+
The following code block is where I am looking for suggestions. The end result I want is my original input features plus the k=5 additional columns from the PCA. I need the PCA output as separate columns rather than as a vector. To convert pca_otpt_features into columns, I am doing the following:
pca_df = train_w_pca.select(pca_otpt_features).rdd.map(lambda x: x[0].toArray().tolist()).toDF()
pca_df.show(4)
+-------------------+-------------------+-------------------+-------------------+-------------------+
| _1| _2| _3| _4| _5|
+-------------------+-------------------+-------------------+-------------------+-------------------+
|-1.4493719150189412|-2.0931036194154142|0.13464273679638206|-0.8503916932872954| -0.827046144216132|
|-1.1865568406658527| 0.9611918240919298| 0.4488590799950029| 0.2034917117540555|-1.1518683939179468|
|-0.7908207273087919| 1.41447404343126| 0.472864927075101|-0.9315863936330135|-0.3779343594619816|
| 3.098173123707924|-0.1132068349678993| -0.762727469457229| 1.6553163148968495| 0.4664493257688783|
+-------------------+-------------------+-------------------+-------------------+-------------------+
# drop the pca columns
train_w_pca = train_w_pca.drop(pca_otpt_features)
train_w_pca.show(4)
+-----+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|col_5|
+-----+-----+-----+-----+-----+
| 13| 15| 6| 13| 2|
| 11| 10| 10| 18| 12|
| 13| 11| 15| 16| 12|
| 6| 2| 9| 1| 14|
+-----+-----+-----+-----+-----+
# add index to both df to join
from pyspark.sql.types import LongType
def zipindexdf(pca_df):
    """
    :param pca_df: spark dataframe to which an index column is to be added
    :return: same dataframe but with an additional index column
    """
    schema_new = pca_df.schema.add("index", LongType(), False)
    return pca_df.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new)
pca_df_index = zipindexdf(pca_df)
train_w_pca_index = zipindexdf(train_w_pca)
train_w_pca_index.show(4)
+-----+-----+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|col_5|index|
+-----+-----+-----+-----+-----+-----+
| 13| 15| 6| 13| 2| 0|
| 11| 10| 10| 18| 12| 1|
| 13| 11| 15| 16| 12| 2|
| 6| 2| 9| 1| 14| 3|
+-----+-----+-----+-----+-----+-----+
# join both the df
df_new = train_w_pca_index.join(pca_df_index, "index", "inner")
# drop index column
df_new = df_new.drop('index')
df_new.show(4)
+-----+-----+-----+-----+-----+-------------------+--------------------+--------------------+-------------------+--------------------+
|col_1|col_2|col_3|col_4|col_5| _1| _2| _3| _4| _5|
+-----+-----+-----+-----+-----+-------------------+--------------------+--------------------+-------------------+--------------------+
| 3| 19| 14| 17| 14|-1.9629750431606783| 0.9103384408550863| 2.439315065646054| 0.7714728122568422| 1.3015345479237306|
| 11| 5| 12| 10| 13| 1.1028687243351978| 0.9414552974121673| -0.3333432052612606|0.17650933263052482| -0.4960485125995481|
| 11| 14| 9| 10| 13|-0.7282369753377401|-0.03908539244478759|-0.38809447002972186|0.34112713979078885| 0.737794945616174|
| 7| 8| 8| 12| 15| 0.1241090961643788| 0.6517788602347092| 0.0936343345345299| 1.5732407177172711|-0.28060308166314357|
+-----+-----+-----+-----+-----+-------------------+--------------------+--------------------+-------------------+--------------------+
Is there a better way to do this? Much of what I am doing, such as adding an index and performing a join, seems unnecessary. Is there a cleaner, less painful way to get this result?
Solution