Adding a timestamp column in an AWS Glue job

Problem description

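I'm still very new to this. I'm trying to add a new timestamp column in AWS Glue. I get an error saying "timestamped4" is not defined, but isn't it defined when I use DynamicFrame.fromDF? Any help would be greatly appreciated. Thanks in advance.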

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext, DynamicFrame
from awsglue.job import Job
from pyspark.sql.functions import current_timestamp


## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "unimintest", table_name = "adsdsilo_xml", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("capacity", "int", "capacity", "int"), ("product", "string", "product", "string"), ("quantity", "int", "quantity", "int"), ("uom", "string", "uom", "string"), ("_id", "string", "_id", "string")], transformation_ctx = "applymapping1")
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

timestampedDf = dropnullfields3.toDF().withColumn("TimeStamp", current_timestamp())
timestamped4 = DynamicFrame.fromDF(timestampedDf, glueContext, timestamped4)

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = timestamped4, catalog_connection = "con1", connection_options = {"dbtable": "silo_xml", "database": "MyRDS"}, transformation_ctx = "datasink4")
job.commit()

Tags: aws-glue

Solution


from pyspark.sql.functions import lit
import datetime

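# withColumn is a DataFrame method, so convert the question's DynamicFrame with toDF() first
timestampedDf = dropnullfields3.toDF().withColumn('load_date', lit(datetime.datetime.now()))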
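
As for the error itself, it most likely comes from the line `timestamped4 = DynamicFrame.fromDF(timestampedDf, glueContext, timestamped4)` in the question. Python evaluates the arguments on the right-hand side before it binds the name on the left, so `timestamped4` does not exist yet when it is passed as the third argument. `DynamicFrame.fromDF` takes a name string in that position, so quoting it as `"timestamped4"` should clear the error. The sketch below reproduces the behaviour in plain Python without Glue; `from_df` and the sample `rows` are hypothetical stand-ins for illustration, not part of any library.

```python
def from_df(dataframe, glue_ctx, name):
    """Hypothetical stand-in for DynamicFrame.fromDF; just records its inputs."""
    return {"data": dataframe, "ctx": glue_ctx, "name": name}


rows = [{"product": "sand", "quantity": 3}]  # made-up sample data

# Broken: the third argument refers to a variable that is not bound yet,
# because the right-hand side runs before the assignment happens.
caught = None
try:
    timestamped4 = from_df(rows, None, timestamped4)
except NameError as exc:
    caught = exc  # keep a reference; `exc` is cleared when the except block ends

# Fixed: pass the name as a string literal instead of an unbound variable.
timestamped4 = from_df(rows, None, "timestamped4")

print(type(caught).__name__)
print(timestamped4["name"])
```

In the Glue script the same change applies to the `fromDF` call only; the `frame = timestamped4` argument to `write_dynamic_frame.from_jdbc_conf` stays as a variable reference.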

