首页 > 解决方案 > 将数据从胶水加载到雪花

问题描述

我正在尝试在胶水上运行 ETL 作业,在该作业中,我将数据从 mongodb 提取到 spark 数据帧中,然后将其加载到雪花中。

这是 Spark 数据框的示例架构

|-- login: struct (nullable = true)
 |    |-- login_attempts: integer (nullable = true)
 |    |-- last_attempt: timestamp (nullable = true)
 |-- name: string (nullable = true)
 |-- notifications: struct (nullable = true)
 |    |-- bot_review_queue: boolean (nullable = true)
 |    |-- bot_review_queue_web_push: boolean (nullable = true)
 |    |-- bot_review_queue_web_push_admin: boolean (nullable = true)
 |    |-- weekly_account_summary: struct (nullable = true)
 |    |    |-- enabled: boolean (nullable = true)
 |    |-- weekly_summary: struct (nullable = true)
 |    |    |-- enabled: boolean (nullable = true)
 |    |    |-- day: integer (nullable = true)
 |    |    |-- hour: integer (nullable = true)
 |    |    |-- minute: integer (nullable = true)
 |-- query: struct (nullable = true)
 |    |-- email_address: string (nullable = true)

我正在尝试将数据按原样加载到雪花中,并将结构列作为雪花中的 json 有效负载,但它会引发以下错误

An error occurred while calling o81.collectToPython.com.mongodb.spark.exceptions.MongoTypeConversionException:Cannot cast ARRAY into a StructType

我还尝试将结构列转换为字符串并加载它,但它或多或少会引发相同的错误

An error occurred while calling o106.save.  com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast STRING into a StructType

如果我能得到一些帮助,我真的很感激。

下面的代码用于铸造和加载。

dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="mongodb",
                                                  connection_options=read_mongo_options)
user_df_cast = user_df.select(user_df.login.cast(StringType()),'name',user_df.notifications.cast(StringType()))
datasinkusers = user_df_cast.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("dbtable", "users").mode("append").save()

标签: mongodbpysparkapache-spark-sqlsnowflake-cloud-data-platformaws-glue

解决方案


如果您users在 Snowflake 中的表具有以下架构,则不需要强制转换,因为StructTypeSparkSQL DataFrame 的字段将自动映射到VARIANTSnowflake 中的类型

CREATE TABLE users (
    login VARIANT
   ,name STRING
   ,notifications VARIANT
   ,query VARIANT
)

只需执行以下操作,无需转换,因为 Snowflake Spark 连接器了解数据类型并将自行转换为适当的 JSON 表示:

user_df = glueContext.create_dynamic_frame.from_options(
  connection_type="mongodb",
  connection_options=read_mongo_options
)

user_df
  .toDF()
  .write
  .format(SNOWFLAKE_SOURCE_NAME)
  .options(**sfOptions)
  .option("dbtable", "users")
  .mode("append")
  .save()

如果您绝对需要将StructType字段存储为纯 JSON 字符串,则需要使用to_jsonSparkSQL 函数显式转换它们:

from pyspark.sql.functions import to_json

user_df_cast = user_df.select(
  to_json(user_df.login),
  user_df.name,
  to_json(user_df.notifications)
)

这会将 JSON 字符串存储为简单VARCHAR类型,这样您就无法直接利用 Snowflake 的半结构化数据存储和查询功能PARSE_JSON(效率低下)。

考虑使用VARIANT上面显示的方法,这将允许您直接对字段执行查询:

SELECT
    login:login_attempts
   ,login:last_attempt
   ,name
   ,notifications:weekly_summary.enabled
FROM users

推荐阅读