apache-spark - Spark SQL - 数据比较
问题描述
将具有相同模式的两个 csv 文件(数百万行)与主键列进行比较并打印出差异的最佳方法是什么。例如 ,
CSV1
Id name zip
1 name1 07112
2 name2 07234
3 name3 10290
CSV2
Id name zip
1 name1 07112
2 name21 07234
4 name4 10290
将修改后的文件 CSV2 与原始数据 CSV1 进行比较,
输出应该是
Id name zip
2 name21 07234 Modified
3 name3 10290 Deleted
4 name4 10290 Added
Spark SQL 的新手,我正在考虑将数据导入 Hive 表,然后运行 Spark SQL 来识别更改。
1)是否有任何行修改方法可用于识别行是否已修改,而不是比较每列中的值?2) 有没有更好的方法可以使用 Spark 或其他 HDFS 工具来实现?
感谢反馈
解决方案
存在许多方法;这是一个可以并行完成的事情:
import org.apache.spark.sql.functions._
import sqlContext.implicits._
val origDF = sc.parallelize(Seq(
("1", "a", "b"),
("2", "c", "d"),
("3", "e", "f")
)).toDF("k", "v1", "v2")
val newDF = sc.parallelize(Seq(
("1", "a", "b"),
("2", "c2", "d"),
("4", "g", "h")
)).toDF("k", "v1", "v2")
val df1 = origDF.except(newDF) // if k not exists in df2, then deleted
//df1.show(false)
val df2 = newDF.except(origDF) // if k not exists in df1, then added
//df2.show(false)
// if no occurrence in both dfs, then the same
// if k exists in both, then k in df2 = modified
df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")
val df3 = spark.sql("""SELECT df1.k, df1.v1, df1.v2, "deleted" as operation
FROM df1
WHERE NOT EXISTS (SELECT df2.k
FROM df2
WHERE df2.k = df1.k)
UNION
SELECT df2.k, df2.v1, df2.v2, "added" as operation
FROM df2
WHERE NOT EXISTS (SELECT df1.k
FROM df1
WHERE df1.k = df2.k)
UNION
SELECT df2.k, df2.v1, df2.v2, "modified" as operation
FROM df2
WHERE EXISTS (SELECT df1.k
FROM df1
WHERE df1.k = df2.k)
""")
df3.show(false)
返回:
+---+---+---+---------+
|k |v1 |v2 |operation|
+---+---+---+---------+
|4 |g |h |added |
|2 |c2 |d |modified |
|3 |e |f |deleted |
+---+---+---+---------+
没那么难,没有标准的实用程序。
推荐阅读
- tensorflow - 即使使用 pip 安装后也没有名为 tensorflow 的模块
- angular - 如何正确使用 dotdotdot 在 Angular 5 中溢出文本?
- javascript - 如何通过nodejs发送加密消息?
- android - ListView/RecyclerView 虚拟化
- javascript - 将上传的文件图片添加到 json 文件中
- spring-boot - 在 SpringBoot 中使用 Mockmvc perform() 方法调用 RestController 时出现空指针异常
- javascript - 在nestjs的过滤器中注入服务
- python - 不同的结果响应scrapy shell
- python - statistics.mean() vs sum()/len() vs np.average() 用于列表列表
- mysql - 在mysql中向另一个用户隐藏一个用户的数据库