首页 > 解决方案 > 在 pySpark 中处理空数组(可选二进制元素 (UTF8) 不是一个组)

问题描述

我在 spark 中有一个类似 json 的结构,如下所示:

>>> df = spark.read.parquet(good_partition_path)
id: string
some-array: array
    element: struct
        array-field-1: string
        array-field-2: string

根据分区,some-array可能是 all 的空数组id。当发生这种情况时,spark 会推断出以下模式:

>>> df = spark.read.parquet(bad_partition_path)
id: string
some-array: array
    element: string

当然,如果我想读取多个分区,这是一个问题,因为 spark 无法合并模式。我试图手动定义架构,所以应该没有问题

>>> df = spark.read.schema(good_schema).parquet(bad_partition_path)
id: string
some-array: array
    element: struct
        array-field-1: string
        array-field-2: string

到目前为止一切顺利,但是当我尝试实际收集数据时出现错误:

>>> df.head(5)
# Long error message
Caused by: java.lang.ClassCastException: optional binary element (UTF8) is not a group

我不明白为什么这会失败。架构应该没有不兼容的原因。如果您想知道,在不指定架构的情况下收集数据是可行的。

>>> df = spark.read.parquet(bad_partition_path)
id: string
some-array: array
    element: string   # infers wrong schema
>>> df.head(5)
[Row(...)] # actually works

编辑

这里有一个 python 中可重现的例子

from pyspark.sql.types import *


myschema = StructType([
   StructField('id', StringType())
 , StructField( 'some-array'
              , ArrayType(StructType([
                  StructField('array-field-1', StringType())
                , StructField('array-field-2', StringType())
                ])
              ))
  ])

path_writeKO = "path/to/parquet"
jsonKO = '{"id": "OK", "some-array": []}'
dfKO = sc.parallelize([jsonKO])
dfKO = spark.read.json(dfKO)
dfKO.write.parquet(path_writeKO) # write without schema

read_error = spark.read.schema(myschema).parquet(path_writeKO) # read with schema
read_error.collect() # Fails!!

标签: pythonapache-sparkpyspark

解决方案


我找到的解决方案是在读取 json 文件时将选项设置dropFieldIfAllNull为。True这会导致具有空数组的字段消失,从而使合并模式更容易。

>>> jsonKO = '{"id": "OK", "some-array": []}'
>>> dfKO = sc.parallelize([jsonKO])
>>> dfKO = spark.read.option('dropFieldIfAllNull', True).json(dfKO)
id:string

现在,不需要的类型推断将不适用,并且在读取同一文件的多个分区时,该选项mergeSchema将能够读取所有文件而不会发生冲突。


推荐阅读