amazon-web-services - AWS Glue 仅写入最新分区 parquet
问题描述
我有一个胶水数据库,它有两个表,每个表都有相同的数据,只是分区不同。我正在尝试编写一个每晚运行的作业,从一个表中读取数据,然后使用更新的分区写入新数据。我可以使用以下代码做到这一点:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import lit
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
datasource0 = glueContext.create_dynamic_frame.from_catalog(
database = "Database",
table_name = "Table",
transformation_ctx = "datasource0"
)
datasource0 = datasource0.toDF()
datasource0.write.partitionBy("Key1","Key2").parquet(OutputFilePath)
但这会占用并写入整个数据帧。我只想写新分区,所以我在 AWS 网站上找到了以下代码段:
glue_context.write_dynamic_frame.from_options(
frame = projectedEvents,
connection_type = "s3",
connection_options = {"path": "$outpath", "partitionKeys": ["type"]},
format = "parquet")
但这也只是重写了整个数据帧。我怎样才能重写最新的分区?
解决方案
这可以通过 push_down_predicate 参数来完成。数据原来是按年、月、日、小时划分的,所以我只是减去一天,然后使用 push_down_predicate 如下:
timestamp = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')
s1 = timestamp.split('-')
pdp = "partition_0 = " + s1[0] + " and partition_1 = " + s1[1] + " and partition_2 = " + s1[2]
datasource0 = glueContext.create_dynamic_frame.from_catalog(
database = "mailfiles_standardized",
table_name = "firehoseoutput",
push_down_predicate = pdp
)
glueContext.write_dynamic_frame.from_options(
frame = datasource2,
connection_type = "s3",
connection_options = {
"path": Bucket,
"partitionKeys": ["Key1","Key2"]
},
format = "parquet")
推荐阅读
- java - 如何在读取数据之前检查Java Socket InputStream是否包含数据
- web-services - .NET Core 调用 WS 服务 - 匿名错误
- css - 空格字符宽度与自定义 CSS 字体不一致
- python - 如何在ipysheet新添加的行中添加数据?
- vba - 在 Word 中用 VBA 替换彩色文本
- powershell - Foreach-Object -Parallel 未执行完整脚本
- linux - 无法让代理链接受 HTTPS 设置,但它确实接受 HTTP
- javascript - ShaderMaterial 的问题
- react-native - 为什么抽屉不能在 android 上工作,但在 iOS 上工作?
- docker - 如何在 circleci 作业中执行“docker run”命令