java - Spring Boot 应用程序使用 Java 执行 REST -> 服务 -> 数据集 (Spark)。我会用 Scala 编写一些数据集。可能吗?
问题描述
我用Java中的Spring Boot编写了一个应用程序。
里面有 REST 调用、Spring 引导业务服务 ( @Service
),其中一些服务使用Apache Spark创建数据集,进行一些计算并返回Java POJO 对象列表。
例如,一个Spring Boot服务
@Service
public class AssociationsService
有一个返回关联列表的方法:
/**
* Obtenir la liste des associations d'une commune.
* @param session Session Spark.
* @param anneeRNA Année du Registre National des Associations
* @param anneeCOG Année du Code Officiel Géographique.
* @param codeCommune Code Commune.
* @return Liste des associations.
*/
public List<Association> obtenirAssociations(SparkSession session, int anneeRNA, int anneeCOG, CodeCommune codeCommune) {
Objects.requireNonNull(codeCommune, "Le code commune où rechercher des associations ne peut pas valoir null.");
LOGGER.info("Obtention des associations présentes en {} pour la commune de code {} du COG {}", anneeRNA, codeCommune, anneeCOG);
Dataset<Row> rowAssociations = this.associationsDataset.rowAssociations(session, anneeRNA, anneeCOG, true, TriAssociations.CODE_COMMUNE)
.where(col("codeDepartement").equalTo(codeCommune.getNumeroDepartement())
.and(col("codeCommune").equalTo(codeCommune.getDepartementEtCommune())));
return this.associationsDataset.toAssociations(rowAssociations);
}
其中Association
是一个业务对象 ( Java POJO),其成员是String
, Integer
... 或其他 POJO。
public class Association extends ObjetMetierSpark {
/** Serial ID.*/
private static final long serialVersionUID = -7206954677114644630L;
/** Numéro Waldec national unique de l’association. */
private NumeroWaldec numeroWaldec;
/** Ancien numéro waldec. */
private AncienNumeroWaldec ancienNumeroWaldec;
/** Siret de l'association (facultatif). */
private SIRET siret;
/** Numéro d'association reconnue d'utilité publique (RUP) attribué par le Ministère. */
private NumeroRUP numeroRUP;
/** Code du site gestionnaire de l’association */
private String gestion;
/** Nature de l'association : simplement déclarée 1901 ou autre */
private String nature;
/** Regroupement de l'association : simple ou union ou fédération (S, U, F). */
private RegroupementAssociation groupement;
/** Nom long de l'association. */
private String titre;
// Getters and setters following
[...]
并且 service 方法使用Spark和在一个名为的类中实现的数据集AssociationWaldecDataset
来完成它的工作:
/**
* Obtenir un Dataset Row du registre national des associations actives.
* @param session Session Spark.
* @param anneeRNA Année du registre.
* @param anneeCOG Année du Code Officiel Géographique.
* @param actives true si seules les associations actives doivent être retenues.
* @param tri Tri à sélectionner.
* @param verifications Vérifications optionnelles à mener.
* @return Dataset des associations.
* @throws TechniqueException si un incident survient.
*/
public Dataset<Row> rowAssociations(SparkSession session, int anneeRNA, int anneeCOG, boolean actives, TriAssociations tri, @SuppressWarnings("unused") Verification... verifications) throws TechniqueException {
Parqueteur<TriAssociations> parqueteur = new Parqueteur<>(this.tempDir, isCacheUtilise(), "associationsRNA", "{0,number,#0}_{1,number,#0}_{2}", TriAssociations.values());
Dataset<Row> associations = parqueteur.loadFromStore(session, tri, anneeRNA, anneeCOG, actives);
if (associations != null) {
return associations;
}
associations = loadAndRenameAssociationsCSV(session, anneeRNA, anneeCOG, tri);
// Ejecter les associations qui n'ont ni nom, ni objet social.
int indexTitre = associations.schema().fieldIndex("titre");
int indexObjet = associations.schema().fieldIndex("objet");
LongAccumulator sansObjetSocial = session.sparkContext().longAccumulator("sans_objet_social");
LongAccumulator rnaInvalide = session.sparkContext().longAccumulator("rna_invalide");
associations = associations.filter((FilterFunction<Row>)f -> {
if (f.isNullAt(indexTitre) && f.isNullAt(indexObjet)) {
LOGGER.warn("Une association RNA {} dans la commune de {} ({}) n'a ni titre ni objet et n'est pas retenue.",
f.getAs("numero_waldec"), f.getAs("adresse_siege_nom_commune"), f.getAs("adresse_siege_code_commune"));
sansObjetSocial.add(1);
return false;
}
if (f.getAs("numero_waldec").toString().length() != 10) {
LOGGER.warn("Une association RNA {} dans la commune de {} ({}) n'a pas d'identifiant waldec et est écartée.",
f.getAs("numero_waldec"), f.getAs("adresse_siege_nom_commune"), f.getAs("adresse_siege_code_commune"));
rnaInvalide.add(1);
}
// Il peut être demandé que seules les associations actives soient retenues.
if (actives) {
if ("A".equals(f.getAs("position_activite")) == false) {
return false;
}
}
return true;
});
// Joindre les objets sociaux, en outer.
Dataset<Row> objetsSociaux = this.objetsSociauxDataset.rowObjetsSociaux(session, anneeRNA);
associations = associations.join(objetsSociaux, associations.col("code_objet_social1").equalTo(objetsSociaux.col("code_objet_social")), "outer")
.withColumnRenamed("libelle_objet_social", "libelle_objet_social1")
.drop("code_objet_social");
associations = associations.join(objetsSociaux, associations.col("code_objet_social2").equalTo(objetsSociaux.col("code_objet_social")), "outer")
.withColumnRenamed("libelle_objet_social", "libelle_objet_social2")
.drop("code_objet_social");
// Remplacer les noms de communes par celles officielles.
Dataset<Row> communes = this.cogDataset.rowCommunes(session, anneeCOG).select("codeCommune", "nomCommune", "codeRegion", "codeDepartement", "codeEPCI", "nomEPCI", "populationTotale");
associations = associations.join(communes, associations.col("adresse_siege_code_commune").equalTo(communes.col("codeCommune")), "inner")
.drop("adresse_siege_code_commune", "adresse_siege_nom_commune");
associations = parqueteur.appliquer(tri, associations);
// Supprimer les caractères parasites majeurs des noms d'associations, objets sociaux, adresses.
associations = correctionDonneesInvalides(associations, true);
associations = associations.persist(); // Important : optimise beaucoup.
// Calculer l'ancienneté d'existence des associations qui ont une date de création.
Column nombreAnneesExistence = when(col("date_creation").equalTo(null), null).
otherwise(datediff(current_date(), col("date_creation")).$div(lit(365)));
associations = associations.withColumn("nombreAnneesExistence", nombreAnneesExistence);
// Calculer l'ancienneté de la dernière déclaration quand la date est connue.
Column nombreAnneesDerniereDeclaration = when(col("date_derniere_declaration").equalTo(null), null).
otherwise(datediff(current_date(), col("date_derniere_declaration")).$div(lit(365)));
associations = associations.withColumn("nombreAnneesDerniereDeclaration", nombreAnneesDerniereDeclaration);
if (parqueteur.saveToStore(tri, associations, anneeRNA, anneeCOG, actives)) {
LOGGER.info("Le dataset des associations inscrites au registre national de {} et année COG {}, tri par {} est constitué et stocké.", anneeRNA, anneeCOG, tri);
}
if (associations.isEmpty() == false) {
LOGGER.info("Anomalies rencontrées :");
LOGGER.info("\t{} RNA étaient mal constitués : enregistrements invalides.", rnaInvalide.count());
LOGGER.info("\t{} associations étaient sans titre ni objet.", sansObjetSocial.count());
}
return associations;
}
但是我写的这种数据集越多,我就越觉得如果这个 Dataset 代码,这个方法rowAssociations
,用Scala而不是Java编写会更方便,据说更准确,有更多的处理能力阿帕奇星火。
但是您会看到我的应用程序的Spark部分在哪里......在所有Java中的调用流程的末尾。
Rest -> Spring-Boot Service -> Spark
每个人都使用Java业务对象。(其他一些 Spring Boot 服务,对Spark一无所知,可以使用业务对象Association
来满足其他需求)。
在Maven中,我将构建插件设置为允许src/main/scala
编译src/main/java
:
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.4.0</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<recompileMode>all</recompileMode>
<sourceDir>src/main/scala</sourceDir>
</configuration>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
<configuration>
<recompileMode>all</recompileMode>
<testSourceDir>${project.basedir}/src/test/scala</testSourceDir>
</configuration>
</execution>
</executions>
<configuration>
<scalaVersion>2.12.10</scalaVersion>
</configuration>
</plugin>
</plugins>
</build>
但我已经把我的src/main/scala
空了!
因为我很困扰。
我不知道如果我选择创建一个文件来替换用于实现类的文件,那么我用Java编写的带有POJO的 Spring 引导AssociationsService
方法如何
调用Scala数据集类。Association
AssociationWaldecDataset.scala
AssociationWaldecDataset.java
AssociationWaldecDataset
是否有可能或者我是否有可能继续用Java编写我的Spark部分?
解决方案
推荐阅读
- keycloak - 为什么使用内置 Keycloak 组成员映射器映射其他范围信息?
- javascript - 以 html 格式获取帖子响应,但也获取状态
- elasticsearch - _update_by_query 无法更新 ElasticSearch 中的所有文档
- c# - SQLite 互操作文件未使用 MSBuild 任务“清理”复制到输出目录
- sql-server - 事实表的无事实事实和历史
- python - 使用 PyMC3 计算 ODE 参数后验:初始能量错误错误
- sql - 如何根据 SQL Server 中不同表上的时间戳条目获取表中最新的用户更新列值?
- python - 模块“plotly.validators.carpet”没有属性“HoverinfoValidator”
- reporting-services - SSRS 计数和表达式函数
- reporting-services - 有没有办法在 ssrs 中编写 where 子句?