Reactor - How to compress a Flux on the fly?

Question

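I need to read and write compressed (gzip/brotli) streams without intermediate storage. The data arrives from the underlying layer as a Flux<ByteBuffer>. The data is large enough that buffering it is not an option. How can I compress a Flux<ByteBuffer> on the fly without storing the full data in memory or writing it out to disk?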

Tags: java, spring-webflux, project-reactor

Solution


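You want to avoid buffering the full data, but you can gzip each ByteBuffer chunk separately or, if your chunks are small enough, consolidate chunks into groups and gzip each group.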
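Because each group is compressed on its own, the output is a sequence of independent gzip members. RFC 1952 allows a gzip file to consist of several members, and Java's `GZIPInputStream` reads concatenated members as one stream, so the compressed chunks can simply be written one after another. A minimal stdlib-only check of that property (the class and method names are hypothetical, chosen for illustration):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class MultiMemberGzipDemo {

    // Compresses one group of bytes into a complete, self-contained gzip member
    static byte[] gzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gos = new GZIPOutputStream(bos)) {
            gos.write(data);
        } // close() writes the CRC32/size trailer
        return bos.toByteArray();
    }

    // Decompresses a gzip stream; GZIPInputStream continues across concatenated members
    static byte[] gunzip(byte[] gz) throws IOException {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(gz))) {
            return in.readAllBytes(); // Java 9+
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] part1 = "first group of chunks, ".getBytes(StandardCharsets.UTF_8);
        byte[] part2 = "second group of chunks".getBytes(StandardCharsets.UTF_8);

        // Compress each group independently, then simply append the results
        ByteArrayOutputStream concatenated = new ByteArrayOutputStream();
        concatenated.write(gzip(part1));
        concatenated.write(gzip(part2));

        String restored = new String(gunzip(concatenated.toByteArray()), StandardCharsets.UTF_8);
        System.out.println(restored); // prints "first group of chunks, second group of chunks"
    }
}
```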

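This approach needs little memory (only one group is collected at a time) and still compresses your data. The actual compression ratio depends on the content of the source data and on how many chunks are consolidated before gzipping. I think you can tune that number manually to get the best ratio.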
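To pick the group size, one could measure the ratio on sample data before wiring anything into the Flux. The sketch below imitates what `buffer(n)` followed by per-group gzip does, using plain arrays instead of Reactor; `GroupSizeRatio` and the sample records are made up for illustration, and real ratios depend entirely on your data.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class GroupSizeRatio {

    // Total compressed size when the chunks are gzipped in groups of groupSize chunks
    // (the last group may be shorter, as with Flux.buffer(n))
    static int compressedSize(byte[][] chunks, int groupSize) throws IOException {
        int total = 0;
        for (int start = 0; start < chunks.length; start += groupSize) {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (GZIPOutputStream gos = new GZIPOutputStream(bos)) {
                int end = Math.min(start + groupSize, chunks.length);
                for (int i = start; i < end; i++) {
                    gos.write(chunks[i]);
                }
            }
            total += bos.size();
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical sample data: 100 small, similar chunks
        byte[][] chunks = new byte[100][];
        int inLength = 0;
        for (int i = 0; i < chunks.length; i++) {
            chunks[i] = ("record " + i + ": some fairly repetitive payload\n")
                    .getBytes(StandardCharsets.UTF_8);
            inLength += chunks[i].length;
        }
        for (int groupSize : new int[] {1, 5, 20, 100}) {
            int outLength = compressedSize(chunks, groupSize);
            System.out.printf("groupSize=%d ratio=%.2f%n", groupSize, (double) inLength / outLength);
        }
    }
}
```

On repetitive data, larger groups usually give a noticeably better ratio: every group starts with an empty compression dictionary and adds at least 18 bytes of gzip header and trailer.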

A possible implementation:

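import java.io.*;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.zip.GZIPOutputStream;

import reactor.core.publisher.Flux;

public class Test_GzipFlux {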

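/**
 * Returns a Flux of gzipped buffers after (optional) buffer consolidation.
 * Each emitted buffer is a complete gzip member.
 * @param inFlux input stream of buffers
 * @param consolidatedBufCount number of buffers to consolidate before gzipping (1 = no consolidation)
 * @param outChunkMaxLength expected maximum length of one gzipped output chunk
 */
public static Flux<ByteBuffer> gzipFlux(Flux<ByteBuffer> inFlux,
                                        int consolidatedBufCount, int outChunkMaxLength) {
    // buffer(n) collects up to n consecutive ByteBuffers into a List (the last list may be
    // shorter), so only one group is collected at a time instead of the whole stream
    return inFlux.buffer(consolidatedBufCount)
                 .map(inList -> zipBuffers(inList, outChunkMaxLength));
}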

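/**
 * Consolidates the buffers of one group, applies gzip, returns the result as a single buffer.
 * @param inList portion of chunks to be consolidated
 * @param outChunkMaxLength expected length of the output chunk; used only as the initial
 *        capacity of the in-memory output, which grows if the gzipped data is larger
 */
private static ByteBuffer zipBuffers(List<ByteBuffer> inList, int outChunkMaxLength) {
    // ByteArrayOutputStream instead of a PipedInputStream/PipedOutputStream pair:
    // a pipe written and read from the same thread blocks forever once the gzipped
    // output exceeds the pipe buffer
    ByteArrayOutputStream bos = new ByteArrayOutputStream(outChunkMaxLength);
    try (GZIPOutputStream gos = new GZIPOutputStream(bos)) {
        for (ByteBuffer buf : inList) {
            // copy only the readable bytes (position..limit); unlike buf.array(), this works
            // for direct and sliced buffers and leaves the caller's position unchanged
            byte[] bytes = new byte[buf.remaining()];
            buf.duplicate().get(bytes);
            gos.write(bytes);
        }
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
    // closing gos (try-with-resources) has written the gzip trailer into bos
    return ByteBuffer.wrap(bos.toByteArray());
}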

private static void test() {
    int inLength  = ... // actual full length of source data
    Flux<ByteBuffer> source = ... // your source Flux
    
    // these are parameters for your adjustment
    int consolidationCount = 5;
    int outChunkMaxLength = 30 * 1024;

    Flux<ByteBuffer> result = gzipFlux(source, consolidationCount, outChunkMaxLength);

    // sum the readable bytes of each output chunk (block() is fine in a test,
    // but should not be called inside a reactive pipeline)
    int outLen = result.reduce(0, (res, bb) -> res + bb.remaining()).block();
    System.out.println("ratio=" + (double) inLength / outLen);
}
}
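If the ratio lost by restarting the compressor for every group matters, a different technique (not the one this answer uses) is to keep a single `GZIPOutputStream` open for the whole stream and sync-flush it after every chunk. The `GZIPOutputStream(OutputStream, boolean syncFlush)` constructor (Java 7+) makes `flush()` emit everything compressed so far while keeping the compression dictionary, so the result is one gzip member that typically compresses better, at the cost of a few bytes per flushed chunk. A minimal sketch, assuming a hypothetical `StreamingGzip` helper that is not part of any library:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.zip.GZIPOutputStream;

/**
 * Hypothetical helper: compresses a sequence of chunks into ONE gzip member,
 * emitting compressed bytes after every chunk instead of buffering the whole input.
 * Not thread-safe; use one instance per stream (per subscription).
 */
public class StreamingGzip {
    private final ByteArrayOutputStream sink = new ByteArrayOutputStream();
    private final GZIPOutputStream gzip;

    public StreamingGzip() {
        try {
            // syncFlush = true: flush() pushes out all data compressed so far (SYNC_FLUSH)
            gzip = new GZIPOutputStream(sink, true);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Compresses one chunk and returns the compressed bytes produced so far. */
    public ByteBuffer push(ByteBuffer chunk) {
        try {
            byte[] bytes = new byte[chunk.remaining()];
            chunk.duplicate().get(bytes); // copy readable bytes without moving the caller's position
            gzip.write(bytes);
            gzip.flush();                 // sync flush: output is decodable up to this point
            return drain();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Finishes the member (remaining data plus CRC32/size trailer) and returns the last bytes. */
    public ByteBuffer finish() {
        try {
            gzip.close();
            return drain();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Hands out what has accumulated in the sink and empties it, keeping memory bounded per chunk
    private ByteBuffer drain() {
        ByteBuffer out = ByteBuffer.wrap(sink.toByteArray());
        sink.reset();
        return out;
    }
}
```

One untested way to wire it into Reactor is `Flux.defer(() -> { StreamingGzip z = new StreamingGzip(); return source.map(z::push).concatWith(Mono.fromCallable(z::finish)); })`, where `defer` gives each subscriber its own compressor state. If the subscription is cancelled or fails, `finish()` is never called and the compressor's native memory is only reclaimed later by the garbage collector.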
