python - Spark df没有选择嵌套字段作为列名
问题描述
我正在尝试创建一个 Spark df,其中包含字典列表中的顶级和嵌套字段,其中包含与 json 对象的键和值相对应的键和值,并且我在选择嵌套列时遇到问题。
这是我到目前为止所拥有的:
输入是包含 JSON 值的字典列表:
[{
"uid": 98763,
"estimatedGrade": {
"science": 10.03,
"english": 20.5,
},
"actualGrade": {
"science": 10.03,
"english": 20.5,
}
}]
printed schema:
|-- uid: long (nullable = true)
|-- actualGrade: struct (nullable = true)
| |-- science: double(nullable = true)
| |-- english: double (nullable = true)
|-- estimatedGrade: struct (nullable = true)
| |-- science: double(nullable = true)
| |-- english: double (nullable = true)
所需的输出:
uid | 评估科学 | estm.english | 行为科学 | estGrade.english |
---|---|---|---|---|
值 | 值 | 值 | 值 | 值 |
*请注意我不需要重命名列,但必须缩短它们以适合一行
到目前为止,这是我的代码:
#jsons contains list of dict with the json key/values
df = self._spark.sparkContext.parallelize(jsons).map(lambda x: json.dumps(x))
df = self._spark.read.json(df, multiLine=True)
logger.info("Df count: %s", df.count())
logger.info("Df table schema: %s", df.printSchema())
columns = ['uid', 'estimatedGrade.science', 'estimatedGrade.english', 'actualGrade.science', 'actualGrade.english']
df.select([column_header for column_header in df.columns if column_header in columns])
我只能选择顶级字段的uid,所以我猜我在选择嵌套值时做错了。
请帮忙。
解决方案
df.columns仅返回顶级列名。您可以通过在使用您提供的数据样本创建的 df 上运行它来检查这一点。它返回:['actualGrade', 'estimatedGrade', 'uid']
。
在这一点上我知道的唯一好方法是迭代 df.schema。递归地,如果该字段是 StructType,则检查该字段的嵌套列。
这里有一些代码可以帮助你。一、导入StructType
from pyspark.sql.types import StructType
然后,设置一些辅助函数。第一个用于递归返回所有列名,包括使用点表示法的嵌套列。第二个辅助函数将列表展平。
def get_schema_field_name(field, parent=None):
if type(field.dataType) == StructType:
if parent == None:
prt = field.name
else:
prt = parent+"."+field.name # using dot notation
res = []
for i in field.dataType.fields:
res.append(get_schema_field_name(i, prt))
return res
else:
if parent==None:
res = field.name
else:
res = parent+"."+field.name
return res
def flatten(S):
if S == []:
return S
if isinstance(S[0], list):
return flatten(S[0]) + flatten(S[1:])
return S[:1] + flatten(S[1:])
然后,遍历您的架构并使用上面的方法获取所有列(包括嵌套列)。
column_list = []
for j in df.schema:
column_list.append(get_schema_field_name(j))
column_list = flatten(column_list)
最后,替换选择语句中的 df.columns 部分。
columns = ['uid', 'estimatedGrade.science', 'estimatedGrade.english', 'actualGrade.science', 'actualGrade.english']
df.select([column_header for column_header in column_list if column_header in columns])
在 Databricks 8.2 (Spark 3.1.1) 上测试时工作。
我还使用这种方法列出了 Spark 中所有表中的所有列名,因此请随时查看文章以获取更多参考:https ://medium.com/helmes-people/how-to-view-all-databases -databricks-9683b12fee10 中的表和列
推荐阅读
- google-apps-script - 如何解决此错误“异常:无效参数:替换”?
- python - Bokeh - 在不同的图中绘制不同的颜色
- json2html - 避免使用数值引用来访问 json2html 中属于对象数组的字段值
- java - 计算库精细
- python - 未找到 AWS Glue 和模块(configparser)
- powershell - 从 powershell 运行两个 bat 文件并分别返回它们的输出
- ios - IOS 设备上的 Testcafe typeText 在第三方 iframe 上不起作用
- linux - 使用 awk 或 sed 删除 .csv 中第四列和第五列的第一个字符
- layout - Get rid of Vaadin Grid Pro minimum size
- amazon-web-services - 如何在 Json 和 Map 之间进行转换
在