首页 > 解决方案 > Spring Boot 应用程序使用 Java 执行 REST -> 服务 -> 数据集 (Spark)。我会用 Scala 编写一些数据集。可能吗?

问题描述

我用Java中的Spring Boot编写了一个应用程序。

里面有 REST 调用、Spring 引导业务服务 ( @Service),其中一些服务使用Apache Spark创建数据集,进行一些计算并返回Java POJO 对象列表。

例如,一个Spring Boot服务

@Service
public class AssociationsService

有一个返回关联列表的方法:

/**
 * Obtenir la liste des associations d'une commune.
 * @param session Session Spark.
 * @param anneeRNA Année du Registre National des Associations
 * @param anneeCOG Année du Code Officiel Géographique.
 * @param codeCommune Code Commune.
 * @return Liste des associations.
 */
public List<Association> obtenirAssociations(SparkSession session, int anneeRNA, int anneeCOG, CodeCommune codeCommune) {
   Objects.requireNonNull(codeCommune, "Le code commune où rechercher des associations ne peut pas valoir null.");
   
   LOGGER.info("Obtention des associations présentes en {} pour la commune de code {} du COG {}", anneeRNA, codeCommune, anneeCOG);
   
   Dataset<Row> rowAssociations = this.associationsDataset.rowAssociations(session, anneeRNA, anneeCOG, true, TriAssociations.CODE_COMMUNE)
      .where(col("codeDepartement").equalTo(codeCommune.getNumeroDepartement())
            .and(col("codeCommune").equalTo(codeCommune.getDepartementEtCommune())));
   
   return this.associationsDataset.toAssociations(rowAssociations);
}

其中Association是一个业务对象 ( Java POJO),其成员是String, Integer... 或其他 POJO。

public class Association extends ObjetMetierSpark {
   /** Serial ID.*/
   private static final long serialVersionUID = -7206954677114644630L;

   /** Numéro Waldec national unique de l’association. */
   private NumeroWaldec numeroWaldec;
   
   /** Ancien numéro waldec. */
   private AncienNumeroWaldec ancienNumeroWaldec;
   
   /** Siret de l'association (facultatif). */
   private SIRET siret;
   
   /** Numéro d'association reconnue d'utilité publique (RUP) attribué par le Ministère. */
   private NumeroRUP numeroRUP;

   /** Code du site gestionnaire de l’association */
   private String gestion;

   /** Nature de l'association : simplement déclarée 1901 ou autre */
   private String nature;

   /** Regroupement de l'association : simple ou union ou fédération (S, U, F). */
   private RegroupementAssociation groupement;

   /** Nom long de l'association. */
   private String titre;

   // Getters and setters following
   [...]

并且 service 方法使用Spark和在一个名为的类中实现的数据集AssociationWaldecDataset来完成它的工作:

/**
 * Obtenir un Dataset Row du registre national des associations actives.
 * @param session Session Spark.
 * @param anneeRNA Année du registre.
 * @param anneeCOG Année du Code Officiel Géographique.
 * @param actives true si seules les associations actives doivent être retenues.
 * @param tri Tri à sélectionner.
 * @param verifications Vérifications optionnelles à mener.
 * @return Dataset des associations.
 * @throws TechniqueException si un incident survient.
 */
public Dataset<Row> rowAssociations(SparkSession session, int anneeRNA, int anneeCOG, boolean actives, TriAssociations tri, @SuppressWarnings("unused") Verification... verifications) throws TechniqueException {
   Parqueteur<TriAssociations> parqueteur = new Parqueteur<>(this.tempDir, isCacheUtilise(), "associationsRNA", "{0,number,#0}_{1,number,#0}_{2}", TriAssociations.values());
   Dataset<Row> associations = parqueteur.loadFromStore(session, tri, anneeRNA, anneeCOG, actives);
   
   if (associations != null) {
      return associations;
   }
   
   associations = loadAndRenameAssociationsCSV(session, anneeRNA, anneeCOG, tri);

   // Ejecter les associations qui n'ont ni nom, ni objet social.
   int indexTitre = associations.schema().fieldIndex("titre");
   int indexObjet = associations.schema().fieldIndex("objet");

   LongAccumulator sansObjetSocial = session.sparkContext().longAccumulator("sans_objet_social");
   LongAccumulator rnaInvalide = session.sparkContext().longAccumulator("rna_invalide");
   
   associations = associations.filter((FilterFunction<Row>)f -> {
      if (f.isNullAt(indexTitre) && f.isNullAt(indexObjet)) {
         LOGGER.warn("Une association RNA {} dans la commune de {} ({}) n'a ni titre ni objet et n'est pas retenue.", 
            f.getAs("numero_waldec"), f.getAs("adresse_siege_nom_commune"), f.getAs("adresse_siege_code_commune"));
         
         sansObjetSocial.add(1);
         return false;
      }
      
      if (f.getAs("numero_waldec").toString().length() != 10) {
         LOGGER.warn("Une association RNA {} dans la commune de {} ({}) n'a pas d'identifiant waldec et est écartée.", 
            f.getAs("numero_waldec"), f.getAs("adresse_siege_nom_commune"), f.getAs("adresse_siege_code_commune"));
         
         rnaInvalide.add(1);
      }
      
      // Il peut être demandé que seules les associations actives soient retenues.
      if (actives) {
         if ("A".equals(f.getAs("position_activite")) == false) {
            return false;
         }
      }
      
      return true;
   });

   // Joindre les objets sociaux, en outer.
   Dataset<Row> objetsSociaux = this.objetsSociauxDataset.rowObjetsSociaux(session, anneeRNA);
   
   associations = associations.join(objetsSociaux, associations.col("code_objet_social1").equalTo(objetsSociaux.col("code_objet_social")), "outer")
      .withColumnRenamed("libelle_objet_social", "libelle_objet_social1")
      .drop("code_objet_social");

   associations = associations.join(objetsSociaux, associations.col("code_objet_social2").equalTo(objetsSociaux.col("code_objet_social")), "outer")
      .withColumnRenamed("libelle_objet_social", "libelle_objet_social2")
      .drop("code_objet_social");
   
   // Remplacer les noms de communes par celles officielles.
   Dataset<Row> communes = this.cogDataset.rowCommunes(session, anneeCOG).select("codeCommune", "nomCommune", "codeRegion", "codeDepartement", "codeEPCI", "nomEPCI", "populationTotale");
   
   associations = associations.join(communes, associations.col("adresse_siege_code_commune").equalTo(communes.col("codeCommune")), "inner")
      .drop("adresse_siege_code_commune", "adresse_siege_nom_commune");
   associations = parqueteur.appliquer(tri, associations);

   // Supprimer les caractères parasites majeurs des noms d'associations, objets sociaux, adresses.
   associations = correctionDonneesInvalides(associations, true);
   associations = associations.persist(); // Important : optimise beaucoup.
   
   // Calculer l'ancienneté d'existence des associations qui ont une date de création.
   Column nombreAnneesExistence = when(col("date_creation").equalTo(null), null).
      otherwise(datediff(current_date(), col("date_creation")).$div(lit(365)));
   
   associations = associations.withColumn("nombreAnneesExistence", nombreAnneesExistence);

   // Calculer l'ancienneté de la dernière déclaration quand la date est connue.
   Column nombreAnneesDerniereDeclaration = when(col("date_derniere_declaration").equalTo(null), null).
      otherwise(datediff(current_date(), col("date_derniere_declaration")).$div(lit(365)));
   
   associations = associations.withColumn("nombreAnneesDerniereDeclaration", nombreAnneesDerniereDeclaration);

   if (parqueteur.saveToStore(tri, associations, anneeRNA, anneeCOG, actives)) {
      LOGGER.info("Le dataset des associations inscrites au registre national de {} et année COG {}, tri par {} est constitué et stocké.", anneeRNA, anneeCOG, tri);
   }
   
   if (associations.isEmpty() == false) {
      LOGGER.info("Anomalies rencontrées :");
      LOGGER.info("\t{} RNA étaient mal constitués : enregistrements invalides.", rnaInvalide.count());
      LOGGER.info("\t{} associations étaient sans titre ni objet.", sansObjetSocial.count());
   }

   return associations;
}

但是我写的这种数据集越多,我就越觉得如果这个 Dataset 代码,这个方法rowAssociations,用Scala而不是Java编写会更方便,据说更准确,有更多的处理能力阿帕奇星火

但是您会看到我的应用程序的Spark部分在哪里......在所有Java中的调用流程的末尾。

Rest -> Spring-Boot Service -> Spark
每个人都使用Java业务对象。(其他一些 Spring Boot 服务,对Spark一无所知,可以使用业务对象Association来满足其他需求)。

Maven中,我将构建插件设置为允许src/main/scala编译src/main/java

<build>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>4.4.0</version>
            
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                    
                    <configuration>
                        <recompileMode>all</recompileMode>
                        <sourceDir>src/main/scala</sourceDir>
                    </configuration>
                </execution>
                
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                    
                    <configuration>
                        <recompileMode>all</recompileMode>
                        <testSourceDir>${project.basedir}/src/test/scala</testSourceDir>
                    </configuration>
                </execution>
            </executions>
            
            <configuration>
                <scalaVersion>2.12.10</scalaVersion>
            </configuration>
        </plugin>
    </plugins>
</build>

但我已经把我的src/main/scala空了!

因为我很困扰。
我不知道如果我选择创建一个文件来替换用于实现类的文件,那么我用Java编写的带有POJO的 Spring 引导AssociationsService方法如何 调用Scala数据集类。Association
AssociationWaldecDataset.scalaAssociationWaldecDataset.javaAssociationWaldecDataset

是否有可能或者我是否有可能继续用Java编写我的Spark部分?

标签: javaspringspring-bootscalaapache-spark

解决方案


推荐阅读