scala - 加入带有大型 Spark Scala 的 smail Dataframe,通过特征选择删除重复项
问题描述
我有两个数据框。大的有 1900 万行,小的有 180K 行。
假设在大 DF 中我有下一个 cols
+-----------------------------------+------------+--------+
| cuid | Col2 | Col3 |
+-----------------------------------+------------+--------+
| 12654467 | Value 1 | 123 |
| 164687948 | Value 2 | 163 |
| 456132456 | Value 3 | 45 |
+-----------------------------------+------------+--------+
然后在小DF中:
+-----------------------------------+
| cuid |
+-----------------------------------+
| 12654467 |
+-----------------------------------+
并且输出必须是
+-----------------------------------+------------+--------+
| cuid | Col2 | Col3 |
+-----------------------------------+------------+--------+
| 12654467 | Value 1 | 123 |
+-----------------------------------+------------+--------+
我试过下一个
val joinedDF = smallDF.join(largeDF, Seq("cuid"), "inner")
但是在joinedDF 中,我有超过180K 行(600 万行)。有人可以知道如何解决这个问题吗?
UPD - 在大型和小型 DF 中,我有 cuid 重复,但它们在其他列中有不同的值,所以我不能按 cuid 分组。
所以,我有下一个问题 - 我如何使用其他列进行特征选择并删除重复项?
解决方案
这对我来说很完美。
让我们定义我们的输入源:
big.csv
cuid,col2,col3
1,what,bye
2,word,random
3,like,random
4,why,this
5,hello,number
6,bye,train
7,music,word
8,duck,bag
9,car,noise
10,crying,baby
small.csv
cuid
1
4
7
7
编码:
import org.apache.spark.sql.*;
import scala.collection.Seq;
import java.io.Serializable;
public class Foo {
private static final SparkSession spark = new SparkSession
.Builder()
.master("local[*]")
.getOrCreate();
public static void main(String[] args) {
Dataset<BigRow> bigDataset = spark
.read()
.option("header", true)
.csv(".\\resources\\big.csv")
.as(Encoders.bean(BigRow.class));
bigDataset.show();
bigDataset.printSchema();
Dataset<SmallRow> smallDataset = spark
.read()
.option("header", true)
.csv(".\\resources\\small.csv")
.as(Encoders.bean(SmallRow.class));
smallDataset.show();
smallDataset.printSchema();
Dataset<Row> joined = bigDataset
.join(smallDataset, "cuid");
joined.show();
}
private static class SmallRow implements Serializable {
private String cuid;
public SmallRow() {}
public String getCuid() {
return cuid;
}
public void setCuid(String cuid) {
this.cuid = cuid;
}
public SmallRow withCuid(final String cuid) {
this.cuid = cuid;
return this;
}
}
private static class BigRow implements Serializable {
private String cuid;
private String col2;
private String col3;
public BigRow() {}
public String getCuid() {
return cuid;
}
public void setCuid(String cuid) {
this.cuid = cuid;
}
public String getCol2() {
return col2;
}
public void setCol2(String col2) {
this.col2 = col2;
}
public String getCol3() {
return col3;
}
public void setCol3(String col3) {
this.col3 = col3;
}
public BigRow withCuid(final String cuid) {
this.cuid = cuid;
return this;
}
public BigRow withCol2(final String col2) {
this.col2 = col2;
return this;
}
public BigRow withCol3(final String col3) {
this.col3 = col3;
return this;
}
}
}
我的输出是:
+----+-----+----+
|cuid| col2|col3|
+----+-----+----+
| 1| what| bye|
| 4| why|this|
| 7|music|word|
| 7|music|word|
+----+-----+----+
考虑使用distinct
删除重复项
Dataset<Row> noDuplicates = joined
.dropDuplicates("cuid");
noDuplicates.show();
希望这有帮助。托马斯。
推荐阅读
- logging - 如何登录 Cytoscape
- javascript - 我想在 vuejs 中确定当天的活动标签
- python - 是否有一种聪明的方法将序数编码器(基于不同的类别)应用于多个变量?
- assembly - ARM 程序集在没有 .type 宏的情况下无法工作
- python - 帧大小问题 cv2.videocapture
- python - 无法使用 Selenium 包提取属性
- python - 将年份和月份的 Stacked DataFrame 转换为具有日期时间索引的 DataFrame
- python-3.x - 从嵌套字典创建数据框
- excel - 在文件夹中搜索与 Excel 范围内不同字符串匹配的文件
- sql-server - 交叉加入事实和维度