apache-spark - Spark incremental loading overwrite old record
Problem Description
I have a requirement to do the incremental loading to a table by using Spark (PySpark)
Here's the example:
Day 1
id | value
-----------
1 | abc
2 | def
Day 2
id | value
-----------
2 | cde
3 | xyz
Expected result
id | value
-----------
1 | abc
2 | cde
3 | xyz
This can be done easily in a relational database.
Wondering whether this can be done in Spark or another transformation tool, e.g. Presto?
Solution
Here you go! First DataFrame:
>>> list1 = [(1, 'abc'),(2,'def')]
>>> olddf = spark.createDataFrame(list1, ['id', 'value'])
>>> olddf.show();
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 2| def|
+---+-----+
Second DataFrame:
>>> list2 = [(2, 'cde'),(3,'xyz')]
>>> newdf = spark.createDataFrame(list2, ['id', 'value'])
>>> newdf.show();
+---+-----+
| id|value|
+---+-----+
| 2| cde|
| 3| xyz|
+---+-----+
Now merge the two DataFrames with a full outer join, and in the select use the coalesce function to replace null values: listing the new column first means the new value wins whenever an id appears in both DataFrames.
>>> from pyspark.sql.functions import coalesce
>>> df = olddf.join(newdf, olddf.id == newdf.id, 'full_outer') \
...           .select(coalesce(olddf.id, newdf.id).alias("id"),
...                   coalesce(newdf.value, olddf.value).alias("value"))
>>> df.show();
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 3| xyz|
| 2| cde|
+---+-----+
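Note that `coalesce(newdf.value, olddf.value)` means the Day 2 value overwrites the Day 1 value for a matching id. As a quick sanity check of that upsert semantics, here is a plain-Python sketch of the same merge using dicts (no Spark required; the variable names are illustrative, not from the original code):

```python
# Simulate the full-outer-join + coalesce upsert with plain dicts.
old = {1: 'abc', 2: 'def'}   # Day 1
new = {2: 'cde', 3: 'xyz'}   # Day 2

# Dict unpacking: keys in `new` overwrite keys in `old`,
# mirroring coalesce(newdf.value, olddf.value).
merged = {**old, **new}
print(sorted(merged.items()))  # [(1, 'abc'), (2, 'cde'), (3, 'xyz')]
```

This gives the same expected result as the Spark join: id 1 kept, id 2 updated, id 3 inserted.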
I hope this should solve your problem. :-)