pyspark - 如何在 AWS Glue 作业中附加带有源名称的新列?
问题描述
我已经搜索了堆栈溢出,以了解如何将源文件名作为值附加一个新列。但是,它并没有按预期进行。
在我的最终 parquet 文件中,我找到了一个名为 input_file_name 的新列,但该值为空。(喜欢 ””)
我想知道我忽略了哪一步。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import functions as F
from awsglue.dynamicframe import DynamicFrame
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydb", table_name = "mytable", transformation_ctx = "datasource0")
datasource1 = datasource0.toDF().withColumn("input_file_name", F.input_file_name())
datasource2 = DynamicFrame.fromDF(datasource1, glueContext, "datasource2")
applymapping1 = ApplyMapping.apply(frame = datasource2, mappings = [("input_file_name", "string", "input_file_name", "string"),
("Profile", "struct", "Profile","struct")], transformation_ctx = "applymapping1")
datasink4 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://temp/testing"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
解决方案
您不能通过将动态帧转换为数据帧来使用 input_file_name()
您必须使用 spark.read api 将数据读取到数据帧。下一步必须立即使用 input_file_name() - 在对数据帧执行任何操作之前。
请记住,aws glue 有一个已知问题,即它可以处理高达 45gb 的数据,然后它会引发错误。增加dpus不起作用
推荐阅读
- java - Spring JPA 停止关系创建新表
- python - Django inspectdb 省略了整数主键
- python - 在 Python 上无法正常运行的操作顺序。有谁知道发生了什么?
- netlogo - 报告行为空间 Netlogo 中的列表
- php - 生成允许访客将文件上传到服务器的唯一 URL
- node.js - PRN 数据库无法从外部机器访问?
- javascript - 如何仅使用 javascript 使图像在点击时放大?
- prestashop - 如何随着时间的推移限制对产品的访问?
- node.js - 如何将 ConPTY 输出转换为 HTML
- python - Python 错误:“TypeError:'NoneType' 类型的对象没有 len()”