首页 > 解决方案 > 如何通过 aws Lambda 将自动 csv 处理与 Quicksight 摄取集成?

问题描述

我一直在从事一个项目,该项目使用相当简单的数据管道来清理原始 csv 文件并将其转换为使用 Python3.8 和 Lambda 处理的数据,以创建发送到各个 S3 存储桶的各种子集。Lambda 函数通过将原始 csv 文件上传到入口 S3 存储桶来触发,该存储桶启动该过程。

但是,我还想将一些处理后的数据直接发送到 Quicksight,以便从同一个 Lambda 函数中摄取以进行视觉检查,这就是我目前遇到的问题。

我只有 csv 处理和上传到 S3 的部分功能(省略导入),这是我喜欢直接摄取到 Quicksight 的部分:

def featureengineering(event, context):
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    s3_file_name =  event['Records'][0]['s3']['object']['key']
    read_file = s3_client.get_object(Bucket=bucket_name,Key=s3_file_name)
   
    #turning the CSV into a dataframe in AWS Lambda
    s3_data = io.BytesIO(read_file.get('Body').read())
    df = pd.read_csv(s3_data, encoding="ISO-8859-1")

    #replacing erroneous zero values to nan (missing) which is more accurate and a general table,
    #and creating a new column with just three stages instead for simplification
    df[['Column_A','Column_B']] = df[['Column_A','Column_B']].replace(0,np.nan) 
    #applying function for feature engineering of 'newstage' function    
    df['NewColumn'] = df.Stage.apply(newstage) 
    
    df1 = df
    df1.to_csv(csv_buffer1)
    s3_resource.Object(bucket1, csv_file_1).put(Body=csv_buffer1.getvalue()) #downloading df1 to S3

因此,此时 df1 被发送到其 S3 存储桶(工作正常),但我希望它也可以作为自动香料刷新直接摄取到 Quicksight 中。

在四处挖掘时,我确实发现了一个类似的问题和答案

import boto3
import time
import sys
client = boto3.client('quicksight')
response = client.create_ingestion(DataSetId='<dataset-id>',IngestionId='<ingetion-id>',AwsAccountId='<aws-account-id>')

但我遇到的挂断是在DataSetId或更一般地说,我如何将 Lambda 函数中的 pandas DataFrame df1 转换为 CreateIngestion API 可以接受并自动发送到 QuickSight 作为最近处理的自动香料刷新数据?

标签: pythonamazon-web-servicesaws-lambdaboto3amazon-quicksight

解决方案


您应该首先创建一个 Quicksight 数据集,引用文档

数据集标识您要使用的数据源中的特定数据。例如,如果您要连接到数据库数据源,则数据源可能是一个表。如果您连接到 Amazon S3 数据源,它可能是一个文件。

在 S3 上保存 DataFrame(作为 csv 或 parquet 文件)后,您可以创建一个从中获取数据的 Quicksight 数据集。

您可以通过控制台或以编程方式执行此操作(可能是您要查找的内容)。

最后,一旦您有了数据集 ID,您就可以在其他 Quicksight API 调用中引用它。


推荐阅读