首页 > 解决方案 > 如何在pyspark中连接不同的地图类型

问题描述

我有不同的地图类型,如下所示:

MapType(StringType(), StringType())
MapType(StringType(), IntegerType())
MapType(StringType(), DoubleType())

我怎样才能将其合并为一个并保持其类型完整?

标签: pysparkapache-spark-sql

解决方案


您可以连接maptype具有不同键和值类型的列。但是 post concat spark 将映射键/值类型转换为它找到的最高类型。

例如-如果您考虑具有以下类型的 3 列 resp.-

col1 - MapType(StringType(), StringType())
col2 - MapType(StringType(), IntegerType())
col3 - MapType(StringType(), DoubleType())

输出将map_concat如下 -

map_concat(col1, col2, col3) - MapType(StringType(), StringType())

由于 spark 找到StringType键和值的最高类型。

现在,

为什么 spark 不能保持键值对的原始类型不变?

Ans- Spark 将 MapType 存储为backed by 2 ArrayData

class ArrayBasedMapData(val keyArray: ArrayData, val valueArray: ArrayData) extends MapData {
...
}

& ArrayData 无法处理异构类型。因此 spark 不能保持其原始类型在连接后保持不变。

工作示例供参考

 val df = spark.sql("select map('a', 'b') as col1, map('c', cast(1 as int)) as col2, " +
      "map(1, cast(2.2 as double)) as col3")
    df.printSchema()
    df.show(false)
    /**
      * root
      * |-- col1: map (nullable = false)
      * |    |-- key: string
      * |    |-- value: string (valueContainsNull = false)
      * |-- col2: map (nullable = false)
      * |    |-- key: string
      * |    |-- value: integer (valueContainsNull = false)
      * |-- col3: map (nullable = false)
      * |    |-- key: string
      * |    |-- value: double (valueContainsNull = false)
      *
      * +--------+--------+----------+
      * |col1    |col2    |col3      |
      * +--------+--------+----------+
      * |[a -> b]|[c -> 1]|[d -> 2.2]|
      * +--------+--------+----------+
      */

    val p = df.withColumn("new_col", map_concat($"col1", $"col2", $"col3"))
    p.printSchema()
    p.show(false)

    /**
      * root
      * |-- col1: map (nullable = false)
      * |    |-- key: string
      * |    |-- value: string (valueContainsNull = false)
      * |-- col2: map (nullable = false)
      * |    |-- key: string
      * |    |-- value: integer (valueContainsNull = false)
      * |-- col3: map (nullable = false)
      * |    |-- key: string
      * |    |-- value: double (valueContainsNull = false)
      * |-- new_col: map (nullable = false)
      * |    |-- key: string
      * |    |-- value: string (valueContainsNull = false)
      *
      * +--------+--------+----------+--------------------------+
      * |col1    |col2    |col3      |new_col                   |
      * +--------+--------+----------+--------------------------+
      * |[a -> b]|[c -> 1]|[d -> 2.2]|[a -> b, c -> 1, d -> 2.2]|
      * +--------+--------+----------+--------------------------+
      */

更新-1

Use struct to combine columns into one

 val x = df.withColumn("x", struct($"col1", $"col2", $"col3"))
      x.printSchema()
    x.selectExpr("x.col1['a']", "x.col2['c']", "x.col3['d']").printSchema()

    /**
      * root
      * |-- col1: map (nullable = false)
      * |    |-- key: string
      * |    |-- value: string (valueContainsNull = false)
      * |-- col2: map (nullable = false)
      * |    |-- key: string
      * |    |-- value: integer (valueContainsNull = false)
      * |-- col3: map (nullable = false)
      * |    |-- key: integer
      * |    |-- value: double (valueContainsNull = false)
      * |-- x: struct (nullable = false)
      * |    |-- col1: map (nullable = false)
      * |    |    |-- key: string
      * |    |    |-- value: string (valueContainsNull = false)
      * |    |-- col2: map (nullable = false)
      * |    |    |-- key: string
      * |    |    |-- value: integer (valueContainsNull = false)
      * |    |-- col3: map (nullable = false)
      * |    |    |-- key: integer
      * |    |    |-- value: double (valueContainsNull = false)
      *
      * root
      * |-- x.col1 AS `col1`[a]: string (nullable = true)
      * |-- x.col2 AS `col2`[c]: integer (nullable = true)
      * |-- x.col3 AS `col3`[CAST(d AS INT)]: double (nullable = true)
      */

推荐阅读