首页 > 解决方案 > 在计算 PCA 附带的附加值时更好地利用 Apache Spark ML

问题描述

我想编写一个Apache Spark代码,通过添加有关每个组件的全局汇总值、更高特征值、个人对轴的贡献、个人在轴上的投影质量的信息来完成 PCA 的计算。

有用。并从 50 个特征的向量中计算它们(此处为三个第一个体):

+-------------+-------------------------------------------+------------------------------------------+--------------------+---------------------------------------------+------------------------------------------+
|features     |scaledFeatures                             |pca                                       |xStdCarre           |contributionIndividuelleAxe                  |qualiteProjectionIndividuAxe              |
+-------------+-------------------------------------------+------------------------------------------+--------------------+---------------------------------------------+------------------------------------------+
|[100.0,100.0]|[3.665634766586407,3.156450874585141]      |[-4.823943018707778,-0.36004738290505234] |23.400060365676286  |[0.29741427723298436,0.006561249644731516]   |[0.9944600947215109,0.00553990527848926]  |
|[86.1,70.3]  |[2.711113911293206,0.8064410737434008]     |[-2.4872869831159297,-1.3468070793732898] |8.000485845427955   |[0.07906955024417166,0.09180747147437675]    |[0.773277605373604,0.22672239462639585]   |
|[67.9,69.2]  |[1.461309625945275,0.7194036737122256]     |[-1.5419971620115105,-0.5246067298266516] |2.6529674686309654  |[0.03038957477136533,0.013929481805149522]   |[0.8962624969082515,0.10373750309174866]  |

备注:在流程结束时,“功能”列不再存在。一旦功能标准化,它就会被删除。但我在此示例中添加了此列以使其更具可读性。

我使用一些私有方法进行此计算:

因为我从 ML(来自spark-mllib)开始,我想我错过了它的一些重要功能,这些功能可以让我的代码更清晰、更简单。你能告诉我我应该做些什么来创建一个更好地利用Apache SparkML 可以做的代码吗?

package test;

import static java.lang.Math.sqrt;
import static org.apache.spark.sql.functions.*;
import static org.apache.spark.sql.types.DataTypes.*;

import java.io.*;
import java.util.*;

import org.apache.spark.api.java.function.*;
import org.apache.spark.ml.feature.*;
import org.apache.spark.ml.linalg.*;
import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.*;
import org.apache.spark.sql.types.*;
import org.junit.jupiter.api.*;
import org.springframework.boot.test.context.*;
import org.springframework.test.context.*;

/**
 * Analyse en composantes principales.
 */
@ActiveProfiles("test")
@SpringBootTest(classes = SparkTestApplicationGestion.class)
public class AnalyseComposantesPrincipalesIT extends AbstractStatsTest implements Serializable {
   /** Serial ID. */
   private static final long serialVersionUID = 7301395393891047373L;

   /**
    * Recherche de composantes principales.
    */
   @Test
   @DisplayName("ACP : le classement des universités américaines")
   public void composantePrincipale() {
      // Créer le dataset des features (des vecteurs) :

      // features
      // [100.0,100.0]
      // [86.1,70.3]
      // [67.9,69.2] 
      // ...
      Dataset<Row> dsHiCiSCi = denseRowWithVectors(
         // Le classement (variables HiCi et SCi des universités américaines).
         d(100, 100), d(86.1, 70.3), d(67.9, 69.2), d(54.0, 65.4), d(65.9, 61.7),
         d(58.4, 50.3), d(56.5, 69.6), d(59.3, 46.5), d(50.8, 54.1), d(46.3, 65.4),
         d(57.9, 63.2), d(54.5, 65.1), d(57.4, 75.9), d(59.3, 64.6), d(56.9, 70.8),
         d(52.4, 74.1), d(52.9, 67.2), d(54.0, 59.8), d(41.3, 67.9), d(41.9, 80.9), 
         d(60.7, 77.1), d(38.5, 68.6), d(40.6, 62.2), d(39.2, 77.6), d(38.5, 63.2),

         d(44.5, 57.6), d(35.5, 38.4), d(39.2, 53.4), d(46.9, 57.0), d(41.3, 53.9),
         d(27.7, 23.2), d(46.9, 62.0), d(48.6, 67.0), d(39.9, 45.7), d(42.6, 42.7),
         d(31.4, 63.1), d(40.6, 53.3), d(46.9, 54.8), d(23.4, 54.2), d(30.6, 38.0),
         d(31.4, 51.0), d(27.7, 56.6), d(45.1, 58.0), d(46.9, 64.2), d(35.5, 48.9),
         d(25.7, 51.7), d(39.9, 44.8), d(24.6, 56.9), d(39.9, 65.6), d(37.1, 52.7));

      long n = dsHiCiSCi.count();
      int p = 2; // Deux variables.
      int nombreComposantes = 2;

      // Afficher les valeurs des universités.
      dsHiCiSCi.show((int)n, false);

      // Centrer réduire les valeurs (Standardizing).

      // scaledFeatures
      // [3.665634766586407,3.156450874585141]
      // [2.711113911293206,0.8064410737434008]
      // [1.461309625945275,0.7194036737122256]
      // ...
      StandardScaler scaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures")
         .setWithStd(true)   // Réduire
         .setWithMean(true); // Centrer

      Dataset<Row> centreReduit = scaler.fit(dsHiCiSCi).transform(dsHiCiSCi).select("scaledFeatures");

      // Faire calculer par ML les composantes principales (nombreComposantes extraites de p variables).
      //
      // scaledFeatures                            pca                                      
      // [3.665634766586407,3.156450874585141]     [-4.823943018707778,-0.36004738290505234]
      // [2.711113911293206,0.8064410737434008]    [-2.4872869831159297,-1.3468070793732898]
      // [1.461309625945275,0.7194036737122256]    [-1.5419971620115105,-0.5246067298266516]
      // ...
      PCA pcaOperationDefinition = new PCA().setInputCol("scaledFeatures").setOutputCol("pca").setK(nombreComposantes);
      Dataset<Row> composantesPrincipalesCalculees = pcaOperationDefinition.fit(centreReduit).transform(centreReduit);
      composantesPrincipalesCalculees.show((int)n, false);

      // Calculer la plus grande valeur propre de chaque composante, sa corrélation avec la variable qu'elle résume, et la qualité globale du résumé qu'elle en fait.
      //
      // valeurPropre         correlationComposanteAvecXi   qualiteResumeComposante
      // 1.5648493047635532   0.8845477106305665            0.7824246523817766
      // 0.3951506952364467   0.44449448547560555           0.19757534761822335
      Dataset<Row> valeursPropresEtQualite = valeurPropreEtQualiteGlobaleResume(pcaOperationDefinition, composantesPrincipalesCalculees.select(col("pca")), n, p);
      valeursPropresEtQualite.show(false);

      // Calculer les contributions individuelles et la qualité de la représentation de chaque individu sur chaque axe.
      //
      // xStdCarre           contributionIndividuelleAxe                  qualiteProjectionIndividuAxe
      // 23.400060365676286  [0.29741427723298436,0.006561249644731516]   [0.9944600947215109,0.00553990527848926]
      // 8.000485845427955   [0.07906955024417166,0.09180747147437675]    [0.773277605373604,0.22672239462639585]
      // 2.6529674686309654  [0.03038957477136533,0.013929481805149522]   [0.8962624969082515,0.10373750309174866]
      // ... 
      Dataset<Row> contributionsEtQualiteProjectionIndividuelles = contributionsEtQuailiteIndividuelle(composantesPrincipalesCalculees, valeursPropresEtQualite, n);
      contributionsEtQualiteProjectionIndividuelles.show((int)n, false);
   }

   /**
    * Renvoyer les valeurs propres (eigen values), la corrélation des composantes aux variables Xi, et la qualité globale du résumé qu'elle en donne.
    * @param pcaOperationDefinition Définition de l'opération d'ACP demandée.
    * @param pca Résultat de l'ACP.
    * @param n Nombre d'éléments dans la matrice de départ.
    * @param p Nombre de variables.
    * @return Dataset contenant les valeurs propres, la corrélation des composantes aux variables Xi, et la qualité globale du résumé de chaque composante.
    */
   private Dataset<Row> valeurPropreEtQualiteGlobaleResume(PCA pcaOperationDefinition, Dataset<Row> pca, long n, int p) {
      int nombreComposantes = pcaOperationDefinition.getK();

      double[] valeursPropres = new double[nombreComposantes];                // Plus grande valeur propre de R pour une composante.
      double[] correlationsComposantesAvecXi = new double[nombreComposantes]; // Corrélation de la composante avec Xi.
      double[] qualiteComposantes = new double[nombreComposantes];            // Qualité globale du résumé proposé par la composante.

      List<Row> enregistrements = new ArrayList<>();
      List<Row> composantesTousIndividusUniversites = pca.select(col("pca")).collectAsList();

      for(Row composantesIndividuUniversite : composantesTousIndividusUniversites) {
         for(int numeroComposante = 1; numeroComposante <= nombreComposantes; numeroComposante ++) {
            double composante = ((DenseVector)composantesIndividuUniversite.get(0)).values()[numeroComposante-1]; // Φ[numero_composante]
            valeursPropres[numeroComposante-1] += composante * composante;
         }
      }

      for(int numeroComposante = 1; numeroComposante <= nombreComposantes; numeroComposante ++) {
         valeursPropres[numeroComposante-1] = valeursPropres[numeroComposante-1] / n;

         // Calcul de la qualité du résumé de la composante : 
         correlationsComposantesAvecXi[numeroComposante-1] = sqrt(valeursPropres[numeroComposante-1]) * (1 / sqrt(p));
         qualiteComposantes[numeroComposante-1] = valeursPropres[numeroComposante-1] / p;
         Row row = RowFactory.create(valeursPropres[numeroComposante-1], correlationsComposantesAvecXi[numeroComposante-1], qualiteComposantes[numeroComposante-1]);
         enregistrements.add(row);
      }

      // Construire le dataset avec le schéma [valeurPropre (double), qualiteResumeComposante (double)]. 
      StructType schema = new StructType(new StructField[] {
         new StructField("valeurPropre", DoubleType, false, Metadata.empty()),
         new StructField("correlationComposanteAvecXi", DoubleType, false, Metadata.empty()),
         new StructField("qualiteResumeComposante", DoubleType, false, Metadata.empty()),
      });

      return this.session.createDataFrame(enregistrements, schema);
   }

   /**
    * Calculer les contributions individuelles par axe et la qualité de la projection pour l'individu.
    * @param acp Individus centrés réduits et ACP.
    * @param valeursPropresEtQualitesComposantes Dataset des valeurs propres et qualité des composantes, par variable.
    * @param n Nombre d'individus.
    * @return Dataset d'ACP complété de la contribution individuelle et de la qualité des composantes.
    */
   private Dataset<Row> contributionsEtQuailiteIndividuelle(Dataset<Row> acp, Dataset<Row> valeursPropresEtQualitesComposantes, long n) {
      this.session.udf().register("xCarresCentresReduitsUDF", new UDF1<DenseVector, Double>() {
         /** Serial ID. */
         private static final long serialVersionUID = 1L;

         /**
          * Renvoyer la somme des x carrés centrés réduits des variables d'un individu.
          * @param individuCentreReduit Individu aux variables centrées réduites.
          */
         @Override
         public Double call(DenseVector individuCentreReduit) {
            double sommeXCarreCentresReduits = 0.0;
            double[] variablesIndividuCentreesReduites = individuCentreReduit.values();

            for(int index=0; index < individuCentreReduit.size(); index ++) {
               sommeXCarreCentresReduits += variablesIndividuCentreesReduites[index] * variablesIndividuCentreesReduites[index];  
            }

            return sommeXCarreCentresReduits;
         }
      }, DoubleType);      

      // Faire la somme, par individu, des carrés des valeurs centrées réduites x* de chaque variable se rapportant à un individu.
      Dataset<Row> ds = acp.withColumn("xStdCarre", callUDF("xCarresCentresReduitsUDF", col("scaledFeatures")));

      // Calculer les contributions individuelles par axe.
      List<Double> listeValeursPropres = valeursPropresEtQualitesComposantes.select("valeurPropre").map((MapFunction<Row, Double>)v -> v.getDouble(0), Encoders.DOUBLE()).collectAsList();      
      DenseVector valeursPropres = (DenseVector)Vectors.dense(listeValeursPropres.stream().mapToDouble(Double::doubleValue).toArray());

      this.session.udf().register("contributionIndividuelleAxe", new UDF1<DenseVector, DenseVector>() {
         /** Serial ID. */
         private static final long serialVersionUID = 1L;

         /**
          * Renvoyer la contribution individuelle d'un individu sur un axe.
          * @param composantes Composantes principales d'un individu.
          */
         @Override
         public DenseVector call(DenseVector composantes) {
            double[] contributions = new double[composantes.size()];

            for(int axe=1; axe <= contributions.length; axe++) {
               double composante = composantes.values()[axe-1];
               double valeurPropre = valeursPropres.values()[axe-1];

               contributions[axe-1] = ((composante * composante) / n) / valeurPropre;
            }

            return (DenseVector)Vectors.dense(contributions);
         }
      }, new VectorUDT());      

      ds = ds.withColumn("contributionIndividuelleAxe", callUDF("contributionIndividuelleAxe", col("pca")));

      // Calculer la qualité de la projection, par individu, sur les axes.
      this.session.udf().register("qualiteProjectionIndividuAxe", new UDF2<DenseVector, Double, DenseVector>() {
         /** Serial ID. */
         private static final long serialVersionUID = 1L;

         /**
          * Renvoyer la qualité de la projection, par individu, sur les axes.
          * @param composantes Individu aux variables centrées réduites.
          * @param xStdCarre valeurs centrées réduites, au carré, cumulées pour toutes les variables associées à l'individu.
          */
         @Override
         public DenseVector call(DenseVector composantes, Double xStdCarre) {
            double[] qualite = new double[composantes.size()];

            for(int axe=1; axe <= qualite.length; axe++) {
               double composante = composantes.values()[axe-1];
               qualite[axe-1] = (composante * composante) / xStdCarre;
            }

            return (DenseVector)Vectors.dense(qualite);
         }
      }, new VectorUDT());      

      return ds.withColumn("qualiteProjectionIndividuAxe", callUDF("qualiteProjectionIndividuAxe", col("pca"), col("xStdCarre")));
   }

   /**
    * Créer un Dataset Row par des vecteurs denses constitués à partir d'upplets de valeurs.
    * @param listeUppletsValeurs liste de n-upplets.
    * @return Dataset Dense.
    */
   private Dataset<Row> denseRowWithVectors(double[]... listeUppletsValeurs) {
      List<Row> data = new ArrayList<>();

      for(double[] upplet : listeUppletsValeurs) {
         Vector vecteur = Vectors.dense(upplet);
         data.add(RowFactory.create(vecteur));
      }

       StructType schema = new StructType(new StructField[] {
         new StructField("features", new VectorUDT(), false, Metadata.empty()),
       });

       return this.session.createDataFrame(data, schema);
   }

   /**
    * Renvoyer un ensemble de valeurs doubles sous la forme d'un tableau.
    * @param valeurs Valeurs.
    * @return Tableau de doubles.
    */
   private double[] d(double... valeurs) {
      return valeurs;
   }
}

标签: javaapache-sparkstatisticsapache-spark-mllib

解决方案


推荐阅读