首页 > 解决方案 > 写入 HTTP 输出消息后释放 DataBuffer

问题描述

我们正在尝试将文件流式传输到 S3。但这同样适用于使用 WebClient 进行流式传输。

给定 a Flux<DataBuffer>,可以在直接内存中分配,我需要通过AsyncRequestBodywhich 将它发布到 S3 Publisher<ByteBuffer>

    @Override
    public Mono<Document> putObject(final Document document, final Flux<ByteBuffer> content) {
        PutObjectRequest request = PutObjectRequest
                .builder()
                .bucket(document.getBucketName())
                .key(document.getObjectKey())
                .contentLength(document.getFileSize())
                .metadata(Map.of(METADATA_KEY_FILENAME, FileNameUtil.encode(document.getFileName()),
                        METADATA_KEY_CREATED, Instant.now().toString(),
                        METADATA_KEY_SIZE, String.valueOf(document.getFileSize())))
                .build();
        return Mono.fromFuture(s3AsyncClient.putObject(request, AsyncRequestBody.fromPublisher(content)))
                .doOnNext(response -> document.setVersionId(response.versionId()))
                .thenReturn(document);
    }

为了准备我的传入通量,我映射到,并在消耗ByteBuffer后释放。ByteBuffer但在下面的示例中,缓冲区在写入输出消息之前被释放:

static Function<Flux<DataBuffer>, Flux<ByteBuffer>> mapAndRelease1() {
        return flux -> flux.concatMap(dataBuffer -> Flux.using(
                () -> dataBuffer,
                db -> Flux.just(db).map(DataBuffer::asByteBuffer),
                DataBufferUtils::release,
                false
        ));

我找到了一个将缓冲区复制到堆并释放原始缓冲区的解决方案,这并不理想。

static Function<Flux<DataBuffer>, Flux<ByteBuffer>> mapAndRelease2() {
        return flux -> flux.map(dataBuffer -> {
            if (dataBuffer.asByteBuffer().hasArray()) {
                return dataBuffer.asByteBuffer();
            } else {
                // copy from direct memory to heap
                try {
                    return ByteBuffer.wrap(dataBuffer.asInputStream().readAllBytes());
                } catch (IOException e) {
                    throw new IllegalStateException(e);
                } finally {
                    DataBufferUtils.release(dataBuffer);
                }
            }
        });

我想使用 WebClient 与BodyInserters.fromPublisherS3 非常相似AsyncRequestBody,但我还没有找到任何关于如何操作传入Publisher以允许在写入输出消息后正确释放缓冲区的示例。

Web客户端示例:

    public Mono<Document> uploadDocument(
            final Flux<DataBuffer> dataBufferFlux,
            final String fileName,
            long fileSize,
            final DocumentPolicy policy,
            final String objectKey,
            final String sid) {
        return serviceDocumentWebClient.put()
                .uri(uriBuilder -> uriBuilder.queryParam("file", fileName).path(UPLOAD_URI)
                        .build(policy, objectKey)).contentType(APPLICATION_OCTET_STREAM).contentLength(fileSize)
                .body(BodyInserters.fromDataBuffers(dataBufferFlux))
                .headers(h -> h.set(MONETA_REQUEST_INFO_USER_ID, sid)).accept(APPLICATION_NDJSON, APPLICATION_JSON)
                .retrieve()
                .bodyToMono(Document.class);
    }

标签: amazon-s3spring-webfluxwebclientreactiveflux

解决方案


推荐阅读