amazon-s3 - 写入 HTTP 输出消息后释放 DataBuffer
问题描述
我们正在尝试将文件流式传输到 S3。但这同样适用于使用 WebClient 进行流式传输。
给定 a Flux<DataBuffer>
,可以在直接内存中分配,我需要通过AsyncRequestBody
which 将它发布到 S3 Publisher<ByteBuffer>
。
@Override
public Mono<Document> putObject(final Document document, final Flux<ByteBuffer> content) {
PutObjectRequest request = PutObjectRequest
.builder()
.bucket(document.getBucketName())
.key(document.getObjectKey())
.contentLength(document.getFileSize())
.metadata(Map.of(METADATA_KEY_FILENAME, FileNameUtil.encode(document.getFileName()),
METADATA_KEY_CREATED, Instant.now().toString(),
METADATA_KEY_SIZE, String.valueOf(document.getFileSize())))
.build();
return Mono.fromFuture(s3AsyncClient.putObject(request, AsyncRequestBody.fromPublisher(content)))
.doOnNext(response -> document.setVersionId(response.versionId()))
.thenReturn(document);
}
为了准备我的传入通量,我映射到,并在消耗ByteBuffer
后释放。ByteBuffer
但在下面的示例中,缓冲区在写入输出消息之前被释放:
static Function<Flux<DataBuffer>, Flux<ByteBuffer>> mapAndRelease1() {
return flux -> flux.concatMap(dataBuffer -> Flux.using(
() -> dataBuffer,
db -> Flux.just(db).map(DataBuffer::asByteBuffer),
DataBufferUtils::release,
false
));
我找到了一个将缓冲区复制到堆并释放原始缓冲区的解决方案,这并不理想。
static Function<Flux<DataBuffer>, Flux<ByteBuffer>> mapAndRelease2() {
return flux -> flux.map(dataBuffer -> {
if (dataBuffer.asByteBuffer().hasArray()) {
return dataBuffer.asByteBuffer();
} else {
// copy from direct memory to heap
try {
return ByteBuffer.wrap(dataBuffer.asInputStream().readAllBytes());
} catch (IOException e) {
throw new IllegalStateException(e);
} finally {
DataBufferUtils.release(dataBuffer);
}
}
});
我想使用 WebClient 与BodyInserters.fromPublisher
S3 非常相似AsyncRequestBody
,但我还没有找到任何关于如何操作传入Publisher
以允许在写入输出消息后正确释放缓冲区的示例。
Web客户端示例:
public Mono<Document> uploadDocument(
final Flux<DataBuffer> dataBufferFlux,
final String fileName,
long fileSize,
final DocumentPolicy policy,
final String objectKey,
final String sid) {
return serviceDocumentWebClient.put()
.uri(uriBuilder -> uriBuilder.queryParam("file", fileName).path(UPLOAD_URI)
.build(policy, objectKey)).contentType(APPLICATION_OCTET_STREAM).contentLength(fileSize)
.body(BodyInserters.fromDataBuffers(dataBufferFlux))
.headers(h -> h.set(MONETA_REQUEST_INFO_USER_ID, sid)).accept(APPLICATION_NDJSON, APPLICATION_JSON)
.retrieve()
.bodyToMono(Document.class);
}
解决方案
推荐阅读
- yaml - 有条件地退出动作
- javascript - 提交时在反应组件中获取 API
- c# - 为什么语音识别在网页中有效,但在电子中无效
- javascript - 批量删除 discord.js
- ios - TestFlight 的多个开发人员
- arrays - 如何使用 Google 表格的“开始日期”和“结束日期”创建权益曲线?
- mysql - 返回两个日期时间表达式之间的差异
- nginx - 通过 Laravel Forge 服务控制默认 Cache-Control: no-cache, private Nginx/php-fpm7.4 安装
- javascript - 检测一个按钮,然后在 JavaScript 中按下它
- flutter - 无法检索 Flutter 项目中 Graphql 突变中发生的错误