amazon-web-services - 通过胶水作业将镶木地板写入 s3 时作业中止
问题描述
我的代码如下所示,其中包含转换:
dictionaryDf = spark.read.option("header", "true").csv(
"s3://...../.csv")
web_notif_data = fullLoad.cache()
web_notif_data.persist(StorageLevel.MEMORY_AND_DISK)
print("::::::data has been loaded::::::::::::")
distinct_campaign_name = web_notif_data.select(
trim(web_notif_data.campaign_name).alias("campaign_name")).distinct()
web_notif_data.createOrReplaceTempView("temp")
variablesList = Config.get('web', 'variablesListWeb')
web_notif_data = spark.sql(variablesList)
web_notif_data.persist(StorageLevel.MEMORY_AND_DISK)
web_notif_data = web_notif_data.withColumn("camp", regexp_replace("campaign_name", "_", ""))
web_notif_data = web_notif_data.drop("campaign_name")
web_notif_data = web_notif_data.withColumnRenamed("camp", "campaign_name")
web_notif_data = web_notif_data.withColumn("channel", lit("web_notification"))
web_notif_data.createOrReplaceTempView("data")
campaignTeamWeb = Config.get('web', 'campaignTeamWeb')
web_notif_data = spark.sql(campaignTeamWeb)
web_notif_data.persist(StorageLevel.MEMORY_AND_DISK)
distinct_campaign_name = distinct_campaign_name.withColumn("camp", F.regexp_replace(
F.lower(F.trim(col("campaign_name"))),
"[^a-zA-Z0-9]", ""))
output_df3 = (
distinct_campaign_name.withColumn("cname_split",
F.explode(F.split(F.lower(F.trim(col("campaign_name"))), "_")))
.join(
dictionaryDf,
(
(
(F.col("function") == "contains") &
F.col("camp").contains(F.col("terms"))
) |
(
(F.col("function") == "match") &
F.col("campaign_name").contains("_") &
(F.col("cname_split") == F.col("terms"))
)
),
"left"
)
.withColumn(
"empty_is_other",
F.when(
(
F.col("product").isNull() &
F.col("product_category").isNull()
),
"other"
)
)
.withColumn(
"rn",
F.row_number().over(
Window.partitionBy("campaign_name")
.orderBy(
F.when(
F.col("function").isNull(), 3
).when(
F.col("function") == "match", 2
).otherwise(1),
F.length(F.col("terms")).desc(),
F.col("product").isNull()
)
)
)
.filter("rn=1")
.select(
"campaign_name",
F.coalesce("product", "empty_is_other").alias("prod"),
F.coalesce("product_category", "empty_is_other").alias("prod_cat"),
)
.na.fill("")
)
print(":::::::::::transformations have been done finally::::::::::::")
web_notif_data1 = web_notif_data # Just taking the backup of DF in case something goes wrong
web_notif_data = web_notif_data.drop("campaign_name")
web_notif_data = web_notif_data.withColumnRenamed("temp_campaign_name", "campaign_name")
veryFinalDF = web_notif_data.join(output_df3, "campaign_name", "left_outer")
# veryFinalDF.show(truncate=False)
veryFinalDF.write.mode("overwrite").parquet(aggregatedPath)
print("::::final data have been written successfully::::::")
fullLoad
从 Redshift 表中读取的数据框在哪里。此代码在0.2 Million
记录上运行良好。但是,在 15 天的生产中,数据可能会达到大约 1minimum
条25 Million
记录。我不知道大小,因为数据存储在红移表中,我们正在从中读取然后处理数据。我正在通过Glue
作业运行此代码,它卡在最后一行,即在将数据写入镶木地板时。它给了我以下错误:
我尝试与30
执行者一起运行它。20 mins
将数据加载到数据Redshift
框中需要花费一些时间fullLoad
。还有什么办法可以避免这个错误?我是 AWS 和 Glue 工作的新手。
解决方案
推荐阅读
- r - 为什么 mle 函数不能以上下限运行?
- javascript - 如何从今天开始禁用前几天
- machine-learning - 随机森林中的随机子空间方法。是否在每次拆分时考虑?
- python - 在对象没有属性的 renpy 中获取属性错误编辑:附加信息
- powerapps - 如何将 PLAID API 与 MS Power Apps 自定义连接器集成?
- perl - 如何使用正则表达式从文件名中查找键并从哈希中按键获取值?
- arduino - 如何在类创建的对象中初始化伺服引脚?
- c++ - 如何遍历邻接表???对于加权图
- flutter - 如何在 Flutter 中使用 ChangeNotifier 将变量数据放入类中?
- c# - Razor 表单仅提交一个输入字段