首页 > 解决方案 > 创建映射,同时保留 pyspark 中多列的键和值的数据类型

问题描述

使用 PySpark 2.4.6

create_map在 Pyspark 中使用或for cols创建地图列时map_concat,列的数据类型默认转换为 StringType()。有没有办法在生成这个新的地图列时保留数据类型

例如:

Original dataframe (df)
df = df.withColumn("map_col", create_map(lit("col1"),col("col1"),
            lit("col2"), col("col2").cast(IntegerType()),
            lit("col3"), col("col3").cast(IntegerType())
            ))

col0 | col1 | col2 | col3| map_col
  s1 | Hello| 25   | 56  | [col1 -> Hello, col2 -> 25, col3 -> 56]
  s2 | Foo  | 33   | 62  | [col1 -> Foo, col2 -> 33, col3 -> 62]
  s3 | Bar  | 44   | 102 | [col1 -> Bar, col2 -> 44, col3 -> 102]

上面的示例df为 col("map_col") 创建了一个具有以下模式的

|-- map_col: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

并在分组时产生 JSON 结构,例如["col1":"Hello","col2":"25","col3":"56"]第一行col0

但是,我有兴趣从此地图列生成以下 JSON 有效负载

["col1":"Hello","col2":25,"col3":56]

我怎样才能做到这一点?

到目前为止,我已经尝试MapType(StringType(), IntegerType())为一列和 MapType(StringType(), StringType())另一列创建单独的地图列,并将其组合使用map_concatstruct('col1', 'col2')但它们都没有得到我想要的有效负载。

我也尝试使用udf下面的示例,但被迫使用MapType(StringType(), StringType())以防止错误数据类型不匹配。

from pyspark.sql.functions import (
    to_json,
    col,
    collect_list,
    struct,
    create_map,
    lit,
    udf,
    map_concat,
    concat,
)

def create_dict(row):
    ''' Returns a dict json object for a dataframe row '''
    if row:
        return(json.loads(row))
    else:
        return None
udf_create_dict = udf(lambda row: create_dict(row), MapType(StringType(),StringType()))

 where 

df = df.select(struct(col("col1"), col("col2"), col("col3")).cast(schema).alias("inter1")\
.withColumn("inter2", to_json('inter1'))\
.withColumn("map_col", udf_create_dict(col("inter2"))).drop("inter2")

schema = StructType(
        [   
            StructField("col1", StringType()),
            StructField("col2", IntegerType()),
            StructField("col3", IntegerType()),
        ]
    )

标签: dataframepyspark

解决方案


推荐阅读