AWS Glue / Spark - AnalysisException: cannot resolve column name after filter -> left join

Problem Description

I am filtering and left-joining some CSV files in AWS Glue 2.0 with PySpark. The job sometimes crashes when a filter removes all of the data, or when an input CSV is empty:

AnalysisException: 'Cannot resolve column name "col_a" among ();'

- I have seen this exception come up in other questions, but I think my problem is that the header information is lost when rows are removed -> is this a DynamicFrame behaviour (I could not find anything about it in the AWS Glue documentation)?

- I realise I could run the filters after all of the joins, but I would like to avoid that, since it looks like it could be more expensive, and ideally the job should also not crash when the input data is an empty CSV.

- Any suggestions are much appreciated :)

Here is a mock-up of the PySpark code (note that in the real job I want to chain many joins, transforms and filters together):

import re

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import ApplyMapping, Filter
from pyspark.context import SparkContext

glueContext = GlueContext(SparkContext.getOrCreate())

DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "db_name", table_name = "table_1", transformation_ctx = "DataSource0")
DataSource1 = glueContext.create_dynamic_frame.from_catalog(database = "db_name", table_name = "table_2", transformation_ctx = "DataSource1")

Table_1_Renames = ApplyMapping.apply(frame = DataSource0, mappings = [("col_a", "string", "col_a", "string"), ("col_b", "string", "col_xyz", "string")], transformation_ctx = "Transform0")
Table_2_Renames = ApplyMapping.apply(frame = DataSource1, mappings = [("col0", "string", "col0_renamed", "string"), ("col1", "string", "col1_renamed", "string")], transformation_ctx = "Transform1")

# Note: ApplyMapping renamed col_b to col_xyz above, so the filter must use the new name
Table_1_Filter = Filter.apply(frame = Table_1_Renames, f = lambda row : (bool(re.match("KeepValue", row["col_xyz"]))), transformation_ctx = "Table_1_Filter")

Table_1_Filter_DF = Table_1_Filter.toDF()
Table_2_Renames_DF = Table_2_Renames.toDF()

#If the original data was empty, or the filter removes all rows of the data, we get:
#AnalysisException: 'Cannot resolve column name "col_a" among ();'
LeftJoin_1 = DynamicFrame.fromDF(Table_1_Filter_DF.join(Table_2_Renames_DF, (Table_1_Filter_DF['col_a'] == Table_2_Renames_DF['col0_renamed']), "left"), glueContext, "LeftJoin_1")
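Since the failure happens when `toDF()` yields a DataFrame with no columns at all (the `among ()` in the error message), one hedged workaround is to test both schemas before each join. The helper below is a plain-Python sketch of that guard; `safe_to_join` and the column lists are illustrative names, not Glue API:

```python
def safe_to_join(left_columns, right_columns, left_key, right_key):
    """Return True only when both join keys are resolvable.

    An empty Glue DynamicFrame converted with toDF() can produce a
    DataFrame whose schema is empty, which is exactly the
    'Cannot resolve column name "col_a" among ()' situation.
    """
    return left_key in left_columns and right_key in right_columns

# Healthy schemas after the renames in the job above:
assert safe_to_join(["col_a", "col_xyz"],
                    ["col0_renamed", "col1_renamed"],
                    "col_a", "col0_renamed")

# The filter removed every row and the header was lost:
assert not safe_to_join([], ["col0_renamed", "col1_renamed"],
                        "col_a", "col0_renamed")
```

In the job itself the column lists would come from `Table_1_Filter_DF.columns` and `Table_2_Renames_DF.columns`, and when the guard fails the join can be skipped or replaced with an empty result carrying the expected schema.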

Tags: apache-spark, aws-glue

Solution
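One workaround, sketched here as an assumption rather than a documented Glue guarantee: declare the post-mapping column list up front and fall back to it whenever the converted DataFrame comes back with no columns. The helper below models that fallback in plain Python (`effective_columns` and the `TABLE_*_COLUMNS` constants are illustrative names); in the actual job, the fallback branch would build an empty DataFrame with `spark.createDataFrame([], schema)` using a `pyspark.sql.types.StructType` derived from the same column list, so downstream joins can always resolve their keys.

```python
# Columns each frame should carry after ApplyMapping, mirroring the
# mappings in the question (illustrative constants):
TABLE_1_COLUMNS = ["col_a", "col_xyz"]
TABLE_2_COLUMNS = ["col0_renamed", "col1_renamed"]

def effective_columns(observed, expected):
    """If the frame lost its header (observed schema is empty), fall
    back to the declared column list; otherwise trust what Spark saw."""
    return expected if not observed else observed

# Empty frame after the filter: fall back to the declared schema.
assert effective_columns([], TABLE_1_COLUMNS) == ["col_a", "col_xyz"]

# Non-empty frame: the observed schema wins.
assert effective_columns(TABLE_2_COLUMNS, TABLE_2_COLUMNS) == TABLE_2_COLUMNS
```

The same idea scales to a long chain of joins: each stage is given a declared schema, so an empty CSV at the start propagates as an empty-but-typed frame instead of an unresolvable one.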

