python - AWS Glue:过滤日期字段
问题描述
我有一个非常基本的问题,我创建了一个 AWS Glue 作业,我需要在从 dynamodb 表中提取数据时创建一个过滤器。我只需要使用名为“时间”的字段从前一天提取数据。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import re
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "sampledb", table_name = "tablex", transformation_ctx = "DataSource0")
Transform1 = ApplyMapping.apply(frame = DataSource0, mappings = [("allresults.paymentCapabilityResult.paymentCapabilityCheckResult", "boolean", "allresults.paymentCapabilityResult.paymentCapabilityCheckResult", "boolean"), ("time", "string", "time", "timestamp")], transformation_ctx = "Transform1")
DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform1, connection_type = "s3", format = "json", connection_options = {"path": "s3://xxxxx/output/", "partitionKeys": []}, transformation_ctx = "DataSink0")
job.commit()
解决方案
昨天的过滤器可以在 PySpark 中轻松完成,如此处所述。脚本如下所示:
from pyspark.sql import functions as F
df = Transform1.toDF()
df = df.where(F.col("time") == F.date_sub(F.current_date(), 1))
Transform2 = DynamicFrame.fromDF(df, glue_ctx=glueContext, name="df")
推荐阅读
- git - 在交互式变基中删除/删除当前提交
- swift - Swift 将任何转换为数据
- c++ - 将 range-v3 与 cgal 返回的右值一起使用
- python - 如何在情节中为直线制作动画?
- php - 如何修复错误 > 完整性约束违规:1048 列“状态”不能为空
- jquery - 如何在动态添加的行中进行计算
- swift - 如何显示 wifi 设置弹出通知。如何在应用程序中获取可用的 wifi 列表
- xml - 在 XSLT 代码中从 csv 读取空值而不移动单元格值
- android - 如何使用文件的路径获取 mp3 文件的详细信息,如 Android 中的名称或艺术家?
- c++ - 为什么 std::unordered_set 比 std::set 慢?