mysql - AWS Glue 没有将 id(int) 列复制到 Redshift - 它是空白的
问题描述
Glue有一个非常奇怪的问题。使用它对我从 MySQL RDS 移动到 Redshift 的数据运行一些 ETL。使用我在另一个表上使用的相同代码,它工作正常并复制了所有应有的数据。
但是在第二个表上,由于某种原因,它不会从 MySQL 复制 id 列中的数据。Redshift 上的 id 列是完全空白的。
query_df = spark.read.format("jdbc").option("url",
args['RDSURL']).option("driver",
args['RDSDRIVER']).option("dbtable",
args['RDSQUERY']).option("user", args['RDSUSER']).option("password",
args['RDSPASS']).load()
datasource0 = DynamicFrame.fromDF(query_df, glueContext,
"datasource0")
logging.info(datasource0.show())
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings =
[("id", "int", "id", "int"), ... , transformation_ctx =
"applymapping1")
logging.info(applymapping1.show())
从我在上面打印的上述日志中,我可以看到即使在 ApplyMapping 之后动态框架也包含 id 字段。
datasink2 = glueContext.write_dynamic_frame.from_jdbc_conf(frame =
applymapping1, catalog_connection = args['RSCLUSTER'],
connection_options = {"dbtable": args['RSTABLE'], "database":
args['RSDB']}, redshift_tmp_dir = args["TempDir"],
transformation_ctx = "datasink2")
我认为问题似乎在这里发生?在此作业完成后,在检查 Redshift 时,id 列完全为空。
对这种行为感到非常困惑。确切的代码在另一个表上运行良好,这两个表中 id 之间的唯一区别是该表的 id 为 int (11) unsigned 而代码工作的表的 id 为 int (10) 签名。
解决方案
在使用 Glue 从 MySQL RDS 中提取时,我就遇到了这种行为。对于任何寻求答案的人 - 原因如下:AWSGlue 具有“类型选择”的概念,其中爬取列的确切类型可以在整个 ETL 作业中保留为多种可能性,因为爬虫只爬取列数据的子集来确定可能的类型,但不能明确决定。这就是为什么转换为使用显式模式而不是爬虫将解决问题,因为它不涉及任何类型选择。
当作业运行(或查看预览)时,Spark 将尝试处理整个列数据集。此时,列类型可能被解析为与数据集不兼容的类型 - 即解释器无法决定正确的类型选择,这会导致相关列的数据为空。我在从 MySQL DB 转换多个表时经历过这种情况,并且没有明显的模式说明为什么有些失败而有些失败我已经能够确定,尽管它必须与源 DB 列中的数据相关.
解决方案是通过将失败的列转换为所需的目标类型,将选择的明确解决方案添加到您的脚本中,如下所示:
df.resolveChoice(specs = [('id', 'cast:int')])
其中 df 是数据框。这将强制将该列解释为预期类型,并应导致该列中数据的预期输出。这一直对我有用。
请注意,对于那些使用 Glue Studio 可视化编辑器的用户,现在可以添加一个“自定义转换”步骤,其中包含为您执行此操作的代码。在这种情况下,转换代码应如下所示:
def MyTransform (glueContext, dfc) -> DynamicFrameCollection:
df = dfc.select(list(dfc.keys())[0])
df_resolved = df.resolveChoice(specs = [('id', 'cast:int')])
return (DynamicFrameCollection({"CustomTransform0": df_resolved}, glueContext))
另请注意,在这种情况下,有必要在此自定义转换节点之后使用“从集合中选择”转换,因为自定义转换返回一个集合而不是单个帧。
推荐阅读
- python - 使用python从模块调用主文件中的内部函数
- python - 如何比较两个数据框并使用新值进行更新
- flutter - ExpansionTile 溢出
- ffmpeg - ffmpeg - 在组合多个图像和音轨时对输入图像应用填充
- oracle-apex - 如何在 Oracle Apex 交互式报表的行中添加更新按钮?
- java - 将 UTC 日期转换为给定时区时的值错误
- c# - ASP.net Core - JsonProperty NullValueHandling.Ignore 不适用于可空类型
- java - Java Applet 有危险吗?
- python - 按顺序绘制带有 y 轴的图形
- python - 迭代唯一的列值以创建数据帧,在子数据帧上运行函数并保存为单个 concat