AWS Glue PySpark: splitting a dictionary represented as a string into multiple rows

Problem description

I am working with a large dataset in which my records have the following form:

uniqueId col1 col2 col3  Levels
1    A1   A2   A3    {"2019-01-01":1.1 ,"2019-01-02":2.1 ,"2019-01-03":3.1}
2    B1   B2   B3    {"2019-01-01":1.2 ,"2019-01-03":3.2}
3    C1   C2   C3    {"2019-01-04":4.3}

'Levels' is stored as a string type.

I am trying to split Levels into rows so that I get output like this:

uniqueId col1 col2 col3 date        value
1        A1   A2   A3   2019-01-01  1.1
1        A1   A2   A3   2019-01-02  2.1
1        A1   A2   A3   2019-01-03  3.1
2        B1   B2   B3   2019-01-01  1.2
2        B1   B2   B3   2019-01-03  3.2
3        C1   C2   C3   2019-01-04  4.3

I am trying to apply a script on AWS Glue in PySpark, following the solution proposed here:

PySpark "explode" dict in column

import json
from pyspark.sql.functions import explode, udf

# UDF that parses the JSON string into a map<string, string>;
# rows that fail to parse come back as null
@udf("map<string, string>")
def parse(s):
    try:
        return json.loads(s)
    except json.JSONDecodeError:
        return None

# Note: the decorator already makes parse a UDF, so wrapping it
# again with udf(parse) is unnecessary.
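
As a quick sanity check, the UDF can be exercised on a one-row DataFrame built from one of the sample strings before running the full job (a throwaway sketch; spark here is assumed to be the SparkSession that Glue scripts typically define as glueContext.spark_session):

sample = spark.createDataFrame([('{"2019-01-04":4.3}',)], ["levels"])
sample.select(explode(parse("levels")).alias("date", "value")).show()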



datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "database", table_name = "table", transformation_ctx = "datasource0")

sparkDF = datasource0.toDF() 

sparkDF2 = sparkDF.select("unique_id","col1","col2", "col3", explode(parse("levels")).alias("date", "value"))



GlueDF_tmp = DynamicFrame.fromDF(sparkDF2, glueContext, 'GlueDF_tmp')

# The exploded date/value columns come out of the UDF as strings
# (it returns map<string, string>), so the source type in each mapping
# is "string" and apply_mapping performs the cast to the target type.
GlueDF = GlueDF_tmp.apply_mapping([("unique_id", "string", "unique_id", "string"),
        ("col1", "string", "col1", "string"),
        ("col2", "string", "col2", "string"),
        ("date", "string", "date", "timestamp"),
        ("value", "string", "value", "double")])


glueContext.write_dynamic_frame.from_options(frame = GlueDF, connection_type = "s3", 
     connection_options = {"path": "s3://..."}, 
     format = "parquet", 
     transformation_ctx = "datasink0")

However, I am running into this kind of memory problem: AWS Glue - unable to set spark.yarn.executor.memoryOverhead.

What is a better / more efficient way to do this split?

Tags: python, apache-spark, pyspark, apache-spark-sql, aws-glue

Solution
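
One commonly suggested alternative is to drop the Python UDF entirely and parse the column with Spark's built-in from_json. A Python UDF ships every row out to a Python worker process and back, which adds serialization overhead and memory pressure on the executors; from_json runs inside the JVM. A minimal sketch, assuming the same sparkDF as above and that every value in Levels is numeric:

from pyspark.sql.functions import explode, from_json
from pyspark.sql.types import DoubleType, MapType, StringType

# Parse the JSON string natively in the JVM: no Python workers involved
levels_schema = MapType(StringType(), DoubleType())

sparkDF2 = sparkDF.select(
    "unique_id", "col1", "col2", "col3",
    explode(from_json("levels", levels_schema)).alias("date", "value"),
)

The date column is still a string after the explode, so the apply_mapping step (or a to_timestamp cast as sketched earlier) is still needed before writing. If memory remains tight, repartitioning sparkDF before the explode spreads the work across more, smaller tasks.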

