首页 > 解决方案 > 这groupByKey(...) 中的类在其成员中有一个 Map。groupByKey 操作因“不可比较”问题而失败

问题描述

我有一个Entreprise具有原始数据类型的类和另一个类上的 Map:Etablissement它仅由原始数据类型组成。

public class Entreprise implements Comparable<Entreprise> {
   /** Liste des établissements de l'entreprise. */
   private Map<String, Etablissement> etablissements = new HashMap<>();

   /** Sigle de l'entreprise */
   private String sigle;

   /** Nom de naissance */
   private String nomNaissance;

   /** Nom d'usage */
   private String nomUsage;
 ...
   @Override
   public int compareTo(Entreprise o) {
      return getSiren().compareTo(o.getSiren());
   }
}

EntrepriseDataset我创建了这个模式:

public StructType schemaEntreprise() {
   StructType schema = new StructType()
      .add("sigle", StringType, true)
      .add("nomNaissance", StringType, true)
      .add("nomUsage", StringType, true)
       ...

   // Ajouter au Dataset des entreprises la liaison avec les établissements.
   MapType mapEtablissements = new MapType(StringType, this.datasetEtablissement.schemaEtablissement(), true);
   StructField etablissements = new StructField("etablissements", mapEtablissements, true, Metadata.empty());
   schema.add(etablissements);

  return schema;
}

我可以做一些方便的事情joinWith,例如:

Dataset<Tuple2<Entreprise, Etablissement>> ds = dsEntreprises
    .joinWith(dsEtablissements,
    dsEntreprises.col("siren").equalTo(dsEtablissements.col("siren")), "inner");

这是一些操作的开始,应该引导我到一个对象的数据集,Entreprise它们的Etablissement对象在他们的地图中

Enterprise : {{834935512, Activité principale : 68.20A (NAFRev2), effectif salarié : null (null, employeur : null), active : null, dernier traitement : Jan 26, 2018, historisation débutée le Dec 26, 2017, nombre de périodes sans changement : 1}, nombre d'établissements : 1, catégorie entreprise : null (null), catégorie juridique : 1000, n° répertoire national des associations : null, Economie Sociale et Solidaire : null, NIC de l'établissement siège : 00014, sigle : null, dénomination de l'entreprise : {18}, dénominations usuelles 1 : null, 2 :{19}, 3 : {20}, 4 : {21} , Nom de naissance : LOHIER, Nom d'usage : null, prénom usuel : ROGER, autres prénoms : ROGER, pseudonyme : null, sexe : M, purgée : null, date de création : Dec 26, 2017}
Etablishment : {{83493551200014, Activité principale : 68.20A (NAFRev2), effectif salarié : null (null, employeur : null), active : null, dernier traitement : Jan 26, 2018, historisation débutée le Dec 26, 2017, nombre de périodes sans changement : 1}, activité au registre des métiers : null, date de création de l'établissement : 2017-12-26, établissement siège : false, dénomination de l'établissement : null, enseigne 1 : null, 2 : null, 3 : null, adresses : {anomalies : [], annulé logiquement : false, distribution spéciale : null, numéro dans la voie : 74, répétition : null, type de voie : BD, libellé de voie : DE LA PORTELETTE, complément d'adresse : 74-78, code postal : 80100, cedex : null - null, commune : 80001 - ABBEVILLE, commune étrangère : null, pays : null - null}}

Enterprise : {{178001111, Activité principale : 84.23Z (NAFRev2), effectif salarié : 41 (2016, employeur : null), active : null, dernier traitement : Jan 17, 2019, historisation débutée le Jan 1, 2008, nombre de périodes sans changement : 5}, nombre d'établissements : 3, catégorie entreprise : ETI (2,016), catégorie juridique : 7171, n° répertoire national des associations : null, Economie Sociale et Solidaire : null, NIC de l'établissement siège : 00016, sigle : null, dénomination de l'entreprise : {18}, dénominations usuelles 1 : null, 2 :{19}, 3 : {20}, 4 : {21} , Nom de naissance : null, Nom d'usage : null, prénom usuel : null, autres prénoms : null, pseudonyme : null, sexe : null, purgée : null, date de création : Jan 1, 1978}
Etablishment : {{17800111100396, Activité principale : 84.23Z (NAFRev2), effectif salarié : 11 (2016, employeur : null), active : null, dernier traitement : Sep 29, 2018, historisation débutée le Jan 1, 2008, nombre de périodes sans changement : 3}, activité au registre des métiers : null, date de création de l'établissement : 1983-01-01, établissement siège : false, dénomination de l'établissement : null, enseigne 1 : TRIBUNAL D'INSTANCE D'ABBEVILLE, 2 : null, 3 : null, adresses : {anomalies : [], annulé logiquement : false, distribution spéciale : BP A8, numéro dans la voie : 79, répétition : null, type de voie : RUE, libellé de voie : MARECHAL FOCH, complément d'adresse : null, code postal : 80100, cedex : 80103 - ABBEVILLE CEDEX, commune : 80001 - ABBEVILLE, commune étrangère : null, pays : null - null}}
Etablishment : {{17800111100743, Activité principale : 84.23Z (NAFRev2), effectif salarié : null (null, employeur : null), active : null, dernier traitement : Sep 1, 2008, historisation débutée le Dec 25, 2007, nombre de périodes sans changement : 1}, activité au registre des métiers : null, date de création de l'établissement : 2007-12-25, établissement siège : false, dénomination de l'établissement : null, enseigne 1 : TRIBUNAL PARITAIRE BAUX RURAUX, 2 : null, 3 : null, adresses : {anomalies : [], annulé logiquement : false, distribution spéciale : BP 330, numéro dans la voie : 79, répétition : null, type de voie : RUE, libellé de voie : MARECHAL FOCH, complément d'adresse : null, code postal : 80100, cedex : 80103 - ABBEVILLE CEDEX, commune : 80001 - ABBEVILLE, commune étrangère : null, pays : null - null}}
Etablishment : {{17800111100503, Activité principale : 84.23Z (NAFRev2), effectif salarié : 02 (2016, employeur : null), active : null, dernier traitement : Sep 29, 2018, historisation débutée le Jan 1, 2008, nombre de périodes sans changement : 3}, activité au registre des métiers : null, date de création de l'établissement : 1982-07-01, établissement siège : false, dénomination de l'établissement : null, enseigne 1 : CONSEIL DE PRUD'HOMMES D'ABBEVILLE, 2 : null, 3 : null, adresses : {anomalies : [], annulé logiquement : false, distribution spéciale : null, numéro dans la voie : 9, répétition : null, type de voie : AV, libellé de voie : DU GENERAL LECLERC, complément d'adresse : null, code postal : 80100, cedex : null - null, commune : 80001 - ABBEVILLE, commune étrangère : null, pays : null - null}}

但问题是我无法执行groupByKey

KeyValueGroupedDataset<Entreprise, Tuple2<Entreprise, Etablissement>> dsK = 
ds.groupByKey((MapFunction<Tuple2<Entreprise, Etablissement>, Entreprise>) f -> {
      Entreprise entreprise = f._1();
      Etablissement etablissement = f._2();
      entreprise.ajouterEtablissement(etablissement);

      return entreprise;
  }, 
  Encoders.bean(Entreprise.class));

Dataset<Entreprise> dsEntreprisesAvecEtablissements = 
dsK.mapGroups(new MapGroupsFunction<Entreprise, Tuple2<Entreprise, Etablissement>, Entreprise>() {
     @Override
     public Entreprise call(Entreprise key, Iterator<Tuple2<Entreprise, Etablissement>> values) {
        while(values.hasNext()) {
           Etablissement etablissement = values.next()._2();
           key.ajouterEtablissement(etablissement);
        }

        return key;
      }
   },
   Encoders.bean(Entreprise.class));

那个groupByKey(...)(或者更准确地说是对数据集采取的第一个操作dsEntreprisesAvecEtablissements)失败并显示一条消息:

java.lang.IllegalArgumentException: cannot generate compare code for un-comparable type: map<string,struct<activiteArtisanRegistreDesMetiers:string,activitePrincipale:string,anneeValiditeEffectifSalarie:int,cedex:string,cedexSecondaire:string,codeCommune:string,codeCommuneSecondaire:string,codePaysEtranger:string,codePaysEtrangerSecondaire:string,codePostal:string,codePostalSecondaire:string,complementAdresse:string,complementAdresseSecondaire:string,dateCreationEtablissement:string,dateDebutHistorisation:string,dateDernierTraitement:string,denominationEtablissement:string,distributionSpeciale:string,distributionSpecialeSecondaire:string,enseigne1:string,enseigne2:string,enseigne3:string,indiceRepetition:string,indiceRepetitionSecondaire:string,libelleCedex:string,libelleCedexSecondaire:string,libelleVoie:string,libelleVoieSecondaire:string,nomCommune:string,nomCommuneSecondaire:string,nomPaysEtranger:string,nomPaysEtrangerSecondaire:string,nombrePeriodes:int,nomenclatureActivitePrincipale:string,numeroVoie:string,numeroVoieSecondaire:string,siege:boolean,siret:string,trancheEffectifSalarie:string,typeDeVoie:string,typeDeVoieSecondaire:string>>
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.genComp(CodeGenerator.scala:700) ~[spark-catalyst_2.12-2.4.3.jar:2.4.3]
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.$anonfun$genComparisons$3(GenerateOrdering.scala:121) ~[spark-catalyst_2.12-2.4.3.jar:2.4.3]

我发现没有其他 groupByKey 方法允许提供替代比较方法,即 groupByKey 似乎在这里执行的 equals(Map)。它不认为我的 Java Pojo 目标Encoders.bean(..)Comparable. 所以,最终,我不得不以这个笨拙的代码结束:

Entreprises entreprises = new Entreprises();

List<Tuple2<Entreprise, Etablissement>> tuples = ds.collectAsList();
Iterator<Tuple2<Entreprise, Etablissement>> itTuples = tuples.iterator();

while(itTuples.hasNext()) {
   Tuple2<Entreprise, Etablissement> tuple = itTuples.next();
   Entreprise entreprise = entreprises.get(tuple._1().getSiren());
   Etablissement etablissement = tuple._2();

   if (entreprise == null) {
      entreprise = tuple._1();
      entreprises.add(entreprise);
   }

   entreprise.ajouterEtablissement(etablissement);
}

return entreprises;

但是可能存在更好的方法来更好地解决我的问题。在我的操作结束时我应该做些什么来Dataset<Entreprise>让每个Entreprise人都有一张他们的地图Etablissement

标签: javaapache-spark

解决方案


Spark 不允许比较MapType. 你可以做一些不同的事情。

关于您的代码的重要事项是加入和分组的关键。这两种操作都是一样的。这使事情变得容易得多。

您可以尝试以下方法之一:

  • 将密钥从 更改Enterprisesiren: String。并收集 中的所有Etablissement记录mapGroups。这可能会在Enterprise.
  • 加入前在 Etablissement 流中分组siren,并在mapGroups函数中收集它们。生成的流与Enterprise后跟地图的流连接。

第一个解决方案

Dataset<Tuple2<Entreprise, Etablissement>> ds = dsEntreprises
                .joinWith(dsEtablissements,
                        dsEntreprises.col("siren").equalTo(dsEtablissements.col("siren")), "inner");

        KeyValueGroupedDataset<String, Tuple2<Entreprise, Etablissement>> dsK = ds.groupByKey((MapFunction<Tuple2<Entreprise, Etablissement>, String>)
                value -> value._1.siren, Encoders.STRING());

        dsK.mapGroups((MapGroupsFunction<String, Tuple2<Entreprise, Etablissement>, Entreprise>) (key, values) -> {
            Entreprise e = null;
            while (values.hasNext()) {
                Tuple2<Entreprise, Etablissement> tuple = values.next();
                if (e == null) {
                    e = tuple._1;
                }

                e.ajouterEtablissement(tuple._2);
            }

            return e;
        }, Encoders.bean(Entreprise.class))
                .foreach((ForeachFunction<Entreprise>) x -> System.out.println(x));

方案二 这个方案比较好,因为它优雅地处理了企业中重复键的情况。它也将是有效的,因为它将减少将要加入的记录的数量。

KeyValueGroupedDataset<String, Etablissement> ets = dsEtablissements.groupByKey((MapFunction<Etablissement, String>) value -> value.siren, Encoders.STRING());

    Dataset<EtablissementList> etm = ets.mapGroups((MapGroupsFunction<String, Etablissement, EtablissementList>) (key, values) -> {
        Map<String, Etablissement> map = new HashMap<>();
        while (values.hasNext()) {
            Etablissement etablissement = values.next();
            map.put(etablissement.getId(), etablissement);
        }

        return new EtablissementList(map, key);
    }, Encoders.bean(EtablissementList.class));

    Dataset<Tuple2<Entreprise, EtablissementList>> dx = dsEntreprises.joinWith(etm, dsEntreprises.col("siren").equalTo(etm.col("siren")), "inner");
    Dataset<Entreprise> finalDs = dx.map((MapFunction<Tuple2<Entreprise, EtablissementList>, Entreprise>) value -> {
        value._1.etablissements = value._2.etablissements;
        return value._1;
    }, Encoders.bean(Entreprise.class));

    finalDs.foreach((ForeachFunction<Entreprise>) x -> System.out.println(x));

POJO

public static class EtablissementList {
        private Map<String, Etablissement> etablissements = new ConcurrentHashMap<>();
        private String siren;

        public EtablissementList() {
        }

        public EtablissementList(Map<String, Etablissement> etablissements, String siren) {
            this.etablissements = etablissements;
            this.siren = siren;
        }

        public Map<String, Etablissement> getEtablissements() {
            return etablissements;
        }

        public void setEtablissements(Map<String, Etablissement> etablissements) {
            this.etablissements = etablissements;
        }

        public String getSiren() {
            return siren;
        }

        public void setSiren(String siren) {
            this.siren = siren;
        }
    }

推荐阅读