amazon-web-services - 在 AWS Glue 中删除具有空值的行的问题
问题描述
当前,AWS Glue 作业读取 S3 集合并将其写入 AWS Redshift 时遇到问题,其中我们有一个包含null
值的列。
这项工作应该相当简单,并且大部分代码都是由 Glue 接口自动生成的,但是由于我们在 Redshift 中没有空列,这些列在我们的数据集中有时为空,我们无法完成这项工作。
代码的压缩版本如下所示,代码在 Python 中,环境是 PySpark。
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db_1", table_name = "table_1", transformation_ctx = "datasource0")
resolvedDDF = datasource0.resolveChoice(specs = [
('price_current','cast:double'),
('price_discount','cast:double'),
])
applymapping = ApplyMapping.apply(frame = resolvedDDF, mappings = [
("id", "string", "id", "string"),
("status", "string", "status", "string"),
("price_current", "double", "price_current", "double"),
("price_discount", "double", "price_discount", "double"),
("created_at", "string", "created_at", "string"),
("updated_at", "string", "updated_at", "string"),
], transformation_ctx = "applymapping")
droppedDF = applymapping.toDF().dropna(subset=('created_at', 'price_current'))
newDynamicDF = DynamicFrame.fromDF(droppedDF, glueContext, "newframe")
dropnullfields = DropNullFields.apply(frame = newDynamicDF, transformation_ctx = "dropnullfields")
datasink = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields, catalog_connection = "RedshiftDataStaging", connection_options = {"dbtable": "dbtable_1", "database": "database_1"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink")
我们对 Redshift 中的price_current
andcreated_at
表有一个非空约束,并且由于我们系统中的一些早期错误,一些记录在没有所需数据的情况下到达了 S3 存储桶。我们只想删除这些行,因为它们只占要处理的整体数据的一小部分。
尽管有dropna
代码,我们仍然从 Redshift 收到以下错误。
Error (code 1213) while loading data into Redshift: "Missing data for not-null field"
Table name: "PUBLIC".table_1
Column name: created_at
Column type: timestampt(0)
Raw field value: @NULL@
解决方案
如果您不想删除它们,可以传递默认值
df= dropnullfields.toDF()
df = df.na.fill({'price_current': 0.0, 'created_at': ' '})
dyf = DynamicFrame.fromDF(df,'glue_context_1')
datasink = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dyf, catalog_connection = "RedshiftDataStaging", connection_options = {"dbtable": "dbtable_1", "database": "database_1"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink")
如果要删除,请使用以下代码代替df.na.fill
df = df.na.drop(subset=["price_current", "created_at"])
推荐阅读
- python - 为什么 Python Pandas loc 不返回单个项目?
- c# - DbContext 不包含配置的定义 - EF Core
- php - PHP:尝试在多维数组中按值查找键,但不能正常工作
- ruby-on-rails - 如何推理枚举的级别
- git - 忽略整个目录树中所有扩展名为 .aux、.log 的文件
- excel - Excel VBA中的合并和总计
- c# - 使用 ASP.Net Core DI 的策略模式
- visual-studio - 使用 Microsoft.VisualStudio.TestTools.UnitTesting;
- angular6 - 通过点击事件上的选择器将数据从父级传递给子级
- java - 初始化复杂过滤器时出错。从 Kotlin 运行 ffmpeg 时出现无效参数错误