PySpark: Reverse of VectorAssembler

Problem Description

Note: this looks long, but almost half of the length is because I am showing the output. It really isn't that long.

I have a dataset on which I am performing PCA. To do that, I first combine the required features into a single vector with VectorAssembler, pass it to StandardScaler, and then to PCA. Say the number of components (k) I want to extract is 5. The output of PCA will then have one additional column containing a feature vector with k=5 elements. I want to turn this output feature vector back into columns; basically, I want to do the reverse of what VectorAssembler does. Below is the code I use for centering, scaling, and fitting the PCA. You can skip to the end of this code block and look at the output of train_w_pca.show(4).

# generate some random data
# (this assumes an active SparkSession named `spark`, e.g. from the pyspark shell)
import random
n_cols = 5
n_samples = 1000
train = [[random.randint(random.randint(1,10), random.randint(11,20)) for j in range(n_cols)] for i in range(n_samples)]
col_names = ['col_'+str(i) for i in range(1, 1+n_cols)]
train = spark.createDataFrame(train, col_names)
train.show(4)

+-----+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|col_5|
+-----+-----+-----+-----+-----+
|   13|   15|    6|   13|    2|
|   11|   10|   10|   18|   12|
|   13|   11|   15|   16|   12|
|    6|    2|    9|    1|   14|
+-----+-----+-----+-----+-----+

from pyspark.ml.feature import PCA, StandardScaler, VectorAssembler
ss_inpt_features = 'ss_inpt_features'
pca_inpt_features = 'pca_inpt_features'
pca_otpt_features = 'pca_otpt_features'

# create feature vector
assembler = VectorAssembler(inputCols=col_names, outputCol=ss_inpt_features)
train_fv = assembler.transform(train)

# scale features
scaler = StandardScaler(inputCol=ss_inpt_features, outputCol=pca_inpt_features, withStd=True, withMean=True)
scaler_model = scaler.fit(train_fv)
train_w_scaling = scaler_model.transform(train_fv)

# fit pca
pca = PCA(k=5, inputCol=pca_inpt_features, outputCol=pca_otpt_features)
pca_model = pca.fit(train_w_scaling)
train_w_pca = pca_model.transform(train_w_scaling)

# remove columns created by vector assembler and standard scaler
drop_cols = [ss_inpt_features, pca_inpt_features]
train_w_pca = train_w_pca.drop(*drop_cols)
train_w_pca.show(4)

+-----+-----+-----+-----+-----+--------------------+
|col_1|col_2|col_3|col_4|col_5|   pca_otpt_features|
+-----+-----+-----+-----+-----+--------------------+
|   13|   15|    6|   13|    2|[-1.4493719150189...|
|   11|   10|   10|   18|   12|[-1.1865568406658...|
|   13|   11|   15|   16|   12|[-0.7908207273087...|
|    6|    2|    9|    1|   14|[3.09817312370792...|
+-----+-----+-----+-----+-----+--------------------+

The following code block is where I am looking for suggestions. The end result I want is my original input features plus k=5 additional columns coming from PCA; I need the PCA output as separate columns rather than as a vector. To turn pca_otpt_features into columns, I am doing the following:

# turn the vector column into a DataFrame of separate columns
# (each row's DenseVector is converted to a plain Python list first)
pca_df = train_w_pca.select(pca_otpt_features).rdd.map(lambda x: x[0].toArray().tolist()).toDF()
pca_df.show(4)

+-------------------+-------------------+-------------------+-------------------+-------------------+
|                 _1|                 _2|                 _3|                 _4|                 _5|
+-------------------+-------------------+-------------------+-------------------+-------------------+
|-1.4493719150189412|-2.0931036194154142|0.13464273679638206|-0.8503916932872954| -0.827046144216132|
|-1.1865568406658527| 0.9611918240919298| 0.4488590799950029| 0.2034917117540555|-1.1518683939179468|
|-0.7908207273087919|   1.41447404343126|  0.472864927075101|-0.9315863936330135|-0.3779343594619816|
|  3.098173123707924|-0.1132068349678993| -0.762727469457229| 1.6553163148968495| 0.4664493257688783|
+-------------------+-------------------+-------------------+-------------------+-------------------+

# drop the pca columns
train_w_pca = train_w_pca.drop(pca_otpt_features)
train_w_pca.show(4)
+-----+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|col_5|
+-----+-----+-----+-----+-----+
|   13|   15|    6|   13|    2|
|   11|   10|   10|   18|   12|
|   13|   11|   15|   16|   12|
|    6|    2|    9|    1|   14|
+-----+-----+-----+-----+-----+

# add index to both df to join
from pyspark.sql.types import LongType
def zipindexdf(pca_df):
    """
    :param pca_df: spark dataframe to which an index column is to be added
    :return: same dataframe but with an additional index column
    """
    schema_new = pca_df.schema.add("index", LongType(), False)
    return pca_df.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new)

pca_df_index = zipindexdf(pca_df)
train_w_pca_index = zipindexdf(train_w_pca)
train_w_pca_index.show(4)

+-----+-----+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|col_5|index|
+-----+-----+-----+-----+-----+-----+
|   13|   15|    6|   13|    2|    0|
|   11|   10|   10|   18|   12|    1|
|   13|   11|   15|   16|   12|    2|
|    6|    2|    9|    1|   14|    3|
+-----+-----+-----+-----+-----+-----+


# join both the df
df_new = train_w_pca_index.join(pca_df_index, "index", "inner")

# drop index column
df_new = df_new.drop('index')
df_new.show(4)

+-----+-----+-----+-----+-----+-------------------+--------------------+--------------------+-------------------+--------------------+
|col_1|col_2|col_3|col_4|col_5|                 _1|                  _2|                  _3|                 _4|                  _5|
+-----+-----+-----+-----+-----+-------------------+--------------------+--------------------+-------------------+--------------------+
|    3|   19|   14|   17|   14|-1.9629750431606783|  0.9103384408550863|   2.439315065646054| 0.7714728122568422|  1.3015345479237306|
|   11|    5|   12|   10|   13| 1.1028687243351978|  0.9414552974121673| -0.3333432052612606|0.17650933263052482| -0.4960485125995481|
|   11|   14|    9|   10|   13|-0.7282369753377401|-0.03908539244478759|-0.38809447002972186|0.34112713979078885|   0.737794945616174|
|    7|    8|    8|   12|   15| 0.1241090961643788|  0.6517788602347092|  0.0936343345345299| 1.5732407177172711|-0.28060308166314357|
+-----+-----+-----+-----+-----+-------------------+--------------------+--------------------+-------------------+--------------------+

Is there a better way to do this? A lot of what I am doing, such as adding an index column and performing a join, seems unnecessary. Is there a cleaner, less painful way to get this result?

Tags: python, apache-spark, pyspark

Solution
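
One cleaner approach, sketched below under the assumption that you are on Spark 3.0 or later (where pyspark.ml.functions.vector_to_array is available), is to start from the train_w_pca dataframe produced right after the PCA step (i.e. before pca_otpt_features is dropped), convert the ML vector column into a plain array column, and then select its elements by position. This avoids dropping down to the RDD API and the index-plus-join round trip entirely. The pca_arr and pca_1 … pca_5 names are just illustrative choices.

from pyspark.sql import functions as F
from pyspark.ml.functions import vector_to_array

k = 5

# convert the ML vector column into an array<double> column
df = train_w_pca.withColumn('pca_arr', vector_to_array(F.col(pca_otpt_features)))

# pull each array element out into its own column, next to the original features
df = df.select(
    *col_names,
    *[F.col('pca_arr')[i].alias('pca_' + str(i + 1)) for i in range(k)]
)
df.show(4)

On older Spark versions, the same effect can be obtained by wrapping Vector.toArray().tolist() in a udf that returns ArrayType(DoubleType()) and then indexing into the resulting array column in exactly the same way.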

