python - 如何在 psycopg2 标识符中使用 redshift 复制命令
问题描述
我将执行COPY
命令的python lambda代码设置为redshift。
bucket_name = get_bucket_name(event)
file_name = get_file_name(event)
table = bucket_name.replace('-','_')
url = f's3://{bucket_name}/{file_name}'
query = sql.SQL(''' truncate test_Schema.{};
commit;
copy test_Schema.{}
from {}
iam_role 'arn:aws:iam::123344:role/test'
timeformat as 'auto'
ACCEPTINVCHARS
CSV;''').format(*map(sql.Identifier,(table,table,url)))
print(query)
cur = con.cursor()
cur.execute(query, (url,))
con.commit()
通过使用标识符,我可以使用变量并设置 S3 存储桶目标。
但它返回以下错误。似乎双引号导致错误..
truncate test_Schema."test";
commit;
copy jp_icqa_ddl."operation_defect_detail"
from "s3://test_Schema/test_20210816.csv"
iam_role 'arn:aws:iam::123344:role/test'
timeformat as 'auto'
ACCEPTINVCHARS
CSV;
[ERROR] SyntaxError: syntax error at or near ""s3://test_Schema/test_20210816.csv""
LINE 4: from "s3://test_Schema/test_20210816...
^
Traceback (most recent call last):
File "/var/task/s3-to-redshift.py", line 63, in lambda_handler
cur.execute(query, (url,))
有没有办法避免这种情况?
如何处理标识符中的双引号?
谢谢
解决方案
虽然这不是您问题的确切答案,但这是一个更好的解决方案,不需要数据库连接字符串。
我建议在 lambda 中使用 Redshift Data API 将数据从 S3 加载到 Redshift。您可以摆脱psycopgs2
包并使用 lambda 中的内置boto3
包。
这将异步运行复制查询,并且 lambda 函数不会花费超过几秒钟的时间来运行它。
我使用sentry_sdk从 lambda 获取运行时错误通知。
import boto3
import sentry_sdk
from sentry_sdk.integrations.aws_lambda import AwsLambdaIntegration
sentry_sdk.init(
"https://aaaaaa@aaaa.ingest.sentry.io/aaaaaa",
integrations=[AwsLambdaIntegration(timeout_warning=True)],
traces_sample_rate=0
)
def execute_redshift_query(sql):
data_client = boto3.client('redshift-data')
data_client.execute_statement(
ClusterIdentifier='redshift-cluster-test',
Database='db',
DbUser='db_user',
Sql=sql,
StatementName='Test query',
WithEvent=True,
)
def handler(event, context):
query = """
copy schema.test_table
from 's3://test-bucket/test.csv'
IAM_ROLE 'arn:aws:iam::1234567890:role/TestRole'
region 'us-east-1'
ignoreheader 1 csv delimiter ','
"""
execute_redshift_query(query)
return True
如果复制查询失败,另一个 lambda 函数会发送错误通知。您可以使用下面屏幕截图中的规则添加 EventBridge lambda 触发器。
这是发送错误通知的 lambda 代码。
import sentry_sdk
from sentry_sdk.integrations.aws_lambda import AwsLambdaIntegration
sentry_sdk.init(
"https://aaaa@aaa.ingest.sentry.io/aaaaa",
integrations=[AwsLambdaIntegration(timeout_warning=True)],
traces_sample_rate=0
)
def lambda_handler(event, context):
try:
if event["detail"]["state"] != "FINISHED":
raise ValueError(str(event))
except Exception as e:
sentry_sdk.capture_exception(e)
return True
StatementName
您可以使用第一个 lambda 函数中的定义来确定哪个复制查询失败。
希望它是有帮助的。
推荐阅读
- python - 将 A+"COMBINING ACUTE ACCENT" 转换为 Á
- java - 从 Java 读取文件 excel 版本 4
- selenium - 在 Selenium 上进行测试时,无法单击 iframe 内的单选按钮
- qt - 如何在 PyQt5 中保留隐藏小部件的快捷方式?
- javascript - Html 表 TD 和 TH 与 jquery 数据表不匹配
- linq - 使用 GetEntitiesAync (DocumentDB) 的方法的单元测试
- python - 无法在 numpy 数组中插入三个连续的零
- wordpress - WordPress 主题自定义:从蓝光主题中删除顶部标题
- python - 使用 sklearn xgboost gridsearchcv 的多个评分指标
- django - 为什么 Django ORM 在简单的外键关系上失败?