Joining a small DataFrame with a large one in Spark Scala, removing duplicates via feature selection

Problem description

I have two DataFrames. The large one has 19 million rows, the small one 180K rows.

Suppose the large DF has the following columns:

+-----------------------------------+------------+--------+
|               cuid                |    Col2    | Col3   |
+-----------------------------------+------------+--------+
| 12654467                          | Value 1    |    123 |
| 164687948                         | Value 2    |    163 |
| 456132456                         | Value 3    |     45 |
+-----------------------------------+------------+--------+

And in the small DF:

+-----------------------------------+
|               cuid                |   
+-----------------------------------+
| 12654467                          |
+-----------------------------------+

The output must be:

+-----------------------------------+------------+--------+
|               cuid                |    Col2    | Col3   |
+-----------------------------------+------------+--------+
| 12654467                          | Value 1    |    123 |
+-----------------------------------+------------+--------+

I have tried the following:

val joinedDF = smallDF.join(largeDF, Seq("cuid"), "inner")

But joinedDF has far more than 180K rows (about 6 million). Does anyone know how to fix this?

UPD — both the large and the small DF contain duplicate cuids, but the duplicates have different values in the other columns, so I can't simply group by cuid.

So my next question is: how can I use the other columns to select which rows to keep (feature selection) and remove the duplicates?
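(What is happening here is the usual join fan-out: a cuid that occurs m times in the large DF and n times in the small DF contributes m × n rows to an inner join. A toy Scala sketch with made-up values, assuming a SparkSession named spark:)

import spark.implicits._

// cuid "1" occurs twice in both frames, so the inner join
// emits 2 * 2 = 4 rows for it.
val big   = Seq(("1", "a"), ("1", "b")).toDF("cuid", "col2")
val small = Seq("1", "1").toDF("cuid")

big.join(small, Seq("cuid")).count()  // 4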

Tags: scala, apache-spark, apache-spark-sql

Solution


This works perfectly for me.

Let's define our input sources:

  • big.csv
cuid,col2,col3
1,what,bye
2,word,random
3,like,random
4,why,this
5,hello,number
6,bye,train
7,music,word
8,duck,bag
9,car,noise
10,crying,baby
  • small.csv
cuid
1
4
7
7

The code:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;

public class Foo {

    private static final SparkSession spark = SparkSession
            .builder()
            .appName("Foo")
            .master("local[*]")
            .getOrCreate();

    public static void main(String[] args) {
        // Read the large CSV (header row included) and map it onto the BigRow bean.
        Dataset<BigRow> bigDataset = spark
                .read()
                .option("header", true)
                .csv(".\\resources\\big.csv")
                .as(Encoders.bean(BigRow.class));
        bigDataset.show();
        bigDataset.printSchema();

        // Read the small CSV the same way, mapped onto the SmallRow bean.
        Dataset<SmallRow> smallDataset = spark
                .read()
                .option("header", true)
                .csv(".\\resources\\small.csv")
                .as(Encoders.bean(SmallRow.class));
        smallDataset.show();
        smallDataset.printSchema();

        // Inner join on the shared cuid column. A cuid that is duplicated on
        // either side multiplies the matching rows in the output.
        Dataset<Row> joined = bigDataset.join(smallDataset, "cuid");
        joined.show();
    }

    // The bean classes must be public so that Spark's bean encoder can
    // inspect and instantiate them.
    public static class SmallRow implements Serializable {
        private String cuid;

        public SmallRow() {}

        public String getCuid() {
            return cuid;
        }

        public void setCuid(String cuid) {
            this.cuid = cuid;
        }

        public SmallRow withCuid(final String cuid) {
            this.cuid = cuid;
            return this;
        }
    }

    public static class BigRow implements Serializable {
        private String cuid;
        private String col2;
        private String col3;

        public BigRow() {}

        public String getCuid() {
            return cuid;
        }

        public void setCuid(String cuid) {
            this.cuid = cuid;
        }

        public String getCol2() {
            return col2;
        }

        public void setCol2(String col2) {
            this.col2 = col2;
        }

        public String getCol3() {
            return col3;
        }

        public void setCol3(String col3) {
            this.col3 = col3;
        }

        public BigRow withCuid(final String cuid) {
            this.cuid = cuid;
            return this;
        }

        public BigRow withCol2(final String col2) {
            this.col2 = col2;
            return this;
        }

        public BigRow withCol3(final String col3) {
            this.col3 = col3;
            return this;
        }
    }

}
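Since the question itself is in Scala, here is the same pipeline as a Scala sketch (assuming the same two CSV files, read as untyped string columns rather than beans):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Foo")
  .master("local[*]")
  .getOrCreate()

// Read both CSVs with their header rows; all columns stay typed as strings.
val bigDF   = spark.read.option("header", "true").csv("resources/big.csv")
val smallDF = spark.read.option("header", "true").csv("resources/small.csv")

// Inner join on the shared cuid column.
val joined = bigDF.join(smallDF, Seq("cuid"))
joined.show()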

My output is:

+----+-----+----+
|cuid| col2|col3|
+----+-----+----+
|   1| what| bye|
|   4|  why|this|
|   7|music|word|
|   7|music|word|
+----+-----+----+

Consider dropping the duplicates with dropDuplicates (distinct would also work here, since the duplicated rows are identical in every column):

Dataset<Row> noDuplicates = joined.dropDuplicates("cuid");
noDuplicates.show();
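Be aware that dropDuplicates("cuid") keeps an arbitrary row from each group of duplicates. Since the update to the question says the duplicated cuids differ in their other columns, a window function lets you control which row survives. A minimal Scala sketch, assuming (purely for illustration) that you want the row with the largest Col3 per cuid:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Number the rows of each cuid group, ordered by the column(s) that decide
// which duplicate wins (Col3 descending is just an example), then keep the
// first row of every group.
val byCuid = Window.partitionBy("cuid").orderBy(col("Col3").desc)

val deduped = joinedDF
  .withColumn("rn", row_number().over(byCuid))
  .filter(col("rn") === 1)
  .drop("rn")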

Hope this helps. Thomas.

