Job aborted when writing Parquet to S3 from a Glue job

Problem description

My code, including the transformations, looks like this:

from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit, trim, regexp_replace
from pyspark.sql.window import Window
from pyspark.storagelevel import StorageLevel

# Config, fullLoad (read from Redshift) and aggregatedPath are defined elsewhere in the job.
dictionaryDf = spark.read.option("header", "true").csv(
    "s3://...../.csv")

web_notif_data = fullLoad.cache()
web_notif_data.persist(StorageLevel.MEMORY_AND_DISK)
print("::::::data has been loaded::::::::::::")

distinct_campaign_name = web_notif_data.select(
    trim(web_notif_data.campaign_name).alias("campaign_name")).distinct()

web_notif_data.createOrReplaceTempView("temp")
variablesList = Config.get('web', 'variablesListWeb')
web_notif_data = spark.sql(variablesList)
web_notif_data.persist(StorageLevel.MEMORY_AND_DISK)

# Strip underscores from the campaign name and tag the channel
web_notif_data = web_notif_data.withColumn("camp", regexp_replace("campaign_name", "_", ""))
web_notif_data = web_notif_data.drop("campaign_name")
web_notif_data = web_notif_data.withColumnRenamed("camp", "campaign_name")
web_notif_data = web_notif_data.withColumn("channel", lit("web_notification"))

web_notif_data.createOrReplaceTempView("data")
campaignTeamWeb = Config.get('web', 'campaignTeamWeb')
web_notif_data = spark.sql(campaignTeamWeb)
web_notif_data.persist(StorageLevel.MEMORY_AND_DISK)

distinct_campaign_name = distinct_campaign_name.withColumn(
    "camp",
    F.regexp_replace(F.lower(F.trim(col("campaign_name"))), "[^a-zA-Z0-9]", ""))

# Join each distinct campaign against the dictionary and keep the best match per campaign
output_df3 = (
    distinct_campaign_name
    .withColumn("cname_split",
                F.explode(F.split(F.lower(F.trim(col("campaign_name"))), "_")))
    .join(
        dictionaryDf,
        (
            ((F.col("function") == "contains") &
             F.col("camp").contains(F.col("terms"))) |
            ((F.col("function") == "match") &
             F.col("campaign_name").contains("_") &
             (F.col("cname_split") == F.col("terms")))
        ),
        "left"
    )
    .withColumn(
        "empty_is_other",
        F.when(
            F.col("product").isNull() & F.col("product_category").isNull(),
            "other"
        )
    )
    .withColumn(
        "rn",
        F.row_number().over(
            Window.partitionBy("campaign_name").orderBy(
                F.when(F.col("function").isNull(), 3)
                 .when(F.col("function") == "match", 2)
                 .otherwise(1),
                F.length(F.col("terms")).desc(),
                F.col("product").isNull()
            )
        )
    )
    .filter("rn = 1")
    .select(
        "campaign_name",
        F.coalesce("product", "empty_is_other").alias("prod"),
        F.coalesce("product_category", "empty_is_other").alias("prod_cat"),
    )
    .na.fill("")
)
print(":::::::::::transformations have been done finally::::::::::::")

web_notif_data1 = web_notif_data  # backup of the DF in case something goes wrong
web_notif_data = web_notif_data.drop("campaign_name")
web_notif_data = web_notif_data.withColumnRenamed("temp_campaign_name", "campaign_name")
veryFinalDF = web_notif_data.join(output_df3, "campaign_name", "left_outer")
# veryFinalDF.show(truncate=False)
veryFinalDF.write.mode("overwrite").parquet(aggregatedPath)
print("::::final data have been written successfully::::::")

Here, fullLoad is a DataFrame read from a Redshift table. This code runs fine on about 0.2 million records. In production, however, 15 days of data can reach a minimum of roughly 125 million records. I don't know the exact size, because the data sits in the Redshift table that we read from before processing. I am running this code as a Glue job, and it gets stuck on the last line, i.e. while writing the data to Parquet. It gives me the following error:
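The question does not show how fullLoad is built; the sketch below is only an assumption of how a Redshift table is commonly read inside a Glue job (the catalog database, table name and temp directory are placeholders, not values from the question):

from awsglue.context import GlueContext
from pyspark.context import SparkContext

# Hypothetical setup -- the real connection details are not shown in the question
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Read the Redshift table through the Glue Data Catalog and convert to a Spark DataFrame
fullLoad = glueContext.create_dynamic_frame.from_catalog(
    database="my_catalog_db",                         # placeholder
    table_name="my_redshift_table",                   # placeholder
    redshift_tmp_dir="s3://my-temp-bucket/redshift/"  # placeholder staging dir
).toDF()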

[screenshot of the error message; not reproduced here]

I tried running it with 30 executors. It takes around 20 minutes just to load the data from Redshift into the fullLoad DataFrame. Is there any way to avoid this error? I am new to AWS and Glue jobs.
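For reference, a minimal sketch of one commonly suggested first step (not an accepted answer to this question) is to control how many partitions the final DataFrame is written with, so that each Parquet task handles a smaller slice of the data; the partition count below is purely illustrative:

# Illustrative only: spread the final write over more, smaller tasks
veryFinalDF = veryFinalDF.repartition(200)  # 200 is an arbitrary example value
veryFinalDF.write.mode("overwrite").parquet(aggregatedPath)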

Tags: amazon-web-services, apache-spark, pyspark, aws-glue, aws-glue-spark

Solution

