PySpark: Problem inserting nested data into BigQuery from an AWS Glue job using the Google BigQuery connector

Problem description

I'm running into a problem when inserting nested data into BigQuery from an AWS Glue job that uses the Google BigQuery connector.

Below is my BigQuery table schema:

competition FLOAT   NULLABLE    
categories  RECORD  REPEATED    
    id  INTEGER REQUIRED    
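
For reference, the same table schema can be spelled out with the google-cloud-bigquery client; this is only a sketch to make the field modes explicit (the table itself already exists):

from google.cloud import bigquery

# The table schema above, written in Python: categories is a REPEATED
# RECORD whose nested field id is REQUIRED, while competition is NULLABLE.
schema = [
    bigquery.SchemaField("competition", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField(
        "categories",
        "RECORD",
        mode="REPEATED",
        fields=[bigquery.SchemaField("id", "INTEGER", mode="REQUIRED")],
    ),
]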

In a custom transform in the AWS Glue job, I'm trying to send a list of Python dicts into the categories column, like this:

[{"id":10004},{"id":10009},{"id":10301}]

The schema of my DataFrame looks like this:

root
 |-- categories: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: integer (nullable = false)
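
This schema can be reproduced in isolation with a small sketch (the sample row is made up), which is handy for testing the transform outside of Glue:

from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, IntegerType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

# categories: array<struct<id: int>>, with id non-nullable as in printSchema().
schema = StructType([
    StructField(
        "categories",
        ArrayType(
            StructType([StructField("id", IntegerType(), nullable=False)]),
            containsNull=True,
        ),
        nullable=True,
    )
])
df = spark.createDataFrame([([{"id": 10004}, {"id": 10009}, {"id": 10301}],)], schema)
df.printSchema()  # matches the tree shown above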

My code:

from awsglue.dynamicframe import DynamicFrame, DynamicFrameCollection
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType, StructField, StructType

def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
    # Take the first DynamicFrame in the collection and work on it as a DataFrame.
    newdf = dfc.select(list(dfc.keys())[0]).toDF()

    # Flatten the keyword_info struct into top-level columns.
    newdf = newdf.select(
        "keyword", "spell", "spell_type",
        "keyword_info.competition", "keyword_info.cpc",
        "keyword_info.search_volume", "keyword_info.categories",
    )
    newdf = newdf.withColumnRenamed("search_volume", "volume")

    # Rebuild categories as an array of structs with a single id field.
    udf_categories_modification = udf(
        lambda x: [{"id": id} for id in x],
        ArrayType(StructType([StructField("id", IntegerType(), True)])),
    )
    newdf = newdf.withColumn("categories", udf_categories_modification("categories"))

    newcustomerdyc = DynamicFrame.fromDF(newdf, glueContext, "modifieddata")
    return DynamicFrameCollection({"CustomTransform0": newcustomerdyc}, glueContext)
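
The write step isn't shown in the question; with the spark-bigquery-connector it would typically look like the sketch below. The staging bucket name is a placeholder, and the table name is taken from the error message:

(
    newdf.write.format("bigquery")
    # Table from the error message; temporaryGcsBucket is a placeholder name.
    .option("table", "ml-training-231514.data_for_seo_test.au_2021_11")
    .option("temporaryGcsBucket", "my-staging-bucket")
    .mode("append")
    .save()
)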

Note that categories is one of the DataFrame's columns. Also, the non-nested fields are inserted into BigQuery without any issue.

Here is the error:

Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: Provided Schema does not match Table ml-training-231514:data_for_seo_test.au_2021_11. Field categories.id is missing in new schema

Tags: python-3.x, google-bigquery

Solution
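
Two things stand out, though neither has been verified against this exact job. First, the UDF declares id as nullable (StructField("id", IntegerType(), True)) while the BigQuery column is REQUIRED, so the schemas being compared disagree on the nested field's mode. Second, the connector stages data in an intermediate file format before loading it, and Parquet's nested-list encoding has been reported to produce exactly this kind of "Field ... is missing in new schema" mismatch for repeated records; switching the intermediate format is a commonly suggested workaround. A sketch of both adjustments:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType, StructField, StructType

# 1) Declare id non-nullable so the Spark schema matches BigQuery's REQUIRED mode.
udf_categories_modification = udf(
    lambda x: [{"id": id} for id in x],
    ArrayType(StructType([StructField("id", IntegerType(), nullable=False)])),
)

# 2) Stage through Avro (or ORC) instead of Parquet when writing;
#    intermediateFormat is a documented spark-bigquery-connector option.
(
    newdf.write.format("bigquery")
    .option("table", "ml-training-231514.data_for_seo_test.au_2021_11")
    .option("temporaryGcsBucket", "my-staging-bucket")  # placeholder name
    .option("intermediateFormat", "avro")
    .mode("append")
    .save()
)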

