首页 > 解决方案 > 如何在 psycopg2 标识符中使用 redshift 复制命令

问题描述

我将执行COPY命令的python lambda代码设置为redshift。

bucket_name = get_bucket_name(event)
file_name = get_file_name(event)

table = bucket_name.replace('-','_')
url = f's3://{bucket_name}/{file_name}'
    query = sql.SQL(''' truncate test_Schema.{};
            commit;
            copy test_Schema.{}
            from {}
            iam_role 'arn:aws:iam::123344:role/test'
            timeformat as 'auto'
            ACCEPTINVCHARS
            CSV;''').format(*map(sql.Identifier,(table,table,url)))
    
    print(query)
    cur = con.cursor()
    cur.execute(query, (url,))
    con.commit()
    

通过使用标识符,我可以使用变量并设置 S3 存储桶目标。

但它返回以下错误。似乎双引号导致错误..

truncate test_Schema."test";
commit;
copy jp_icqa_ddl."operation_defect_detail"
from "s3://test_Schema/test_20210816.csv"
iam_role 'arn:aws:iam::123344:role/test'
timeformat as 'auto'
ACCEPTINVCHARS
CSV; 

[ERROR] SyntaxError: syntax error at or near ""s3://test_Schema/test_20210816.csv""
LINE 4:             from "s3://test_Schema/test_20210816...
                         ^

Traceback (most recent call last):
  File "/var/task/s3-to-redshift.py", line 63, in lambda_handler
    cur.execute(query, (url,))

有没有办法避免这种情况?

如何处理标识符中的双引号?

谢谢

标签: pythonamazon-s3aws-lambdaamazon-redshiftpsycopg2

解决方案


虽然这不是您问题的确切答案,但这是一个更好的解决方案,不需要数据库连接字符串。

我建议在 lambda 中使用 Redshift Data API 将数据从 S3 加载到 Redshift。您可以摆脱psycopgs2包并使用 lambda 中的内置boto3包。

这将异步运行复制查询,并且 lambda 函数不会花费超过几秒钟的时间来运行它。

我使用sentry_sdk从 lambda 获取运行时错误通知。

import boto3
import sentry_sdk
from sentry_sdk.integrations.aws_lambda import AwsLambdaIntegration

sentry_sdk.init(
    "https://aaaaaa@aaaa.ingest.sentry.io/aaaaaa",
    integrations=[AwsLambdaIntegration(timeout_warning=True)],
    traces_sample_rate=0
)


def execute_redshift_query(sql):
    data_client = boto3.client('redshift-data')
    data_client.execute_statement(
        ClusterIdentifier='redshift-cluster-test',
        Database='db',
        DbUser='db_user',
        Sql=sql,
        StatementName='Test query',
        WithEvent=True,
    )


def handler(event, context):
    query = """
    copy schema.test_table
    from 's3://test-bucket/test.csv'
    IAM_ROLE 'arn:aws:iam::1234567890:role/TestRole'
    region 'us-east-1'
    ignoreheader 1 csv delimiter ','
    """
    execute_redshift_query(query)
    return True

如果复制查询失败,另一个 lambda 函数会发送错误通知。您可以使用下面屏幕截图中的规则添加 EventBridge lambda 触发器。 在此处输入图像描述

这是发送错误通知的 lambda 代码。

import sentry_sdk
from sentry_sdk.integrations.aws_lambda import AwsLambdaIntegration

sentry_sdk.init(
    "https://aaaa@aaa.ingest.sentry.io/aaaaa",
    integrations=[AwsLambdaIntegration(timeout_warning=True)],
    traces_sample_rate=0
)


def lambda_handler(event, context):
    try:
        if event["detail"]["state"] != "FINISHED":
            raise ValueError(str(event))
    except Exception as e:
        sentry_sdk.capture_exception(e)
    return True

StatementName您可以使用第一个 lambda 函数中的定义来确定哪个复制查询失败。

希望它是有帮助的。


推荐阅读