apache-spark - AWS Glue 上的 Spark SQL:pyspark.sql.utils.AnalysisException
问题描述
我在 AWS Glue 脚本中使用 Spark SQL 来转换 S3 中的一些数据。这是脚本逻辑
数据格式 CSV
编程语言:Python
1) 使用 Glue 的 Catalog 将 S3 中的数据拉入 Glue 的 DynamicDataFrame
2) 使用 toDF() 从 Glue 的数据帧中提取 Spark 数据帧
3) 制作 Spark 数据框 Spark SQL 表
createOrReplaceTempView()
4)使用 SQL 查询进行转换(这是我遇到问题的地方)
5) 将最终数据框转换为 Glue 动态数据框
6) 使用将最终数据帧存储到 S3 中
glueContext.write_dynamic_frame.from_options()
问题
当我在 SQL 中使用比较时,例如 WHERE > 或
(case when <some_columns> > <some int> then 1 else 0 end) as <some_newcol>
我收到以下错误
pyspark.sql.utils.AnalysisException: u"cannot resolve '(sales.`cxvalue` >
100000)' due to data type mismatch: differing types in '(sales.`cxvalue` >
100000)' (struct<int:int,string:string> and int).; line 1 pos 35;\n'Project
['demand_amt]\n+- 'Filter (cxvalue#4 > 100000)\n +- SubqueryAlias sales\n +-
LogicalRDD [sales_id#0, customer_name#1, customer_loc#2, demand_amt#3L,
cxvalue#4]\n"
pyspark.sql.utils.AnalysisException: u"cannot resolve '(sales.`cxvalue` =
100000)' due to data type mismatch: differing types in '(sales.`cxvalue` =
100000)' (struct<int:int,string:string> and int).; line 1 pos 33;\n'Project
[customer_name#1, CASE WHEN (cxvalue#4 = 100000) THEN demand_amt#3 ELSE 0 END AS
small#12, CASE WHEN cxvalue#4 IN (200000,300000,400000) THEN demand_amt#3 ELSE 0
END AS medium#13]\n+- SubqueryAlias sales\n +- LogicalRDD [sales_id#0,
customer_name#1, customer_loc#2, demand_amt#3, cxvalue#4]\n"
这告诉我它正在考虑将列作为数字和字符串,这是特定于 Spark 而不是 AWS。SUM() GROUP BY 只有比较才能正常工作
我尝试了以下步骤
1) 尝试使用 Spark 方法更改列类型 - 失败
df=df.withColumn(<column> df[<columns>].cast(DoubleType())) # df is Spark Data
111
Glue 不允许更改 spark 数据框列类型的数据类型
2) 使用 Glue 的 resoveChoice 方法,如https://github.com/aws-samples/aws-gluesamples/blob/master/examples/resolve_choice.md中所述 。resolveChoice 方法有效 - 但 sql 失败并出现相同的错误
3) 用于cast(<columns> as <data_type>)
SQL 查询 – 失败
4)在我的谷歌云上启动 Spark 集群(只是为了确保与 AWS 无关)。仅使用具有相同上述逻辑的 Spark – 失败并出现相同错误
5) 在相同的 Spark 集群和相同的数据集上使用相同的逻辑,但使用StructType
并StructField
在创建新的 Spark 数据帧时强制执行模式 – 通过
这是示例数据
+--------+-------------+------------+----------+-------+
|sales_id|customer_name|customer_loc|demand_amt|cxvalue|
+--------+-------------+------------+----------+-------+
| 1| ABC| Denver CO| 1200| 300000|
| 2| BCD| Boston MA| 212| 120000|
| 3| CDE| Phoenix AZ| 332| 100000|
| 4| BCD| Boston MA| 211| 120000|
| 5| DEF| Portland OR| 2121|1000000|
| 6| CDE| Phoenix AZ| 32| 100000|
| 7| ABC| Denver CO| 3227| 300000|
| 8| DEF| Portland OR| 2121|1000000|
| 9| BCD| Boston MA| 21| 120000|
| 10| ABC| Denver CO| 1200|300000 |
+--------+-------------+------------+----------+-------+
这些是失败的示例代码和查询
sdf_sales.createOrReplaceTempView("sales")
tbl1="sales"
sql2="""select customer_name, (case when cxvalue < 100000 then 1 else 0) as small,
(case when cxvalue in (200000, 300000, 400000 ) then demand_amt else 0 end) as medium
from {0}
""".format(tbl1)
sql4="select demand_amt from {0} where cxvalue>100000".format(tbl1)
但是,这些查询对于成功的 Glue 作业非常有用
sql3="""select customer_name, sum(demand_amt) as total_spent from {0} GROUP BY customer_name""".format(tbl1)
挑战:希望胶水以某种方式允许我更改 Spark Dataframe 架构。任何建议将不胜感激。
解决方案
AWS GlueresolveChoice
修复了该问题。编程逻辑错误:将 Spark Frame 视为可变的
推荐阅读
- postgresql - Codeigniter:如何在 for 循环中增加活动记录?
- android - 我如何才能收到 2 项活动的意向?
- kubernetes - 如何在 Kubernetes 中查看 API 访问日志?
- mysql - 如何优化 SQL 查询?
- c# - 触发 ComboBox SelectedValue 不起作用
- flutter - 'package:flutter/src/painting/_network_image_io.dart':断言失败:第 22 行 pos 14:'url != null':不正确
- sql-server - 如果在 SQL Server 中使用多个插入语句,则为 else
- java - 从 CLI 运行自定义 mojo 时,Maven 无法识别参数
- scikit-learn - ValueError:至少需要一个数组或数据类型 HistGradientBoostingRegressor
- mysql - EAV、多左连接查询优化