python - 在 pySpark 中处理空数组(可选二进制元素 (UTF8) 不是一个组)
问题描述
我在 spark 中有一个类似 json 的结构,如下所示:
>>> df = spark.read.parquet(good_partition_path)
id: string
some-array: array
element: struct
array-field-1: string
array-field-2: string
根据分区,some-array
可能是 all 的空数组id
。当发生这种情况时,spark 会推断出以下模式:
>>> df = spark.read.parquet(bad_partition_path)
id: string
some-array: array
element: string
当然,如果我想读取多个分区,这是一个问题,因为 spark 无法合并模式。我试图手动定义架构,所以应该没有问题
>>> df = spark.read.schema(good_schema).parquet(bad_partition_path)
id: string
some-array: array
element: struct
array-field-1: string
array-field-2: string
到目前为止一切顺利,但是当我尝试实际收集数据时出现错误:
>>> df.head(5)
# Long error message
Caused by: java.lang.ClassCastException: optional binary element (UTF8) is not a group
我不明白为什么这会失败。架构应该没有不兼容的原因。如果您想知道,在不指定架构的情况下收集数据是可行的。
>>> df = spark.read.parquet(bad_partition_path)
id: string
some-array: array
element: string # infers wrong schema
>>> df.head(5)
[Row(...)] # actually works
编辑
这里有一个 python 中可重现的例子
from pyspark.sql.types import *
myschema = StructType([
StructField('id', StringType())
, StructField( 'some-array'
, ArrayType(StructType([
StructField('array-field-1', StringType())
, StructField('array-field-2', StringType())
])
))
])
path_writeKO = "path/to/parquet"
jsonKO = '{"id": "OK", "some-array": []}'
dfKO = sc.parallelize([jsonKO])
dfKO = spark.read.json(dfKO)
dfKO.write.parquet(path_writeKO) # write without schema
read_error = spark.read.schema(myschema).parquet(path_writeKO) # read with schema
read_error.collect() # Fails!!
解决方案
我找到的解决方案是在读取 json 文件时将选项设置dropFieldIfAllNull
为。True
这会导致具有空数组的字段消失,从而使合并模式更容易。
>>> jsonKO = '{"id": "OK", "some-array": []}'
>>> dfKO = sc.parallelize([jsonKO])
>>> dfKO = spark.read.option('dropFieldIfAllNull', True).json(dfKO)
id:string
现在,不需要的类型推断将不适用,并且在读取同一文件的多个分区时,该选项mergeSchema
将能够读取所有文件而不会发生冲突。
推荐阅读
- r - 将带有西班牙语缩写月份名称的字符串转换为日期类
- java - 如何将 java.util.Date 转换为 java.time.LocalDate 并保留日期/时间
- internationalization - 我们如何在 AWS Amplify 中以多级结构定义翻译?
- ios - ERR_DEVICE_LOCKED
- amazon-web-services - 如何判断是否未处理运动流
- python - 定义投资组合类和局部-全局变量错误
- php - laravel 中 yajra 数据表的问题
- e-commerce - 我必须在我的电子商务跟踪代码中引入哪些变量?
- swift - Xcode 12.0 - XCFrameworks 在测试目标中不起作用
- python - 我在访问我的 django 管理模型时遇到问题