首页 > 解决方案 > Spark df没有选择嵌套字段作为列名

问题描述

我正在尝试创建一个 Spark df,其中包含字典列表中的顶级和嵌套字段,其中包含与 json 对象的键和值相对应的键和值,并且我在选择嵌套列时遇到问题。

这是我到目前为止所拥有的:

输入是包含 JSON 值的字典列表:

[{
  "uid": 98763,
  "estimatedGrade": {
    "science": 10.03,
    "english": 20.5,
   },
  "actualGrade":  {
    "science": 10.03,
    "english": 20.5,
   }
}]

printed schema:
 |-- uid: long (nullable = true)
 |-- actualGrade: struct (nullable = true)
 |    |-- science: double(nullable = true)
 |    |-- english: double (nullable = true)
 |-- estimatedGrade: struct (nullable = true)
 |    |-- science: double(nullable = true)
 |    |-- english: double (nullable = true)

所需的输出:

uid 评估科学 estm.english 行为科学 estGrade.english

*请注意我不需要重命名列,但必须缩短它们以适合一行

到目前为止,这是我的代码:


    #jsons contains list of dict with the json key/values
    df = self._spark.sparkContext.parallelize(jsons).map(lambda x: json.dumps(x))
    df = self._spark.read.json(df, multiLine=True)
    
    logger.info("Df count: %s", df.count())
    logger.info("Df table schema: %s", df.printSchema())
    
    columns = ['uid', 'estimatedGrade.science', 'estimatedGrade.english', 'actualGrade.science', 'actualGrade.english']
    
    df.select([column_header for column_header in df.columns if column_header in columns])

我只能选择顶级字段的uid,所以我猜我在选择嵌套值时做错了。

请帮忙。

标签: pythonpython-3.xapache-sparkpyspark

解决方案


df.columns仅返回顶级列名。您可以通过在使用您提供的数据样本创建的 df 上运行它来检查这一点。它返回:['actualGrade', 'estimatedGrade', 'uid']

在这一点上我知道的唯一好方法是迭代 df.schema。递归地,如果该字段是 StructType,则检查该字段的嵌套列。

这里有一些代码可以帮助你。一、导入StructType

from pyspark.sql.types import StructType

然后,设置一些辅助函数。第一个用于递归返回所有列名,包括使用点表示法的嵌套列。第二个辅助函数将列表展平。

def get_schema_field_name(field, parent=None):
  if type(field.dataType) == StructType:
    if parent == None:
      prt = field.name
    else:
      prt = parent+"."+field.name # using dot notation
    res = []
    for i in field.dataType.fields:
      res.append(get_schema_field_name(i, prt))
    return res
  else:
    if parent==None:
      res = field.name
    else:
      res = parent+"."+field.name
    return res

def flatten(S):
  if S == []:
    return S
  if isinstance(S[0], list):
    return flatten(S[0]) + flatten(S[1:])
  return S[:1] + flatten(S[1:])

然后,遍历您的架构并使用上面的方法获取所有列(包括嵌套列)。

column_list = []
for j in df.schema:
  column_list.append(get_schema_field_name(j))
column_list = flatten(column_list)

最后,替换选择语句中的 df.columns 部分。

columns = ['uid', 'estimatedGrade.science', 'estimatedGrade.english', 'actualGrade.science', 'actualGrade.english']

df.select([column_header for column_header in column_list if column_header in columns])

在 Databricks 8.2 (Spark 3.1.1) 上测试时工作。

来自 Databricks

我还使用这种方法列出了 Spark 中所有表中的所有列名,因此请随时查看文章以获取更多参考:https ://medium.com/helmes-people/how-to-view-all-databases -databricks-9683b12fee10 中的表和列


推荐阅读