json - 将运行时 7.3LTS(Spark3.0.1) 升级到 9.1LTS(Spark3.1.2) 后创建 PySpark 数据帧 Databricks 时,json 文件中的重复列会引发错误
问题描述
问题陈述:在升级 Databricks 运行时版本时,重复列在创建数据框时抛出错误。在较低的运行时间中,创建了数据框,并且由于下游不需要重复列,因此它被简单地排除在选择中。
文件位置:存储在 ADLS Gen2 (Azure) 上的 Json 文件。集群模式:标准
代码:我们在 Azure Databricks 中阅读如下。
intermediate_df = spark.read.option("multiline","true").json(f"{path}/IN-109418_Part_1.json")
json 文件是嵌套的,并且在其中一个出现tags
的重复列(下图)下。读入数据框后,我们选择所需的列。无论如何,我们不需要这个副本tags
。
以前我们在 Databricks 运行时 7.3LTS(Spark3.0.1) 上运行,它创建了包含重复列的数据框,但由于我们没有进一步使用它,所以它没有受到伤害。
但是,我们现在正在升级到运行时 9.1LTS(Spark3.1.2),它会在创建数据框本身时抛出关于列重复的错误。错误信息:Found duplicate column(s) in the data schema: `tags`
图片重复列:- json 文件中的重复列:标签。Dataframe 在运行时 7.3LTS(Spark3.0.1) 中成功创建
结论:我一读取数据框就尝试选择列,但没有成功。我有一种预感,因为现在升级后的 Databricks 运行时版本默认情况下更倾向于 Delta 表(增量表不支持其中的重复列),可能有一个属性我们必须关闭才能忽略此检查整个笔记本或只是在读入数据框时。
虽然这个确切的错误发生在 json 上,但我相信其他文件格式(如 csv)如果有重复的列,也可能会发生这种错误。
该文件非常嵌套,并且为所有必需的列定义架构不是很实用,因为它很乏味并且容易出错,以防将来需要更多列(这将是次要解决方案)。文件由供应商使用自动化流程生成,预计所有文件将保持与已交付的历史文件相同的格式。
运行时 9.1LTS(Spark3.1.2) 上的完全错误:
AnalysisException Traceback (most recent call last)
<command-4270018894919110> in <module>
----> 1 intermediate_df = spark.read.option("multiline","true").json(f"{path}/IN-109418_Part_1.json")
/databricks/spark/python/pyspark/sql/readwriter.py in json(self, path, schema, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, dateFormat, timestampFormat, multiLine, allowUnquotedControlChars, lineSep, samplingRatio, dropFieldIfAllNull, encoding, locale, pathGlobFilter, recursiveFileLookup, allowNonNumericNumbers, modifiedBefore, modifiedAfter)
370 path = [path]
371 if type(path) == list:
--> 372 return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
373 elif isinstance(path, RDD):
374 def func(iterator):
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
121 # Hide where the exception came from that shows a non-Pythonic
122 # JVM exception message.
--> 123 raise converted from None
124 else:
125 raise
AnalysisException: Found duplicate column(s) in the data schema: `tags`
编辑:评论预先定义模式。
解决方案
请使用 json.load 将 json 转换为字典并处理重复键
import json
#test json
test_json = """[
{"id": 1,
"tags": "test1",
"tags": "test1"},
{"id": 2,
"tags": "test2",
"tags": "test2",
"tags": "test3"}]
"""
#function to handle duplicate keys:
def value_resolver(pairs):
d = {}
i=1
for k, v in pairs:
if k in d:
d[k + str(i)] = v
i+=1
else:
d[k] = v
return d
#load
our_dict = json.loads(test_json, object_pairs_hook=value_resolver)
print(our_dict)
>> [{'id': 1, 'tags': 'test1', 'tags1': 'test1'}, {'id': 2, 'tags': 'test2', 'tags1': 'test2', 'tags2': 'test3'}]
#dict to dataframe
df = spark.createDataFrame(our_dict)
df.show()
+---+-----+-----+-----+
| id| tags|tags1|tags2|
+---+-----+-----+-----+
| 1|test1|test1| null|
| 2|test2|test2|test3|
+---+-----+-----+-----+
推荐阅读
- javascript - 从对象中删除属性,同时在 Javascript / TypeScript 中保持原始对象不变
- html - 图像最大宽度响应和自动
- javascript - 如何在子类中继承父类字典中的键
- intellij-idea - 在idea中创建groovy脚本失败
- r - 使用另一个数据框和 grepl 在数据框中搜索字符串
- jmeter - 如何使用非 gui JMeter 获取多个输出文件?
- openldap - 无法使用 ldapsearch 获取 userPassword
- reactjs - 通过改变子组件React js的状态来隐藏父组件的按钮
- regex - 环顾 Markdown 正则表达式
- python - 在 Pytorch 中定义数据加载器时使用类有什么好处?