首页 > 解决方案 > 将列作为结构数组的 Spark 转储到镶木地板

问题描述

我需要加载一个 csv 文件,该文件具有一个包含结构数组的列,并将其转储到镶木地板格式的另一个位置。我的csv文件有两列,A列和B列。B列的数据类型是array<struct<x: bigint, y:bigint>>

我尝试使用如下模式加载 csv 文件:

val schemaB = ArrayType(StructType(Seq(StructField("x",LongType),StructField("y",LongType))))
val schema = new StructType().add("A",StringType).add("B",schemaB)
spark.read.option("sep", "\t").schema(schema).csv(<location>)

然而,这并没有奏效。我收到以下错误:

org.apache.spark.sql.AnalysisException: CSV data source does not support array<struct<x:bigint,y:bigint>&gt; data type.;</struct<x:bigint,y:bigint>

我什至尝试转换为所需的类型,但这没有用。

这是 B 列外观的示例:

|B                                                                                                                                                                                                                                                                                                                                                                                                                   |
+---------------------------------------------------------------------------------------------+
|68222:102332,21215:1000,10982:70330,|
|93302:13320,263721:902615,9382:100020,|

标签: scalaapache-sparkparquet

解决方案


transform如果您使用的是最新版本的 spark即 2.4+ ,则可以使用函数

首先读取为字符串,split通过","获取列表,然后再次split通过":"获取xy

val schema = new StructType().add("A",StringType).add("B",StringType)
val df = spark.read.option("delimiter", "\t").schema(schema).csv("path to csv")
val splitExpr =  expr("transform(split(B, ','), x -> (split(x, ':')[0] as x, split(x, ':')[1] as y))")

val result = df.select($"A", splitExpr.cast("array<struct<x: long, y:long>>") as "B" )

现在,您可以将其保存在镶木地板中如果您使用的是旧版本的 spark,那么您需要编写一个 udf 最终模式:

root
 |-- A: string (nullable = true)
 |-- B: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- x: long (nullable = true)
 |    |    |-- y: long (nullable = true)

推荐阅读