首页 > 解决方案 > AWS Glue 上的 Spark SQL:pyspark.sql.utils.AnalysisException

问题描述

我在 AWS Glue 脚本中使用 Spark SQL 来转换 S3 中的一些数据。这是脚本逻辑

数据格式 CSV

编程语言:Python

1) 使用 Glue 的 Catalog 将 S3 中的数据拉入 Glue 的 DynamicDataFrame

2) 使用 toDF() 从 Glue 的数据帧中提取 Spark 数据帧

3) 制作 Spark 数据框 Spark SQL 表

createOrReplaceTempView() 

4)使用 SQL 查询进行转换(这是我遇到问题的地方)

5) 将最终数据框转换为 Glue 动态数据框

6) 使用将最终数据帧存储到 S3 中 glueContext.write_dynamic_frame.from_options()

问题

当我在 SQL 中使用比较时,例如 WHERE > 或 (case when <some_columns> > <some int> then 1 else 0 end) as <some_newcol>

我收到以下错误

pyspark.sql.utils.AnalysisException: u"cannot resolve '(sales.`cxvalue` >
100000)' due to data type mismatch: differing types in '(sales.`cxvalue` >
100000)' (struct<int:int,string:string> and int).; line 1 pos 35;\n'Project
['demand_amt]\n+- 'Filter (cxvalue#4 > 100000)\n +- SubqueryAlias sales\n +-
LogicalRDD [sales_id#0, customer_name#1, customer_loc#2, demand_amt#3L,
cxvalue#4]\n"
pyspark.sql.utils.AnalysisException: u"cannot resolve '(sales.`cxvalue` =
100000)' due to data type mismatch: differing types in '(sales.`cxvalue` =
100000)' (struct<int:int,string:string> and int).; line 1 pos 33;\n'Project
[customer_name#1, CASE WHEN (cxvalue#4 = 100000) THEN demand_amt#3 ELSE 0 END AS
small#12, CASE WHEN cxvalue#4 IN (200000,300000,400000) THEN demand_amt#3 ELSE 0
END AS medium#13]\n+- SubqueryAlias sales\n +- LogicalRDD [sales_id#0,
customer_name#1, customer_loc#2, demand_amt#3, cxvalue#4]\n"

这告诉我它正在考虑将列作为数字和字符串,这是特定于 Spark 而不是 AWS。SUM() GROUP BY 只有比较才能正常工作

我尝试了以下步骤

1) 尝试使用 Spark 方法更改列类型 - 失败

df=df.withColumn(<column> df[<columns>].cast(DoubleType())) # df is Spark Data
111

Glue 不允许更改 spark 数据框列类型的数据类型

2) 使用 Glue 的 resoveChoice 方法,如https://github.com/aws-samples/aws-gluesamples/blob/master/examples/resolve_choice.md中所述 。resolveChoice 方法有效 - 但 sql 失败并出现相同的错误

3) 用于cast(<columns> as <data_type>)SQL 查询 – 失败

4)在我的谷歌云上启动 Spark 集群(只是为了确保与 AWS 无关)。仅使用具有相同上述逻辑的 Spark – 失败并出现相同错误

5) 在相同的 Spark 集群和相同的数据集上使用相同的逻辑,但使用StructTypeStructField 在创建新的 Spark 数据帧时强制执行模式 – 通过

这是示例数据

+--------+-------------+------------+----------+-------+
|sales_id|customer_name|customer_loc|demand_amt|cxvalue|
+--------+-------------+------------+----------+-------+
|       1|          ABC|   Denver CO|      1200| 300000|
|       2|          BCD|   Boston MA|       212| 120000|
|       3|          CDE|  Phoenix AZ|       332| 100000|
|       4|          BCD|   Boston MA|       211| 120000|
|       5|          DEF| Portland OR|      2121|1000000|
|       6|          CDE|  Phoenix AZ|        32| 100000|
|       7|          ABC|   Denver CO|      3227| 300000|
|       8|          DEF| Portland OR|      2121|1000000|
|       9|          BCD|   Boston MA|        21| 120000|
|      10|          ABC|   Denver CO|      1200|300000 |
+--------+-------------+------------+----------+-------+

这些是失败的示例代码和查询

sdf_sales.createOrReplaceTempView("sales")

tbl1="sales"

sql2="""select customer_name, (case when cxvalue  < 100000 then 1 else 0)  as small,   
(case when cxvalue  in (200000, 300000, 400000 ) then demand_amt else 0 end) as medium  
from {0}   
""".format(tbl1)

sql4="select demand_amt from {0} where cxvalue>100000".format(tbl1)

但是,这些查询对于成功的 Glue 作业非常有用

sql3="""select customer_name, sum(demand_amt) as total_spent from {0} GROUP BY  customer_name""".format(tbl1)

挑战:希望胶水以某种方式允许我更改 Spark Dataframe 架构。任何建议将不胜感激。

标签: apache-sparkpysparkpyspark-sqlaws-glue

解决方案


AWS GlueresolveChoice修复了该问题。编程逻辑错误:将 Spark Frame 视为可变的


推荐阅读