How to perform an AWS S3 multipart copy using golang

Problem description

I was looking at the AWS golang documentation for the S3 CopyObject function, and it includes the following detail about handling large files:

However, to copy objects greater than 5 GB, you must use the multipart upload Upload Part - Copy API. For more information, see Copying Objects Using the REST Multipart Upload API (https://docs.aws.amazon.com/AmazonS3/latest/dev/CopyingObjctsUsingRESTMPUapi.html).

When I follow that link, however, it only contains code examples for Java and .Net.

Am I missing documentation or an example somewhere that shows how to copy an existing large file in S3 using the golang client?
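
For context (this snippet is not part of the original question), the single-request copy that the quoted documentation refers to looks roughly like the sketch below with aws-sdk-go-v2; it only works for objects up to 5 GB, and the bucket and key names are placeholders:

import (
    "context"

    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

//single-request copy; S3 rejects this for source objects larger than 5 GB
func simpleCopy() error {
    cfg, err := config.LoadDefaultConfig(context.TODO())
    if err != nil {
        return err
    }
    svc := s3.NewFromConfig(cfg)

    //CopySource takes the form "<source-bucket>/<source-key>"
    copySource := "my-source-bucket/path/to/object"
    destBucket := "my-dest-bucket"
    destKey := "path/to/copy"

    _, err = svc.CopyObject(context.TODO(), &s3.CopyObjectInput{
        Bucket:     &destBucket,
        Key:        &destKey,
        CopySource: &copySource,
    })
    return err
}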

Tags: go, amazon-s3

Solution


The same approach as @Mike's answer, but using aws-sdk-go-v2:

import (
    "context"
    "errors"
    "fmt"
    "strconv"
    "strings"
    "time"

    "logger" //placeholder for the author's own logging package

    "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/aws/aws-sdk-go-v2/service/s3/types"
)

//constant for the number of bytes in a 5 MB chunk
const max_part_size = 5 * 1024 * 1024

var log *logger.Logger

//helper function to build the string for the byte range of the source object to copy
func buildCopySourceRange(start int64, objectSize int64) string {
    end := start + max_part_size - 1
    if end >= objectSize {
        end = objectSize - 1
    }
    startRange := strconv.FormatInt(start, 10)
    stopRange := strconv.FormatInt(end, 10)
    return "bytes=" + startRange + "-" + stopRange
}

//function that starts the multipart upload, performs each part copy, and completes the copy
func MultiPartCopy(svc *s3.Client, fileSize int64, sourceBucket string, sourceKey string, destBucket string, destKey string) error {
    log = logger.GetLogger()

    ctx, cancelFn := context.WithTimeout(context.TODO(), 10*time.Minute)
    defer cancelFn()

    //struct for starting a multipart upload
    startInput := s3.CreateMultipartUploadInput{
        Bucket: &destBucket,
        Key:    &destKey,
    }

    //send command to start copy and get the upload id as it is needed later
    var uploadId string
    createOutput, err := svc.CreateMultipartUpload(ctx, &startInput)
    if err != nil {
        return err
    }
    if createOutput != nil {
        if createOutput.UploadId != nil {
            uploadId = *createOutput.UploadId
        }
    }
    if uploadId == "" {
        return errors.New("No upload id found in start upload request")
    }

    var i int64
    var partNumber int32 = 1
    copySource := "/" + sourceBucket + "/" + sourceKey
    parts := make([]types.CompletedPart, 0)
    numUploads := (fileSize + max_part_size - 1) / max_part_size
    log.Infof("Will attempt upload in %d parts to %s", numUploads, destKey)
    for i = 0; i < fileSize; i += max_part_size {
        copyRange := buildCopySourceRange(i, fileSize)
        partInput := s3.UploadPartCopyInput{
            Bucket:          &destBucket,
            CopySource:      &copySource,
            CopySourceRange: &copyRange,
            Key:             &destKey,
            PartNumber:      partNumber,
            UploadId:        &uploadId,
        }
        log.Debugf("Attempting to upload part %d range: %s", partNumber, copyRange)
        partResp, err := svc.UploadPartCopy(ctx, &partInput)

        if err != nil {
            log.Error("Attempting to abort upload")
            abortIn := s3.AbortMultipartUploadInput{
                Bucket:   &destBucket,
                Key:      &destKey,
                UploadId: &uploadId,
            }
            //ignoring any errors with aborting the copy
            svc.AbortMultipartUpload(ctx, &abortIn)
            return fmt.Errorf("Error uploading part %d : %w", partNumber, err)
        }

        //copy etag and part number from response as it is needed for completion
        if partResp != nil {
            partNum := partNumber
            etag := strings.Trim(*partResp.CopyPartResult.ETag, "\"")
            cPart := types.CompletedPart{
                ETag:       &etag,
                PartNumber: partNum,
            }
            parts = append(parts, cPart)
            log.Debugf("Successfully upload part %d of %s", partNumber, uploadId)
        }
        //log progress periodically
        if partNumber%50 == 0 {
            log.Infof("Completed part %d of %d to %s", partNumber, numUploads, destKey)
        }
        partNumber++
    }

    //create struct for completing the upload
    mpu := types.CompletedMultipartUpload{
        Parts: parts,
    }

    //complete the copy
    //S3 does not actually assemble the copied object until it receives the complete command
    complete := s3.CompleteMultipartUploadInput{
        Bucket:          &destBucket,
        Key:             &destKey,
        UploadId:        &uploadId,
        MultipartUpload: &mpu,
    }
    compOutput, err := svc.CompleteMultipartUpload(ctx, &complete)
    if err != nil {
        return fmt.Errorf("Error completing upload: %w", err)
    }
    if compOutput != nil {
        log.Infof("Successfully copied Bucket: %s Key: %s to Bucket: %s Key: %s", sourceBucket, sourceKey, destBucket, destKey)
    }
    return nil
}
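
For completeness, here is a minimal sketch of how MultiPartCopy above might be driven, assuming it sits in the same package as this main function; the bucket and key names are placeholders, and the source object's size is looked up with HeadObject because the copy needs it up front (newer releases of the s3 module may return ContentLength as a pointer, in which case it has to be dereferenced):

import (
    "context"
    "fmt"
    "os"

    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
    cfg, err := config.LoadDefaultConfig(context.TODO())
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }
    svc := s3.NewFromConfig(cfg)

    //placeholder bucket and key names
    sourceBucket, sourceKey := "my-source-bucket", "path/to/large-object"
    destBucket, destKey := "my-dest-bucket", "path/to/copy"

    //the multipart copy needs the source object's size up front
    head, err := svc.HeadObject(context.TODO(), &s3.HeadObjectInput{
        Bucket: &sourceBucket,
        Key:    &sourceKey,
    })
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }

    if err := MultiPartCopy(svc, head.ContentLength, sourceBucket, sourceKey, destBucket, destKey); err != nil {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }
}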

One question @Mike: you were using AbortMultipartUploadRequest, which does not exist in aws-sdk-go-v2, so I used AbortMultipartUpload instead; I hope that does not make much of a difference?

