首页 > 解决方案 > 将运行时 7.3LTS(Spark3.0.1) 升级到 9.1LTS(Spark3.1.2) 后创建 PySpark 数据帧 Databricks 时,json 文件中的重复列会引发错误

问题描述

问题陈述:在升级 Databricks 运行时版本时,重复列在创建数据框时抛出错误。在较低的运行时间中,创建了数据框,并且由于下游不需要重复列,因此它被简单地排除在选择中。

文件位置:存储在 ADLS Gen2 (Azure) 上的 Json 文件。集群模式:标准

代码:我们在 Azure Databricks 中阅读如下。

intermediate_df = spark.read.option("multiline","true").json(f"{path}/IN-109418_Part_1.json")

json 文件是嵌套的,并且在其中一个出现tags的重复列(下图)下。读入数据框后,我们选择所需的列。无论如何,我们不需要这个副本tags

以前我们在 Databricks 运行时 7.3LTS(Spark3.0.1) 上运行,它创建了包含重复列的数据框,但由于我们没有进一步使用它,所以它没有受到伤害。

但是,我们现在正在升级到运行时 9.1LTS(Spark3.1.2),它会在创建数据框本身时抛出关于列重复的错误。错误信息:Found duplicate column(s) in the data schema: `tags`

图片重复列:- json 文件中的重复列:标签。Dataframe 在运行时 7.3LTS(Spark3.0.1) 中成功创建

结论:我一读取数据框就尝试选择列,但没有成功。我有一种预感,因为现在升级后的 Databricks 运行时版本默认情况下更倾向于 Delta 表(增量表不支持其中的重复列),可能有一个属性我们必须关闭才能忽略此检查整个笔记本或只是在读入数据框时。

虽然这个确切的错误发生在 json 上,但我相信其他文件格式(如 csv)如果有重复的列,也可能会发生这种错误。

该文件非常嵌套,并且为所有必需的列定义架构不是很实用,因为它很乏味并且容易出错,以防将来需要更多列(这将是次要解决方案)。文件由供应商使用自动化流程生成,预计所有文件将保持与已交付的历史文件相同的格式。

运行时 9.1LTS(Spark3.1.2) 上的完全错误:

AnalysisException                         Traceback (most recent call last)
<command-4270018894919110> in <module>
----> 1 intermediate_df = spark.read.option("multiline","true").json(f"{path}/IN-109418_Part_1.json")

/databricks/spark/python/pyspark/sql/readwriter.py in json(self, path, schema, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, dateFormat, timestampFormat, multiLine, allowUnquotedControlChars, lineSep, samplingRatio, dropFieldIfAllNull, encoding, locale, pathGlobFilter, recursiveFileLookup, allowNonNumericNumbers, modifiedBefore, modifiedAfter)
    370             path = [path]
    371         if type(path) == list:
--> 372             return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    373         elif isinstance(path, RDD):
    374             def func(iterator):

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    121                 # Hide where the exception came from that shows a non-Pythonic
    122                 # JVM exception message.
--> 123                 raise converted from None
    124             else:
    125                 raise

AnalysisException: Found duplicate column(s) in the data schema: `tags`

编辑:评论预先定义模式。

标签: jsonapache-sparkpysparkdatabricksdelta-lake

解决方案


请使用 json.load 将 json 转换为字典并处理重复键

import json

#test json
test_json = """[
   {"id": 1,
   "tags": "test1",
   "tags": "test1"},
  {"id": 2,
   "tags": "test2",
   "tags": "test2",
   "tags": "test3"}]
"""

#function to handle duplicate keys:
def value_resolver(pairs):
    d = {}
    i=1
    for k, v in pairs:
        if k in d:
           d[k + str(i)] = v
           i+=1
        else:
           d[k] = v
    return d

#load
our_dict = json.loads(test_json, object_pairs_hook=value_resolver)
print(our_dict)
>> [{'id': 1, 'tags': 'test1', 'tags1': 'test1'}, {'id': 2, 'tags': 'test2', 'tags1': 'test2', 'tags2': 'test3'}]

#dict to dataframe
df = spark.createDataFrame(our_dict)
df.show()


+---+-----+-----+-----+
| id| tags|tags1|tags2|
+---+-----+-----+-----+
|  1|test1|test1| null|
|  2|test2|test2|test3|
+---+-----+-----+-----+

推荐阅读