首页 > 解决方案 > 将 ZipOutputStream 上传到 S3,而不使用 AWS S3 Java 将 zip 文件(大)临时保存到磁盘

问题描述

我需要从 S3 下载照片(不在同一目录中),将它们压缩并再次使用 AWS S3 Java SDK 上传到 S3。此 zip 文件大小可以以 GB 为单位。目前我正在使用 AWS Lambda,它的临时存储限制为 500 MB。所以我不想将 ZIP 文件保存在磁盘上,而是想将 ZIP 文件(使用从 S3 下载的照片动态创建)直接流式传输到 S3。我需要使用 AWS S3 Java SDK。

标签: javaamazon-s3aws-lambdaaws-java-sdkzipoutputstream

解决方案


基本思想是使用流式操作。这样您就不会等到 ZIP 在文件系统上生成,而是在 ZIP 算法生成任何数据时立即开始上传。显然,一些数据会缓存在内存中,仍然不需要等待整个 ZIP 在磁盘上生成。我们还将在两个线程中使用流组合和PipedInputStream/ PipedOutputStream:一个用于读取数据,另一个用于压缩内容。

这是的版本:

final AmazonS3 client = AmazonS3ClientBuilder.defaultClient();

final PipedOutputStream pipedOutputStream = new PipedOutputStream();
final PipedInputStream pipedInputStream = new PipedInputStream(pipedOutputStream);

final Thread s3In = new Thread(() -> {
    try (final ZipOutputStream zipOutputStream = new ZipOutputStream(pipedOutputStream)) {
        S3Objects
                // It's just a convenient way to list all the objects. Replace with you own logic.
                .inBucket(client, "bucket")
                .forEach((S3ObjectSummary objectSummary) -> {
                    try {
                        if (objectSummary.getKey().endsWith(".png")) {
                            System.out.println("Processing " + objectSummary.getKey());

                            final ZipEntry entry = new ZipEntry(
                                    UUID.randomUUID().toString() + ".png" // I'm too lazy to extract file name from the
                                    // objectSummary
                            );

                            zipOutputStream.putNextEntry(entry);

                            IOUtils.copy(
                                    client.getObject(
                                            objectSummary.getBucketName(),
                                            objectSummary.getKey()
                                    ).getObjectContent(),
                                    zipOutputStream
                            );

                            zipOutputStream.closeEntry();
                        }
                    } catch (final Exception all) {
                        all.printStackTrace();
                    }
                });
    } catch (final Exception all) {
        all.printStackTrace();
    }
});
final Thread s3Out = new Thread(() -> {
    try {
        client.putObject(
                "another-bucket",
                "previews.zip",
                pipedInputStream,
                new ObjectMetadata()
        );

        pipedInputStream.close();
    } catch (final Exception all) {
        all.printStackTrace();
    }
});

s3In.start();
s3Out.start();

s3In.join();
s3Out.join();

但是,请注意它会打印一个警告:

WARNING: No content length specified for stream data.  Stream contents will be buffered in memory and could result in out of memory errors.

这是因为 S3 需要在上传之前提前知道数据的大小。不可能提前知道生成的 ZIP 的大小。您可能可以通过分段上传来试试运气,但代码会更棘手。不过,想法是相似的:一个线程应该读取数据并以 ZIP 流的形式发送内容,而另一个线程应该读取 ZIPped 条目并将它们作为多部分上传。上传所有条目(部分)后,应该完成多部分。

这是的示例:

final S3Client client = S3Client.create();

final PipedOutputStream pipedOutputStream = new PipedOutputStream();
final PipedInputStream pipedInputStream = new PipedInputStream(pipedOutputStream);

final Thread s3In = new Thread(() -> {
    try (final ZipOutputStream zipOutputStream = new ZipOutputStream(pipedOutputStream)) {
        client.listObjectsV2Paginator(
                ListObjectsV2Request
                        .builder()
                        .bucket("bucket")
                        .build()
        )
                .contents()
                .forEach((S3Object object) -> {
                    try {
                        if (object.key().endsWith(".png")) {
                            System.out.println("Processing " + object.key());

                            final ZipEntry entry = new ZipEntry(
                                    UUID.randomUUID().toString() + ".png" // I'm too lazy to extract file name from the object
                            );

                            zipOutputStream.putNextEntry(entry);

                            client.getObject(
                                    GetObjectRequest
                                            .builder()
                                            .bucket("bucket")
                                            .key(object.key())
                                            .build(),
                                    ResponseTransformer.toOutputStream(zipOutputStream)
                            );

                            zipOutputStream.closeEntry();
                        }
                    } catch (final Exception all) {
                        all.printStackTrace();
                    }
                });
    } catch (final Exception all) {
        all.printStackTrace();
    }
});
final Thread s3Out = new Thread(() -> {
    try {
        client.putObject(
                PutObjectRequest
                        .builder()
                        .bucket("another-bucket")
                        .key("previews.zip")
                        .build(),
                RequestBody.fromBytes(
                        IOUtils.toByteArray(pipedInputStream)
                )
        );
    } catch (final Exception all) {
        all.printStackTrace();
    }
});

s3In.start();
s3Out.start();

s3In.join();
s3Out.join();

它遭受了同样的困扰:ZIP 需要在上传之前在内存中准备好。

如果你有兴趣,我准备了一个演示项目,你可以玩代码。


推荐阅读