PySpark DataFrame joins incorrectly when there are multiple nested fields

Question

I have a DataFrame with the following schema:

root
 |-- docId: string (nullable = true)
 |-- Country: struct (nullable = true)
 |    |-- s1: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- Gender: struct (nullable = true)
 |    |-- s1: string (nullable = true)
 |    |-- s2: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- s3: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- s4: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- s5: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- YOB: struct (nullable = true)
 |    |-- s1: long (nullable = true)
 |    |-- s2: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- s3: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- s4: array (nullable = true)
 |    |    |-- element: long (containsNull = true)

I have a new DataFrame with this schema:

root
 |-- docId: string (nullable = true)
 |-- Country: struct (nullable = false)
 |    |-- s6: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- Gender: struct (nullable = false)
 |    |-- s6: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- YOB: struct (nullable = false)
 |    |-- s6: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)

I want to join these DataFrames and end up with the following structure:

root
 |-- docId: string (nullable = true)
 |-- Country: struct (nullable = true)
 |    |-- s1: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- s6: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- Gender: struct (nullable = true)
 |    |-- s1: string (nullable = true)
 |    |-- s2: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- s3: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- s4: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- s5: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- s6: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- YOB: struct (nullable = true)
 |    |-- s1: long (nullable = true)
 |    |-- s2: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- s3: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- s4: array (nullable = true)
 |    |    |-- element: long (containsNull = true) 
 |    |-- s6: array (nullable = true)
 |    |    |-- element: long (containsNull = true)

Instead, after the join I get a DataFrame like this:

root
 |-- docId: string (nullable = true)
 |-- Country: struct (nullable = true)
 |    |-- s1: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- Country: struct (nullable = true)
 |    |-- s6: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- Gender: struct (nullable = true)
 |    |-- s1: string (nullable = true)
 |    |-- s2: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- s3: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- s4: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- s5: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- Gender: struct (nullable = true)
 |    |-- s6: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- YOB: struct (nullable = true)
 |    |-- s1: long (nullable = true)
 |    |-- s2: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- s3: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- s4: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |-- YOB: struct (nullable = true)
 |    |-- s6: array (nullable = true)
 |    |    |-- element: long (containsNull = true)

What should I do? I performed an outer join on the docId field, and the DataFrame above is what I got.

Tags: apache-spark, pyspark, apache-spark-sql

Answer


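The DataFrame is not "incorrectly joined": a JOIN is not supposed to merge structs. You get columns that look duplicated because a JOIN takes the columns from both DataFrames, so the result contains one Country, Gender, and YOB column from each side. You have to combine them explicitly: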

Initialization

from pyspark.sql import SparkSession
from pyspark.sql import types as T

# Reuse the active session if there is one, otherwise start a new one
spark = SparkSession.builder.getOrCreate()

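First, the data (I only included a few columns for illustration, and the struct fields are plain strings rather than arrays; extending this to your full example is straightforward):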

Country_schema1 = T.StructField("Country", T.StructType([T.StructField("s1", T.StringType(), nullable=True)]), nullable=True)
Gender_schema1 = T.StructField("Gender", T.StructType([T.StructField("s1", T.StringType(), nullable=True),
                                                      T.StructField("s2", T.StringType(), nullable=True)]))
schema1 = T.StructType([T.StructField("docId", T.StringType(), nullable=True),
                       Country_schema1,
                       Gender_schema1
                       ])
data1 = [("1",["1"], ["M", "X"])]

df1 = spark.createDataFrame(data1, schema=schema1)
df1.toJSON().collect()

Output:

['{"docId":"1","Country":{"s1":"1"},"Gender":{"s1":"M","s2":"X"}}']

The second DataFrame:

Country_schema2 = T.StructField("Country", T.StructType([T.StructField("s6", T.StringType(), nullable=True)]), nullable=True)
Gender_schema2 = T.StructField("Gender", T.StructType([T.StructField("s6", T.StringType(), nullable=True),
                                                      T.StructField("s7", T.StringType(), nullable=True)]))
schema2 = T.StructType([T.StructField("docId", T.StringType(), nullable=True),
                       Country_schema2,
                       Gender_schema2
                       ])
data2 = [("1",["2"], ["F", "Z"])]
df2 = spark.createDataFrame(data2, schema=schema2)
df2.toJSON().collect()

Output:

['{"docId":"1","Country":{"s6":"2"},"Gender":{"s6":"F","s7":"Z"}}']

Now the logic. I think this is easier to do in SQL. First, register both DataFrames as temporary views:

df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

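This is the query to run. It lists explicitly which fields to SELECT (rather than taking all of them) and wraps the fields taken from both struct columns in a new struct that combines them: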

result = spark.sql("SELECT df1.docID, "
                   "STRUCT(df1.Country.s1 AS s1, df2.Country.s6 AS s6) AS Country, "
                   "STRUCT(df1.Gender.s2 AS s2, df2.Gender.s6 AS s6, df2.Gender.s7 AS s7) AS Gender "
                   "FROM df1 JOIN df2 ON df1.docID=df2.docID")
result.show()

Output:

+-----+-------+---------+
|docID|Country|   Gender|
+-----+-------+---------+
|    1| [1, 2]|[X, F, Z]|
+-----+-------+---------+

It is easier to read as JSON:

result.toJSON().collect()

['{"docID":"1","Country":{"s1":"1","s6":"2"},"Gender":{"s2":"X","s6":"F","s7":"Z"}}']
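For the full schema, writing every STRUCT(...) clause by hand becomes tedious. The following is a minimal sketch of how the same query could be generated from lists of field names. The helpers merged_struct_expr and build_merge_query are hypothetical names introduced here for illustration and are not part of Spark. The sketch also assumes a full outer join with a COALESCE on the key, because the question mentions an outer join, whereas the query above uses an inner JOIN.

```python
# Hypothetical helpers (not part of Spark): they only build a SQL string.


def merged_struct_expr(column, left_fields, right_fields, left="df1", right="df2"):
    """Return a STRUCT(...) expression that combines one struct column from both views."""
    parts = [f"{left}.{column}.{name} AS {name}" for name in left_fields]
    parts += [f"{right}.{column}.{name} AS {name}" for name in right_fields]
    return f"STRUCT({', '.join(parts)}) AS {column}"


def build_merge_query(key, struct_fields, left="df1", right="df2",
                      how="FULL OUTER JOIN"):
    """Build the SELECT statement.

    struct_fields maps a struct column name to a pair:
    (field names in the left view, field names in the right view).
    """
    # With an outer join the key may be NULL on one side, so take whichever is set
    select = [f"COALESCE({left}.{key}, {right}.{key}) AS {key}"]
    for column, (left_fields, right_fields) in struct_fields.items():
        select.append(
            merged_struct_expr(column, left_fields, right_fields, left, right)
        )
    return (
        f"SELECT {', '.join(select)} "
        f"FROM {left} {how} {right} ON {left}.{key} = {right}.{key}"
    )


query = build_merge_query(
    "docId",
    {
        "Country": (["s1"], ["s6"]),
        "Gender": (["s1", "s2"], ["s6", "s7"]),
    },
)
print(query)
```

The resulting string can be passed to spark.sql(query) once df1 and df2 are registered as temporary views. The field lists could probably be read from the schemas instead of being typed out, for example with df1.schema["Country"].dataType.fieldNames(). The sketch assumes the field names of the two sides do not collide. With a full outer join, a docId present in only one DataFrame should yield structs whose fields from the other side are null.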
