python - PySpark: find column differences between two DataFrames by primary key
Question
I have a DataFrame df1,
df1 = sc.parallelize([(1, "book1", 1), (2, "book2", 2), (3, "book3", 3), (4, "book4", 4)]).toDF(["primary_key", "book", "number"])
and df2,
df2 = sc.parallelize([(1, "book1", 1), (2, "book8", 8), (3, "book3", 7), (5, "book5", 5)]).toDF(["primary_key", "book", "number"])
from pyspark.sql import functions
columlist = sc.parallelize(["book", "number"])
The result should look like this (in vertical/long format):
(The expected result was shown as an image: a long-format table listing, for each primary_key, the column name and the two differing values. See the output table in the solution below.)
How can I achieve this in PySpark?
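For reference, the per-key comparison being asked for can be sketched in plain Python, without Spark. The helper name diff_records and the dict-based records below are illustrative, not part of the question:

```python
def diff_records(d1, d2, key, columns):
    """Compare two lists of dicts on `columns`, matched by `key`.

    Returns (key_value, column, value_in_d1, value_in_d2) for every
    cell that differs, including keys present on only one side.
    """
    m1 = {r[key]: r for r in d1}
    m2 = {r[key]: r for r in d2}
    out = []
    for k in sorted(set(m1) | set(m2)):
        for c in columns:
            v1 = m1.get(k, {}).get(c)  # None when the key is missing on one side
            v2 = m2.get(k, {}).get(c)
            if v1 != v2:
                out.append((k, c, v1, v2))
    return out

df1 = [{"primary_key": 1, "book": "book1", "number": 1},
       {"primary_key": 2, "book": "book2", "number": 2},
       {"primary_key": 3, "book": "book3", "number": 3},
       {"primary_key": 4, "book": "book4", "number": 4}]
df2 = [{"primary_key": 1, "book": "book1", "number": 1},
       {"primary_key": 2, "book": "book8", "number": 8},
       {"primary_key": 3, "book": "book3", "number": 7},
       {"primary_key": 5, "book": "book5", "number": 5}]

diffs = diff_records(df1, df2, "primary_key", ["book", "number"])
for row in diffs:
    print(row)
```

This produces the seven difference rows shown in the accepted solution's output; the PySpark answer below expresses the same logic with a transpose plus a full outer join so it scales to distributed data.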
Solution
Here is a PySpark solution. Note that I had to cast number to string: after transposing, the dataframe1 and dataframe2 columns each hold values from both book and number, and a DataFrame column cannot contain two different datatypes -
from pyspark.sql.functions import explode, array, struct, lit, col
df1 = sc.parallelize([(1, "book1", 1), (2, "book2", 2), (3, "book3", 3), (4, "book4", 4)]).toDF(["primary_key", "book", "number"])
df1.show()
+-----------+-----+------+
|primary_key| book|number|
+-----------+-----+------+
| 1|book1| 1|
| 2|book2| 2|
| 3|book3| 3|
| 4|book4| 4|
+-----------+-----+------+
df2 = sc.parallelize([(1, "book1", 1), (2, "book8", 8), (3, "book3", 7), (5, "book5", 5)]).toDF(["primary_key", "book", "number"])
df2.show()
+-----------+-----+------+
|primary_key| book|number|
+-----------+-----+------+
| 1|book1| 1|
| 2|book8| 8|
| 3|book3| 7|
| 5|book5| 5|
+-----------+-----+------+
def to_transpose(df, by):
    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"
    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
        struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")
    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])
df1_trans = to_transpose(df1.withColumn('number', col('number').cast('string')), ["primary_key"]) \
    .withColumnRenamed("val", "dataframe1") \
    .withColumnRenamed("key", "diff_column_name")
df2_trans = to_transpose(df2.withColumn('number', col('number').cast('string')), ["primary_key"]) \
    .withColumnRenamed("val", "dataframe2") \
    .withColumnRenamed("key", "diff_column_name")
df = df1_trans.join(df2_trans, ['primary_key','diff_column_name'], how='full')
df = df.where((col('dataframe1') != col('dataframe2'))
              | (col('dataframe1').isNotNull() & col('dataframe2').isNull())
              | (col('dataframe2').isNotNull() & col('dataframe1').isNull())).sort('primary_key')
df.show()
+-----------+----------------+----------+----------+
|primary_key|diff_column_name|dataframe1|dataframe2|
+-----------+----------------+----------+----------+
| 2| book| book2| book8|
| 2| number| 2| 8|
| 3| number| 3| 7|
| 4| book| book4| null|
| 4| number| 4| null|
| 5| book| null| book5|
| 5| number| null| 5|
+-----------+----------------+----------+----------+
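A note on why the filter needs three branches: SQL comparisons use three-valued logic, so col('dataframe1') != col('dataframe2') evaluates to NULL (not true) whenever either side is NULL, and rows where a key exists on only one side would be silently dropped without the two isNull branches. PySpark also exposes a null-safe equality, Column.eqNullSafe (available since Spark 2.3), so the same filter can be written as df.where(~col('dataframe1').eqNullSafe(col('dataframe2'))). A minimal plain-Python sketch of the two comparison semantics (the function names here are illustrative):

```python
def sql_neq(a, b):
    """SQL-style '!=' under three-valued logic: a NULL (None) on either
    side makes the result UNKNOWN (here: None), so a WHERE clause would
    drop the row."""
    if a is None or b is None:
        return None
    return a != b

def null_safe_differs(a, b):
    """Null-safe inequality (the complement of eqNullSafe): two NULLs
    compare equal, NULL vs. a value compares different."""
    return a != b  # Python's != already treats None == None as equal

# Row (4, 'book'): 'book4' in dataframe1, missing in dataframe2
print(sql_neq("book4", None))            # None -> row would be filtered out
print(null_safe_differs("book4", None))  # True -> row is kept
```

This is why the answer ORs the inequality with the two isNull checks; eqNullSafe just folds all three branches into one predicate.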