amazon-web-services - 根据 Glue 作业状态将文件传输到 S3 存储桶
问题描述
I am new to **AWS Glue,** and my aim is to extract transform and load files uploaded in S3 bucket to RDS instance. Also I need to transfer the files into separate S3 buckets based on the Glue Job status (Success /Failure). There will be more than one file uploaded into the initial S3 bucket. How can I get the name of the files uploaded so that i can transfer those files to appropriate buckets.
第 1 步:将文件上传到 S3 存储桶 1。第 2 步:触发 lamda 函数调用 Job1 第 3 步:job1 成功时将文件传输到 S3 存储桶 2 第 4 步:失败时传输到另一个 S3 存储桶
解决方案
让 lambda 事件触发器监听您将文件上传到 S3 的文件夹 在 lambda 中,使用 AWS Glue API 运行粘合作业(本质上是 AWS Glue 中的 python 脚本)。
在 Glue python script 中,使用适当的库,例如 pymysql 等作为与您的 python 脚本一起打包的外部库。
执行从 S3 到 RDS 表的数据加载操作。如果您使用的是 Aurora Mysql,那么 AWS 提供了一个不错的功能“从 S3 加载”,因此您可以直接将文件加载到表中(您可能需要在 PARAMETER GROUP / IAM Roles 中进行一些配置)。
调用粘合作业的 Lambda 脚本:
s3 = boto3.client('s3')
glue = boto3.client('glue')
def lambda_handler(event, context):
gluejobname="<YOUR GLUE JOB NAME>"
try:
runId = glue.start_job_run(JobName=gluejobname)
status = glue.get_job_run(JobName=gluejobname, RunId=runId['JobRunId'])
print("Job Status : ", status['JobRun']['JobRunState'])
except Exception as e:
print(e)
raise e
胶水脚本:
import mysql.connector
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.context import DynamicFrame
from awsglue.transforms import *
from pyspark.sql.types import StringType
from pyspark.sql.types import DateType
from pyspark.sql.functions import unix_timestamp, from_unixtime
from pyspark.sql import SQLContext
# Create a Glue context
glueContext = GlueContext(SparkContext.getOrCreate())
url="<RDS URL>"
uname="<USER NAME>"
pwd="<PASSWORD>"
dbase="DBNAME"
def connect():
conn = mysql.connector.connect(host=url, user=uname, password=pwd, database=dbase)
cur = conn.cursor()
return cur, conn
def create_stg_table():
cur, conn = connect()
createStgTable1 = <CREATE STAGING TABLE IF REQUIRED>
loadQry = "LOAD DATA FROM S3 PREFIX 'S3://PATH FOR YOUR CSV' REPLACE INTO TABLE <DB.TABLENAME> FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' IGNORE 1 LINES (@var1, @var2, @var3, @var4, @var5, @var6, @var7, @var8) SET ......;"
cur.execute(createStgTable1)
cur.execute(loadQry)
conn.commit()
conn.close()
然后,您可以创建一个 cloudwatch 警报,其中检查粘合作业状态,并根据状态在 S3 之间执行文件复制操作。我们的生产中有类似的设置。
问候
尤瓦
推荐阅读
- java - 使用java中的麦克风进行VOSK的语音识别
- java - 带有重试模板的 Apache Kafka 侦听器的 Junit
- android - 获取深度图像时如何避免在 ARCore 中裁剪深度帧?
- azure - 将 SMB 或 NFT Azure 文件共享装载到 kubernetes 上的 JupyterHub 以获取共享目录
- sql - 查询没有重复和聚合函数或 GROUP BY 子句问题。- 重复
- python - 一个烧瓶网站,当它删除一个文件(os.remove(“abc.txt”))时,文件被删除但空间没有被回收
- azure - 什么是允许公共和政府 Azure SSO 登录到我们的网站,同时在同意屏幕上将我们的应用程序标记为已验证的正确方法?
- postgresql - “选择 ;” 为什么postgresql中存在这个语句
- django - 从视图中对 localhost 的请求会导致服务器停止响应
- angular - 如何从打字稿中的对象获取键和值