首页 > 解决方案 > 分布式处理 - AWS Sagemaker

问题描述

我的 S3 存储桶中有一些原始.csv文件。如何并行处理它们以减少运行时间?请参阅有关我需要帮助的地方的评论。我正在使用SKLearnProcessors3_data_distribution_type='ShardedByS3Key'

在此处输入图像描述

%%writefile preprocessing/preprocessing_sklearn.py
    
import pandas as pd
import argparse
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import os

def process(input_data_path):
    df = pd.read_csv(input_data_path)
#     drop first col (unamed: 0)
    df = df.iloc[: , 1:]
    
    features = df.iloc[:,1:]
    headers = features.columns
    labels = df.iloc[:,0]

    scaler = StandardScaler()
    
    normalized_x_train = scaler.fit_transform(features)

    # write
    pd.DataFrame(normalized_x_train).to_csv((os.path.join('/opt/ml/processing/output/train', 'train_features.csv')), header=False, index=False)
    pd.DataFrame(labels).to_csv((os.path.join('/opt/ml/processing/output/train', 'train_labels.csv')), header=False, index=False)
    
if __name__ == '__main__':
    # HOW DO I MAKE THIS DYNAMIC?
    input_data_path = os.path.join("/opt/ml/processing/input", "train-data-with-header.csv")  
    process(input_data_path)

我的电话 fn -

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
import timeit

start = timeit.default_timer()
# WHAT SHOULD BE MY SOURCE?[![enter image description here][1]][1]
source = "s3://sagemaker-end-to-end/data_tuning/train/chunk_0.csv" 
source2 = "s3://sagemaker-end-to-end/data_tuning/train/"

sklearn_processor = SKLearnProcessor(framework_version='0.23-1',
                                     role=role,
                                     instance_type='ml.m5.xlarge',
                                     instance_count=2,
                                     base_job_name = 'preprocess-sklearn'
                                    )

sklearn_processor.run(
    code='preprocessing/preprocessing_sklearn.py',
    inputs=[
        ProcessingInput(
            source=source2,
            s3_data_distribution_type='ShardedByS3Key',
            destination='/opt/ml/processing/input')
    ],
    
    outputs=[
        ProcessingOutput(
          source='/opt/ml/processing/output/train', 
          destination= make_url(store_bucket, "preprocess_sklearn", "train")
        ),
#                                
        ProcessingOutput(
            source='/opt/ml/processing/output/test',
            destination= make_url(store_bucket, "preprocess_sklearn", "test")
        )
    ]
                     
)

stop = timeit.default_timer()

print('Time: ', stop - start) 

标签: pythonamazon-web-servicesamazon-s3amazon-sagemaker

解决方案


AWS 文档不清楚他们如何管理水平扩展并将多个实例的输出聚合到 S3 中。我相信我们只能假设 SageMaker 自动处理并行处理s3_data_distribution_type='ShardedByS3Key',将输入数据拆分为分片,分配每个分片,并聚合输出。

到目前为止我看到的唯一评论。

要在 Amazon SageMaker Processing 上使用 Scikit-Learn 并行处理数据s3_data_distribution_type='ShardedByS3Key',您可以通过在 ProcessingInput 中设置来按 S3 键对输入对象进行分片,以便每个实例接收大约相同数量的输入对象。

是否将来自 Amazon S3 的数据分发到使用 FullyReplicated 的所有处理实例,或者来自 Amazon S3 的数据是否通过 Amazon S3 密钥共享,将一个数据分片下载到每个处理实例

默认。S3DataType='S3Prefix'并使用指定的 S3 路径下的文件。

无论您使用 S3Prefix 还是 ManifestFile 作为数据类型。如果您选择S3PrefixS3Uri 将标识一个键名前缀。Amazon SageMaker 使用具有指定键名前缀的所有对象来处理作业。如果您选择 ManifestFile,S3Uri 将标识一个对象,该对象是一个清单文件,其中包含您希望 Amazon SageMaker 用于处理作业的对象键列表。

AWS github 示例仅使用一个实例instance_count=1,因此也无法获得线索。


只希望 AWS 改进其文档。

并不是说 AWS 比 GCP 更难使用,而是它不必要地难;基础设施原语的杂乱无章的蔓延,它们之间的凝聚力很差。

挑战是好的,混乱的混乱不是,AWS 的问题是你的大部分工作时间将花在整理他们的文档和筛选功能和产品以找到你想要的东西,而不是专注于有趣的有趣挑战.


推荐阅读