apache-spark - 根据条件pyspark使用其他列值覆盖列值
问题描述
我有一个data frame
像pyspark
下面这样的。
df.show()
+-----------+------------+-------------+
|customer_id|product_name| country|
+-----------+------------+-------------+
| 12870946| null| Poland|
| 815518| MA401|United States|
| 3138420| WG111v2| UK|
| 3178864| WGR614v6|United States|
| 7456796| XE102|United States|
| 21893468| AGM731F|United States|
+-----------+------------+-------------+
我有另一个数据框,如下所示
df1.show()
+-----------+------------+
|customer_id|product_name|
+-----------+------------+
| 12870946| GS748TS|
| 815518| MA402|
| 3138420| null|
| 3178864| WGR614v6|
| 7456796| XE102|
| 21893468| AGM731F|
| null| AE171|
+-----------+------------+
现在我想fuller outer join
对这些表进行操作并更新product_name
如下所示的列值。
1) Overwrite the values in `df` using values in `df1` if there are values in `df1`.
2) if there are `null` values or `no` values in `df1` then leave the values in `df` as they are
expected result
+-----------+------------+-------------+
|customer_id|product_name| country|
+-----------+------------+-------------+
| 12870946| GS748TS| Poland|
| 815518| MA402|United States|
| 3138420| WG111v2| UK|
| 3178864| WGR614v6|United States|
| 7456796| XE102|United States|
| 21893468| AGM731F|United States|
| null| AE171| null|
+-----------+------------+-------------+
我做了如下
import pyspark.sql.functions as f
df2 = df.join(df1, df.customer_id == df1.customer_id, 'full_outer').select(df.customer_id, f.coalesce(df.product_name, df1.product_name).alias('product_name'), df.country)
但是我得到的结果是不同的
df2.show()
+-----------+------------+-------------+
|customer_id|product_name| country|
+-----------+------------+-------------+
| 12870946| null| Poland|
| 815518| MA401|United States|
| 3138420| WG111v2| UK|
| 3178864| WGR614v6|United States|
| 7456796| XE102|United States|
| 21893468| AGM731F|United States|
| null| AE171| null|
+-----------+------------+-------------+
我怎样才能得到expected result
解决方案
您编写的代码为我生成了正确的输出,因此我无法重现您的问题。我看过其他帖子,其中在执行连接时使用别名已经解决了问题,所以这里有一个稍微修改过的代码版本,它会做同样的事情:
import pyspark.sql.functions as f
df.alias("r").join(df1.alias("l"), on="customer_id", how='full_outer')\
.select(
"customer_id",
f.coalesce("r.product_name", "l.product_name").alias('product_name'),
"country"
)\
.show()
#+-----------+------------+-------------+
#|customer_id|product_name| country|
#+-----------+------------+-------------+
#| 7456796| XE102|United States|
#| 3178864| WGR614v6|United States|
#| null| AE171| null|
#| 815518| MA401|United States|
#| 3138420| WG111v2| UK|
#| 12870946| GS748TS| Poland|
#| 21893468| AGM731F|United States|
#+-----------+------------+-------------+
当我运行您的代码时,我也得到了相同的结果(转载如下):
df.join(df1, df.customer_id == df1.customer_id, 'full_outer')\
.select(
df.customer_id,
f.coalesce(df.product_name, df1.product_name).alias('product_name'),
df.country
)\
.show()
我正在使用 spark 2.1 和 python 2.7.13。
推荐阅读
- flutter - 通过颤动中的导航保持对话框
- sql - MSSQL中每个ID的不同序列号
- c++ - 使用 PAHO c / cpp 在代理后面使用 MQTT
- python - 使用 Flask 将数据从 React Form 拉入 Json
- reactjs - React 中 next.js 中基于角色的路由
- vba - 如何获取 visio 形状中连接行的名称属性
- php - 将带有库存的变体设置为 Woocommerce 可变产品中的默认变体
- r - 使用方法出现问题?如何消除使用方法中的错误?
- django - django digitalocean deploy 无法提供静态文件
- reactjs - 如何在状态下发送新数量?