Too many open files in an AWS Glue job

Problem description

I have the following job script, which throws an error when there are many files to process.

import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'ENVIRONMENT', 'WORK_BUCKET_NAME', 'OUTPUT_BUCKET_NAME'])

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
env = args['ENVIRONMENT']
work_bucket_name = args['WORK_BUCKET_NAME']
output_bucket_name = args['OUTPUT_BUCKET_NAME']

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = f"{env}_raw_edocs", table_name = "esocial_s_2200", transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("esocial", "string", "esocial", "string"), ("tenant", "string", "tenant", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string")], transformation_ctx = "applymapping1")

resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")

dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

unbox4 = Unbox.apply(frame = dropnullfields3, path = "esocial", format = "json")

relationalize5 = Relationalize.apply(frame = unbox4, staging_path = f"s3://{work_bucket_name}/{env}/edocs/relationalize-temp/esocial_s_2200", name = "root", transformation_ctx = "relationalize5")

if len(relationalize5.select('root').toDF().schema) > 0:
    datasink8 = glueContext.write_dynamic_frame.from_options(frame = relationalize5.select('root'), connection_type = "s3", connection_options = {"path": f"s3://{output_bucket_name}/{env}/anonymous/edocs/esocial_s-2200", "partitionKeys": ["tenant", "year", "month", "day"]}, format = "parquet", transformation_ctx = "datasink8")
    job.commit()

The stack trace is:

File "/tmp/raw_edocs_s_2200.py", line 55, in <module>
    relationalize5 = Relationalize.apply(frame = unbox4, staging_path = f"s3://{work_bucket_name}/{env}/edocs/relationalize-temp/esocial_s_2200", name = "root", transformation_ctx = "relationalize5")
File "/opt/amazon/lib/python3.6/site-packages/awsglue/transforms/transform.py", line 24, in apply
    return transform(*args, **kwargs)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/transforms/relationalize.py", line 47, in __call__
    return frame.relationalize(name, staging_path, options, transformation_ctx, info, stageThreshold, totalThreshold)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 344, in relationalize
    long(stageThreshold), long(totalThreshold)))
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o97.relationalize.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3190 in stage 1.0 failed 4 times, most recent failure: Lost task 3190.3 in stage 1.0 (TID 9056, 172.36.129.80, executor 1): java.io.FileNotFoundException: /tmp/blockmgr-5e470d53-7285-469b-8eb2-5e1c9b43e02c/1e/rdd_1039_3190 (Too many open files)

My job configuration is as follows:

  GlueS2200RawJob:
    Type: AWS::Glue::Job
    Properties:
      Command:
        Name: "glueetl"
        PythonVersion: 3
        ScriptLocation: !Sub s3://${WorkBucketName}/${Environment}/anonymous/glue_jobs/raw_edocs_s_2200.py
      DefaultArguments:
        "--job-bookmark-option": "job-bookmark-enable"
        "--ENVIRONMENT": !Ref Environment
        "--WORK_BUCKET_NAME": !Ref WorkBucketName
        "--OUTPUT_BUCKET_NAME": !Ref OutputBucketName
      GlueVersion: "2.0"
      Name: !Sub ${Environment}_raw_edocs_s_2200
      NumberOfWorkers: 2
      Role: !Ref GlueS22
      Tags:
        env: !Ref Environment

Does anyone know anything that could help solve this?

Tags: amazon-web-services, aws-glue

Solution


To fix this, I needed to coalesce the DataFrame and reduce the number of partitions.

from awsglue.dynamicframe import DynamicFrame

cores = int(sc.getConf().get('spark.executor.cores'))
instances = int(sc.getConf().get('spark.executor.instances'))
max_partitions = 200
# Coalesce to the larger of (total executor cores) and max_partitions, so each
# executor keeps far fewer shuffle/spill files open at once.
coalesced_df = unbox4.toDF().coalesce(max(cores * instances, max_partitions))
coalesced5 = DynamicFrame.fromDF(coalesced_df, glueContext, 'coalesced5')
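
A minimal sketch of how the coalesced frame slots into the original pipeline, assuming it replaces unbox4 in the Relationalize call (the downstream S3 write stays unchanged):

# Assumption: pass the coalesced frame to Relationalize instead of unbox4, so the
# shuffle/spill work during relationalize runs over far fewer partitions.
relationalize5 = Relationalize.apply(frame = coalesced5, staging_path = f"s3://{work_bucket_name}/{env}/edocs/relationalize-temp/esocial_s_2200", name = "root", transformation_ctx = "relationalize5")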
