首页 > 解决方案 > 创建数据框特定模式:以大写字母开头的 StructField

问题描述

为看似简单的好奇心而为冗长的帖子道歉,但我想提供完整的上下文......

在 Databricks 中,我正在基于特定架构定义创建数据“行”,然后将该行插入到空数据框中(也基于相同的特定架构)。

架构定义如下所示:

myschema_xb = StructType(
  [
    StructField("_xmlns", StringType(), True),
    StructField("_Version", DoubleType(), True),
    StructField("MyIds",
      ArrayType(
        StructType(
          [
            StructField("_ID", StringType(), True),
            StructField("_ID_Context", StringType(), True),
            StructField("_Type", LongType(), True),
          ]
        ),
        True
      ),
      True
    ),
  ]
)

因此,行条目是:

myRow = Row(
    _xmlns="http://some.where.com",
    _Version=12.3,
    MyIds=[
        Row(
          _ID="XY",
          _ID_Context="Exxwhy",
          _Type=9
        ),
        Row(
          _ID="9152",
          _ID_Context="LNUMB",
          _Type=21
        ),
    ]
)

最后,databricks 笔记本代码是:

mydf = spark.createDataFrame(sc.emptyRDD(), myschema_xb)
rows = [myRow]
rdf = spark.createDataFrame(rows, myschema_xb)
appended = mydf.union(rdf)

调用rdf = spark.createDataFrame(rows, myschema_xb)导致异常:

ValueError: Unexpected tuple 'h' with StructType.

现在我很好奇的部分是如果我将元素更改MyIdsmyIds(即第一个字母小写),代码有效,并且我的新数据框(appended)具有单行数据。

这个异常是什么意思?为什么当我更改元素的大小写时它会消失?

(仅供参考,我们的 databricks 运行时环境是 Scala 2.11)

谢谢。

标签: pythonpysparkschemaazure-databrickspyspark-dataframes

解决方案


问题应该来自文档中的 Row 对象如何对键/字段进行排序:

Row 可用于通过使用命名参数创建行对象,字段将按名称排序。

myschema_xb中,三列是按顺序定义的[_xmlns, _Version, MyIds]。当您使用键定义 myRow:(_xmlns, _Version, MyIds)时,生成的实际 Row 对象将是:

Row(MyIds=[Row(_ID='XY', _ID_Context='Exxwhy', _Type=9), Row(_ID='9152', _ID_Context='LNUMB', _Type=21)], _Version=12.3, _xmlns='http://some.where.com')

MyIds移至第一列,这与架构不匹配,因此产生错误。而当您使用小写的 column-namemyIds时,Row 对象中的键将按照右列中的键进行排序,但是['_Version', '_xmlns', 'myIds']并切换。这不会产生错误,因为简单数据类型可以通过类型转换,但生成的数据帧不正确。myIds_Version_xmls

为了克服这个问题,您应该设置一个类似 Row 的类并自定义键的顺序,以确保字段的顺序与您的架构中显示的完全匹配:

from pyspark.sql import Row

MyOuterROW = Row('_xmlns', '_Version', 'MyIds')
MyInnerRow = Row('_ID', '_ID_Context', '_Type')

myRow = MyOuterROW( 
    "http://some.where.com", 
    12.3, 
    [ 
        MyInnerROW("XY", "Exxwhy", 9), 
        MyInnerROW("9152", "LNUMB", 21) 
    ] 
)              
print(myRow)
#Row(_xmlns='http://some.where.com', _Version=12.3, MyIds=[Row(_ID='XY', _ID_Context='Exxwhy', _Type=9), Row(_ID='9152', _ID_Context='LNUMB', _Type=21)])

rdf = spark.createDataFrame([myRow], schema=myschema_xb)

推荐阅读