amazon-web-services - 如何为原始区域内的所有表执行 Glue ETL 作业(从我的原始区域转换为镶木地板到已处理)?
问题描述
我目前正在自动化我的数据湖摄取过程。我有数据进入我的原始区域(S3 存储桶)。在存储桶中,我有 27 个文件夹,每个文件夹对应一个数据库 - 每个文件夹有 x 个 csv 文件,每个对应一个表。我有一个 S3 事件(所有对象创建事件)触发了一个 lambda 函数,该函数抓取了我的原始区域。我能够成功地看到每张桌子。完成后,我想创建一个 ETL 作业,将处理区域中的数据移动到镶木地板,但是考虑到我拥有的表数量,我不想手动创建一个作业,将每个表指定为“源” ”。
我通过将单个 csv 文件上传到我的原始区域来演示我的自动化服务,然后爬虫运行,然后 ETL 作业也运行将“s3 原始区域表”转换为镶木地板并将其放入我的处理区域。当我删除我的第二个表时,爬虫能够成功地将其识别为我的原始区域中的新表,但在我的处理区域中,它将数据合并到第一个模式(即使它们完全不同)。
我期望以下内容:1)爬虫将 csv 识别为表 2)胶水 etl 将文件转换为镶木地板 3)爬虫将镶木地板文件识别为单个表
以下代码突出显示了我面临的问题 - 指定的数据源是一个表(文件夹),并且假定该文件夹中的所有内容都具有相同的架构。
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "APPLICATION_XYZ", table_name = "RAW_ZONE_w1cqzldd5jpe", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("vendorid", "long", "vendorid", "long"), ("lpep_pickup_datetime", "string", "lpep_pickup_datetime", "string"), ("lpep_dropoff_datetime", "string", "lpep_dropoff_datetime", "string"), ("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("ratecodeid", "long", "ratecodeid", "long"), ("pulocationid", "long", "pulocationid", "long"), ("dolocationid", "long", "dolocationid", "long"), ("passenger_count", "long", "passenger_count", "long"), ("trip_distance", "double", "trip_distance", "double"), ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"), ("ehail_fee", "string", "ehail_fee", "string"), ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double"), ("payment_type", "long", "payment_type", "long"), ("trip_type", "long", "trip_type", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
解决方案
使用以下函数创建了一个 ETL 作业,以循环遍历我的数据库中的表,并将 parquet 文件写入具有相同名称的新文件夹(这样我就可以爬取表并使用 athena 进行查询)。
databaseName = 'DATABASE'
Tables = client.get_tables( DatabaseName = databaseName )
tableList = Tables ['TableList']
for table in tableList:
tableName = table['Name']
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "DATABASE", table_name = tableName, transformation_ctx = "datasource0")
datasink4 = glueContext.write_dynamic_frame.from_options(frame = datasource0, connection_type = "s3", connection_options = {"path": "s3://processed-45ah4xoyqr1b/Application1/"+tableName+"/"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
推荐阅读
- java - ScrollActivity 中的 Android BroadcastReceiver
- firebase - 仅内部 HTTP 云功能无法访问
- python - 熊猫创建包含所有行值的列
- python - ValueError:Python 输入与 input_signature 不兼容:
- java - java-maven-idea:在 jar 中包含外部库
- android - Android - nav_header_menu 中的 setOnClickListern
- angular - 调用角度服务中的函数以返回新的 Promise
- mongodb - mongo db 输出未显示预期结果
- amazon-s3 - 来自 S3 的 Apache Flink 有状态读取文件
- java - (Sublime Text 3) bash: javac: command not found