java - 在计算 PCA 附带的附加值时更好地利用 Apache Spark ML
问题描述
我想编写一个Apache Spark
代码,通过添加有关每个组件的全局汇总值、更高特征值、个人对轴的贡献、个人在轴上的投影质量的信息来完成 PCA 的计算。
有用。并从 50 个特征的向量中计算它们(此处为三个第一个体):
+-------------+-------------------------------------------+------------------------------------------+--------------------+---------------------------------------------+------------------------------------------+
|features |scaledFeatures |pca |xStdCarre |contributionIndividuelleAxe |qualiteProjectionIndividuAxe |
+-------------+-------------------------------------------+------------------------------------------+--------------------+---------------------------------------------+------------------------------------------+
|[100.0,100.0]|[3.665634766586407,3.156450874585141] |[-4.823943018707778,-0.36004738290505234] |23.400060365676286 |[0.29741427723298436,0.006561249644731516] |[0.9944600947215109,0.00553990527848926] |
|[86.1,70.3] |[2.711113911293206,0.8064410737434008] |[-2.4872869831159297,-1.3468070793732898] |8.000485845427955 |[0.07906955024417166,0.09180747147437675] |[0.773277605373604,0.22672239462639585] |
|[67.9,69.2] |[1.461309625945275,0.7194036737122256] |[-1.5419971620115105,-0.5246067298266516] |2.6529674686309654 |[0.03038957477136533,0.013929481805149522] |[0.8962624969082515,0.10373750309174866] |
备注:在流程结束时,“功能”列不再存在。一旦功能标准化,它就会被删除。但我在此示例中添加了此列以使其更具可读性。
我使用一些私有方法进行此计算:
- 一个创建初始特征向量。
- 然后,一个简短的代码对其进行标准化,并要求 Spark 从中计算 PCA。
- 另一种私有方法计算每个分量的较高特征值、主分量与其汇总的变量的相关性,以及主分量为特征提供的全局质量。
- 最后一个计算每个人在计算每个主成分时的个人贡献,以及每个人在每个轴上的投影精度。
因为我从 ML(来自spark-mllib
)开始,我想我错过了它的一些重要功能,这些功能可以让我的代码更清晰、更简单。你能告诉我我应该做些什么来创建一个更好地利用Apache Spark
ML 可以做的代码吗?
package test;
import static java.lang.Math.sqrt;
import static org.apache.spark.sql.functions.*;
import static org.apache.spark.sql.types.DataTypes.*;
import java.io.*;
import java.util.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.ml.feature.*;
import org.apache.spark.ml.linalg.*;
import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.*;
import org.apache.spark.sql.types.*;
import org.junit.jupiter.api.*;
import org.springframework.boot.test.context.*;
import org.springframework.test.context.*;
/**
* Analyse en composantes principales.
*/
@ActiveProfiles("test")
@SpringBootTest(classes = SparkTestApplicationGestion.class)
public class AnalyseComposantesPrincipalesIT extends AbstractStatsTest implements Serializable {
/** Serial ID. */
private static final long serialVersionUID = 7301395393891047373L;
/**
* Recherche de composantes principales.
*/
@Test
@DisplayName("ACP : le classement des universités américaines")
public void composantePrincipale() {
// Créer le dataset des features (des vecteurs) :
// features
// [100.0,100.0]
// [86.1,70.3]
// [67.9,69.2]
// ...
Dataset<Row> dsHiCiSCi = denseRowWithVectors(
// Le classement (variables HiCi et SCi des universités américaines).
d(100, 100), d(86.1, 70.3), d(67.9, 69.2), d(54.0, 65.4), d(65.9, 61.7),
d(58.4, 50.3), d(56.5, 69.6), d(59.3, 46.5), d(50.8, 54.1), d(46.3, 65.4),
d(57.9, 63.2), d(54.5, 65.1), d(57.4, 75.9), d(59.3, 64.6), d(56.9, 70.8),
d(52.4, 74.1), d(52.9, 67.2), d(54.0, 59.8), d(41.3, 67.9), d(41.9, 80.9),
d(60.7, 77.1), d(38.5, 68.6), d(40.6, 62.2), d(39.2, 77.6), d(38.5, 63.2),
d(44.5, 57.6), d(35.5, 38.4), d(39.2, 53.4), d(46.9, 57.0), d(41.3, 53.9),
d(27.7, 23.2), d(46.9, 62.0), d(48.6, 67.0), d(39.9, 45.7), d(42.6, 42.7),
d(31.4, 63.1), d(40.6, 53.3), d(46.9, 54.8), d(23.4, 54.2), d(30.6, 38.0),
d(31.4, 51.0), d(27.7, 56.6), d(45.1, 58.0), d(46.9, 64.2), d(35.5, 48.9),
d(25.7, 51.7), d(39.9, 44.8), d(24.6, 56.9), d(39.9, 65.6), d(37.1, 52.7));
long n = dsHiCiSCi.count();
int p = 2; // Deux variables.
int nombreComposantes = 2;
// Afficher les valeurs des universités.
dsHiCiSCi.show((int)n, false);
// Centrer réduire les valeurs (Standardizing).
// scaledFeatures
// [3.665634766586407,3.156450874585141]
// [2.711113911293206,0.8064410737434008]
// [1.461309625945275,0.7194036737122256]
// ...
StandardScaler scaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures")
.setWithStd(true) // Réduire
.setWithMean(true); // Centrer
Dataset<Row> centreReduit = scaler.fit(dsHiCiSCi).transform(dsHiCiSCi).select("scaledFeatures");
// Faire calculer par ML les composantes principales (nombreComposantes extraites de p variables).
//
// scaledFeatures pca
// [3.665634766586407,3.156450874585141] [-4.823943018707778,-0.36004738290505234]
// [2.711113911293206,0.8064410737434008] [-2.4872869831159297,-1.3468070793732898]
// [1.461309625945275,0.7194036737122256] [-1.5419971620115105,-0.5246067298266516]
// ...
PCA pcaOperationDefinition = new PCA().setInputCol("scaledFeatures").setOutputCol("pca").setK(nombreComposantes);
Dataset<Row> composantesPrincipalesCalculees = pcaOperationDefinition.fit(centreReduit).transform(centreReduit);
composantesPrincipalesCalculees.show((int)n, false);
// Calculer la plus grande valeur propre de chaque composante, sa corrélation avec la variable qu'elle résume, et la qualité globale du résumé qu'elle en fait.
//
// valeurPropre correlationComposanteAvecXi qualiteResumeComposante
// 1.5648493047635532 0.8845477106305665 0.7824246523817766
// 0.3951506952364467 0.44449448547560555 0.19757534761822335
Dataset<Row> valeursPropresEtQualite = valeurPropreEtQualiteGlobaleResume(pcaOperationDefinition, composantesPrincipalesCalculees.select(col("pca")), n, p);
valeursPropresEtQualite.show(false);
// Calculer les contributions individuelles et la qualité de la représentation de chaque individu sur chaque axe.
//
// xStdCarre contributionIndividuelleAxe qualiteProjectionIndividuAxe
// 23.400060365676286 [0.29741427723298436,0.006561249644731516] [0.9944600947215109,0.00553990527848926]
// 8.000485845427955 [0.07906955024417166,0.09180747147437675] [0.773277605373604,0.22672239462639585]
// 2.6529674686309654 [0.03038957477136533,0.013929481805149522] [0.8962624969082515,0.10373750309174866]
// ...
Dataset<Row> contributionsEtQualiteProjectionIndividuelles = contributionsEtQuailiteIndividuelle(composantesPrincipalesCalculees, valeursPropresEtQualite, n);
contributionsEtQualiteProjectionIndividuelles.show((int)n, false);
}
/**
* Renvoyer les valeurs propres (eigen values), la corrélation des composantes aux variables Xi, et la qualité globale du résumé qu'elle en donne.
* @param pcaOperationDefinition Définition de l'opération d'ACP demandée.
* @param pca Résultat de l'ACP.
* @param n Nombre d'éléments dans la matrice de départ.
* @param p Nombre de variables.
* @return Dataset contenant les valeurs propres, la corrélation des composantes aux variables Xi, et la qualité globale du résumé de chaque composante.
*/
private Dataset<Row> valeurPropreEtQualiteGlobaleResume(PCA pcaOperationDefinition, Dataset<Row> pca, long n, int p) {
int nombreComposantes = pcaOperationDefinition.getK();
double[] valeursPropres = new double[nombreComposantes]; // Plus grande valeur propre de R pour une composante.
double[] correlationsComposantesAvecXi = new double[nombreComposantes]; // Corrélation de la composante avec Xi.
double[] qualiteComposantes = new double[nombreComposantes]; // Qualité globale du résumé proposé par la composante.
List<Row> enregistrements = new ArrayList<>();
List<Row> composantesTousIndividusUniversites = pca.select(col("pca")).collectAsList();
for(Row composantesIndividuUniversite : composantesTousIndividusUniversites) {
for(int numeroComposante = 1; numeroComposante <= nombreComposantes; numeroComposante ++) {
double composante = ((DenseVector)composantesIndividuUniversite.get(0)).values()[numeroComposante-1]; // Φ[numero_composante]
valeursPropres[numeroComposante-1] += composante * composante;
}
}
for(int numeroComposante = 1; numeroComposante <= nombreComposantes; numeroComposante ++) {
valeursPropres[numeroComposante-1] = valeursPropres[numeroComposante-1] / n;
// Calcul de la qualité du résumé de la composante :
correlationsComposantesAvecXi[numeroComposante-1] = sqrt(valeursPropres[numeroComposante-1]) * (1 / sqrt(p));
qualiteComposantes[numeroComposante-1] = valeursPropres[numeroComposante-1] / p;
Row row = RowFactory.create(valeursPropres[numeroComposante-1], correlationsComposantesAvecXi[numeroComposante-1], qualiteComposantes[numeroComposante-1]);
enregistrements.add(row);
}
// Construire le dataset avec le schéma [valeurPropre (double), qualiteResumeComposante (double)].
StructType schema = new StructType(new StructField[] {
new StructField("valeurPropre", DoubleType, false, Metadata.empty()),
new StructField("correlationComposanteAvecXi", DoubleType, false, Metadata.empty()),
new StructField("qualiteResumeComposante", DoubleType, false, Metadata.empty()),
});
return this.session.createDataFrame(enregistrements, schema);
}
/**
* Calculer les contributions individuelles par axe et la qualité de la projection pour l'individu.
* @param acp Individus centrés réduits et ACP.
* @param valeursPropresEtQualitesComposantes Dataset des valeurs propres et qualité des composantes, par variable.
* @param n Nombre d'individus.
* @return Dataset d'ACP complété de la contribution individuelle et de la qualité des composantes.
*/
private Dataset<Row> contributionsEtQuailiteIndividuelle(Dataset<Row> acp, Dataset<Row> valeursPropresEtQualitesComposantes, long n) {
this.session.udf().register("xCarresCentresReduitsUDF", new UDF1<DenseVector, Double>() {
/** Serial ID. */
private static final long serialVersionUID = 1L;
/**
* Renvoyer la somme des x carrés centrés réduits des variables d'un individu.
* @param individuCentreReduit Individu aux variables centrées réduites.
*/
@Override
public Double call(DenseVector individuCentreReduit) {
double sommeXCarreCentresReduits = 0.0;
double[] variablesIndividuCentreesReduites = individuCentreReduit.values();
for(int index=0; index < individuCentreReduit.size(); index ++) {
sommeXCarreCentresReduits += variablesIndividuCentreesReduites[index] * variablesIndividuCentreesReduites[index];
}
return sommeXCarreCentresReduits;
}
}, DoubleType);
// Faire la somme, par individu, des carrés des valeurs centrées réduites x* de chaque variable se rapportant à un individu.
Dataset<Row> ds = acp.withColumn("xStdCarre", callUDF("xCarresCentresReduitsUDF", col("scaledFeatures")));
// Calculer les contributions individuelles par axe.
List<Double> listeValeursPropres = valeursPropresEtQualitesComposantes.select("valeurPropre").map((MapFunction<Row, Double>)v -> v.getDouble(0), Encoders.DOUBLE()).collectAsList();
DenseVector valeursPropres = (DenseVector)Vectors.dense(listeValeursPropres.stream().mapToDouble(Double::doubleValue).toArray());
this.session.udf().register("contributionIndividuelleAxe", new UDF1<DenseVector, DenseVector>() {
/** Serial ID. */
private static final long serialVersionUID = 1L;
/**
* Renvoyer la contribution individuelle d'un individu sur un axe.
* @param composantes Composantes principales d'un individu.
*/
@Override
public DenseVector call(DenseVector composantes) {
double[] contributions = new double[composantes.size()];
for(int axe=1; axe <= contributions.length; axe++) {
double composante = composantes.values()[axe-1];
double valeurPropre = valeursPropres.values()[axe-1];
contributions[axe-1] = ((composante * composante) / n) / valeurPropre;
}
return (DenseVector)Vectors.dense(contributions);
}
}, new VectorUDT());
ds = ds.withColumn("contributionIndividuelleAxe", callUDF("contributionIndividuelleAxe", col("pca")));
// Calculer la qualité de la projection, par individu, sur les axes.
this.session.udf().register("qualiteProjectionIndividuAxe", new UDF2<DenseVector, Double, DenseVector>() {
/** Serial ID. */
private static final long serialVersionUID = 1L;
/**
* Renvoyer la qualité de la projection, par individu, sur les axes.
* @param composantes Individu aux variables centrées réduites.
* @param xStdCarre valeurs centrées réduites, au carré, cumulées pour toutes les variables associées à l'individu.
*/
@Override
public DenseVector call(DenseVector composantes, Double xStdCarre) {
double[] qualite = new double[composantes.size()];
for(int axe=1; axe <= qualite.length; axe++) {
double composante = composantes.values()[axe-1];
qualite[axe-1] = (composante * composante) / xStdCarre;
}
return (DenseVector)Vectors.dense(qualite);
}
}, new VectorUDT());
return ds.withColumn("qualiteProjectionIndividuAxe", callUDF("qualiteProjectionIndividuAxe", col("pca"), col("xStdCarre")));
}
/**
* Créer un Dataset Row par des vecteurs denses constitués à partir d'upplets de valeurs.
* @param listeUppletsValeurs liste de n-upplets.
* @return Dataset Dense.
*/
private Dataset<Row> denseRowWithVectors(double[]... listeUppletsValeurs) {
List<Row> data = new ArrayList<>();
for(double[] upplet : listeUppletsValeurs) {
Vector vecteur = Vectors.dense(upplet);
data.add(RowFactory.create(vecteur));
}
StructType schema = new StructType(new StructField[] {
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
return this.session.createDataFrame(data, schema);
}
/**
* Renvoyer un ensemble de valeurs doubles sous la forme d'un tableau.
* @param valeurs Valeurs.
* @return Tableau de doubles.
*/
private double[] d(double... valeurs) {
return valeurs;
}
}
解决方案
推荐阅读
- kubernetes - 查询 Kubernetes 中的远程状态存储(交互式查询)
- swift - Xcode 11 使用 Parse 连接到 Amazon Web Service 时出错
- reactjs - 在 React js 的单页滚动投资组合站点中,我可以在哪里放置登录页面?
- docker - kubectl 版本显示错误的版本号
- python - 在 SeriesGroupBy 上使用集合并集
- flutter - 如何在 Flutter 中使用两个自定义画家?
- c++ - 如何修复“未包含在全局命名空间错误”?
- hangfire - 在 ASP.NET 应用程序中配置 Hangfire 时的问题
- java - ImageIO.write 行为因格式类型而异,即使它们是公认的写入者
- asp.net-core - AspNetCore.Components.Forms.DataAnnotation - Blazor 组件 - 如何跳过对选择属性的验证?