How can I run a Glue ETL job for all tables in my raw zone (converting CSVs in my raw zone to Parquet in my processed zone)?

Problem description

I am currently automating my data lake ingestion process. Data lands in my raw zone (an S3 bucket). Within the bucket I have 27 folders, each corresponding to a database, and each folder holds x CSV files, each corresponding to a table. An S3 event (on all object create events) triggers a Lambda function that crawls my raw zone, and I can see every table cataloged successfully. Once that completes, I want an ETL job that converts the data to Parquet in my processed zone, but given the number of tables I have, I don't want to build the job manually with every table specified as a "source".
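For context, a minimal sketch of what that Lambda trigger could look like, assuming the crawler is named raw-zone-crawler (a placeholder name, not from the original setup):

import boto3

glue = boto3.client('glue')

def lambda_handler(event, context):
    # Fired by the S3 "all object create events" notification on the raw bucket;
    # starts the crawler that catalogs each CSV folder as a table.
    # 'raw-zone-crawler' is a hypothetical name used for illustration.
    try:
        glue.start_crawler(Name='raw-zone-crawler')
    except glue.exceptions.CrawlerRunningException:
        pass  # already running; the next run will pick up the new objects
    return {'statusCode': 200}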

I demonstrated the automation by uploading a single CSV file to my raw zone: the crawler runs, then the ETL job runs and converts the raw-zone table to Parquet, placing it in my processed zone. When I dropped my second table into the raw zone, the crawler correctly recognized it as a new table, but in my processed zone the job merged its data into the first table's schema (even though the two schemas are completely different).

What I expected:
1) The crawler recognizes each CSV as a table.
2) The Glue ETL job converts each file to Parquet.
3) The crawler recognizes each Parquet file as its own table.

The code below highlights the problem I'm facing: the specified data source is a single table (folder), and everything inside that folder is assumed to share the same schema.

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "APPLICATION_XYZ", table_name = "RAW_ZONE_w1cqzldd5jpe", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("vendorid", "long", "vendorid", "long"), ("lpep_pickup_datetime", "string", "lpep_pickup_datetime", "string"), ("lpep_dropoff_datetime", "string", "lpep_dropoff_datetime", "string"), ("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("ratecodeid", "long", "ratecodeid", "long"), ("pulocationid", "long", "pulocationid", "long"), ("dolocationid", "long", "dolocationid", "long"), ("passenger_count", "long", "passenger_count", "long"), ("trip_distance", "double", "trip_distance", "double"), ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"), ("ehail_fee", "string", "ehail_fee", "string"), ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double"), ("payment_type", "long", "payment_type", "long"), ("trip_type", "long", "trip_type", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]

Tags: amazon-web-services, amazon-s3, aws-lambda, aws-glue

Solution


I created an ETL job with the following loop, which iterates over the tables in my database and writes the Parquet files to new folders with the same names (so that I can crawl the tables and query them with Athena). The snippet assumes the standard Glue-generated job preamble has already set up glueContext and job; the boto3 Glue client is created explicitly.

import boto3

# Assumes the standard Glue-generated preamble ran above:
# glueContext = GlueContext(SparkContext.getOrCreate()); job.init(...)
client = boto3.client('glue')  # Glue Data Catalog API

databaseName = 'DATABASE'
tables = client.get_tables(DatabaseName=databaseName)['TableList']
for table in tables:
    tableName = table['Name']
    # Read each cataloged table and write it out as Parquet under a folder of the same name
    datasource0 = glueContext.create_dynamic_frame.from_catalog(database = databaseName, table_name = tableName, transformation_ctx = "datasource0")
    datasink4 = glueContext.write_dynamic_frame.from_options(frame = datasource0, connection_type = "s3", connection_options = {"path": "s3://processed-45ah4xoyqr1b/Application1/" + tableName + "/"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
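To cover step 3 above (registering the Parquet output so Athena can query it), a second crawler can be pointed at the processed zone, so each per-table folder is cataloged as its own table. A minimal sketch using boto3; the crawler name, IAM role, and target database are placeholders, not from the original setup:

import boto3

glue = boto3.client('glue')

# One-time setup: crawl the processed zone so each Parquet folder
# becomes a separate table. Names below are hypothetical.
glue.create_crawler(
    Name='processed-zone-crawler',
    Role='GlueServiceRole',
    DatabaseName='application1_processed',
    Targets={'S3Targets': [{'Path': 's3://processed-45ah4xoyqr1b/Application1/'}]},
)
glue.start_crawler(Name='processed-zone-crawler')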
