python - 分布式处理 - AWS Sagemaker
问题描述
我的 S3 存储桶中有一些原始.csv
文件。如何并行处理它们以减少运行时间?请参阅有关我需要帮助的地方的评论。我正在使用SKLearnProcessor
和s3_data_distribution_type='ShardedByS3Key'
%%writefile preprocessing/preprocessing_sklearn.py
import pandas as pd
import argparse
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import os
def process(input_data_path):
df = pd.read_csv(input_data_path)
# drop first col (unamed: 0)
df = df.iloc[: , 1:]
features = df.iloc[:,1:]
headers = features.columns
labels = df.iloc[:,0]
scaler = StandardScaler()
normalized_x_train = scaler.fit_transform(features)
# write
pd.DataFrame(normalized_x_train).to_csv((os.path.join('/opt/ml/processing/output/train', 'train_features.csv')), header=False, index=False)
pd.DataFrame(labels).to_csv((os.path.join('/opt/ml/processing/output/train', 'train_labels.csv')), header=False, index=False)
if __name__ == '__main__':
# HOW DO I MAKE THIS DYNAMIC?
input_data_path = os.path.join("/opt/ml/processing/input", "train-data-with-header.csv")
process(input_data_path)
我的电话 fn -
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
import timeit
start = timeit.default_timer()
# WHAT SHOULD BE MY SOURCE?[![enter image description here][1]][1]
source = "s3://sagemaker-end-to-end/data_tuning/train/chunk_0.csv"
source2 = "s3://sagemaker-end-to-end/data_tuning/train/"
sklearn_processor = SKLearnProcessor(framework_version='0.23-1',
role=role,
instance_type='ml.m5.xlarge',
instance_count=2,
base_job_name = 'preprocess-sklearn'
)
sklearn_processor.run(
code='preprocessing/preprocessing_sklearn.py',
inputs=[
ProcessingInput(
source=source2,
s3_data_distribution_type='ShardedByS3Key',
destination='/opt/ml/processing/input')
],
outputs=[
ProcessingOutput(
source='/opt/ml/processing/output/train',
destination= make_url(store_bucket, "preprocess_sklearn", "train")
),
#
ProcessingOutput(
source='/opt/ml/processing/output/test',
destination= make_url(store_bucket, "preprocess_sklearn", "test")
)
]
)
stop = timeit.default_timer()
print('Time: ', stop - start)
解决方案
AWS 文档不清楚他们如何管理水平扩展并将多个实例的输出聚合到 S3 中。我相信我们只能假设 SageMaker 自动处理并行处理s3_data_distribution_type='ShardedByS3Key'
,将输入数据拆分为分片,分配每个分片,并聚合输出。
到目前为止我看到的唯一评论。
要在 Amazon SageMaker Processing 上使用 Scikit-Learn 并行处理数据
s3_data_distribution_type='ShardedByS3Key'
,您可以通过在 ProcessingInput 中设置来按 S3 键对输入对象进行分片,以便每个实例接收大约相同数量的输入对象。
是否将来自 Amazon S3 的数据分发到使用 FullyReplicated 的所有处理实例,或者来自 Amazon S3 的数据是否通过 Amazon S3 密钥共享,将一个数据分片下载到每个处理实例。
默认。S3DataType='S3Prefix'
并使用指定的 S3 路径下的文件。
无论您使用 S3Prefix 还是 ManifestFile 作为数据类型。如果您选择
S3Prefix
,S3Uri 将标识一个键名前缀。Amazon SageMaker 使用具有指定键名前缀的所有对象来处理作业。如果您选择 ManifestFile,S3Uri 将标识一个对象,该对象是一个清单文件,其中包含您希望 Amazon SageMaker 用于处理作业的对象键列表。
AWS github 示例仅使用一个实例instance_count=1
,因此也无法获得线索。
只希望 AWS 改进其文档。
并不是说 AWS 比 GCP 更难使用,而是它不必要地难;基础设施原语的杂乱无章的蔓延,它们之间的凝聚力很差。
挑战是好的,混乱的混乱不是,AWS 的问题是你的大部分工作时间将花在整理他们的文档和筛选功能和产品以找到你想要的东西,而不是专注于有趣的有趣挑战.
推荐阅读
- python - Pygame连接多个点
- reactjs - 无法在 map 函数之外读取存储在 redux 存储中的变量
- realm - 如何从领域对象属性中修复零值?
- c# - 在 C# 中按升序对字符串数组进行排序
- react-native - 解密,md5在本机反应中加密数据
- python - Line-to-line intersect with pygame's inverted y-axis
- dapper - 如何使用 Entity Framework Core 或 Dapper 批量更新表的列?
- angular - 在嵌套对象数组的情况下,如何在 Angular 组件中使用数据过滤?
- oracle - Spring boot JPA CrudRepository 用于不同的 oracle 模式
- python - 递归地在嵌套字典中循环浮点数