java - Reactor - 如何压缩通量在飞行中?
问题描述
我需要在没有中间存储的情况下读写压缩(gzip/brotli)流。数据以Flux<ByteBuffer>
格式从底层证券接收。数据足够大,无法选择缓冲。如何Flux<ByteBuffer>
即时压缩而不必将完整数据存储在内存中或写出到磁盘?
解决方案
您希望避免缓冲完整数据,但是您可以归档每个 ByteBuffer 块,或者,如果您的块足够小,则可以将块合并成组,然后进行归档。
这不需要太长的内存,但会压缩您的数据。实际的压缩级别取决于源数据的内容以及归档前合并的块数。我认为,您可以手动调整它以获得最佳比例。
可能的代码示例如下:
public class Test_GzipFlux {
/**
* Returns Flux of gzip-ed buffers after (optional) buffer consolidation
* @param inFlux input stream of buffers
* @param consolidatedBufCount number of buffers to consolidate before gzip-ing
*/
public static Flux<ByteBuffer> gzipFlux(Flux<ByteBuffer> inFlux,
int consolidatedBufCount, int outChunkMaxLength) {
return inFlux.buffer(consolidatedBufCount)
.map(inList->zipBuffers(inList, outChunkMaxLength));
}
/**
* Consolidates buffers from input list, applies gzip, returns result as single buffer
* @param inList portion of chunks to be consolidated
* @param outChunkMaxLength estimated length of output chunk.
* !!! to avoid pipe deadlock, this length to be sufficient
* !!! for consolidated data after gzip
*/
private static ByteBuffer zipBuffers(List<ByteBuffer> inList, int outChunkMaxLength) {
try {
PipedInputStream pis = new PipedInputStream(outChunkMaxLength);
GZIPOutputStream gos = new GZIPOutputStream(new PipedOutputStream(pis));
for (var buf: inList) {
gos.write(buf.array());
}
gos.close();
byte[] outBytes = new byte[pis.available()];
pis.read(outBytes);
pis.close();
return ByteBuffer.wrap(outBytes);
} catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
private static void test() {
int inLength = ... // actual full length of source data
Flux<ByteBuffer> source = ... // your source Flux
// these are parameters for your adjustment
int consolidationCount = 5;
int outChunkMaxLength= 30 * 1024;
Flux<ByteBuffer> result = gzipFlux(source,consolidationCount, outChunkMaxLength);
int outLen = result.reduce(0, (res, bb) -> res + bb.array().length).block();
System.out.println("ratio=" + (double)inLength/outLen);
}
}
推荐阅读
- javascript - Chart.js - 条形图的散点图等价物是什么?
- node.js - 可以使用 Kerberos SSO 为主机创建-react-app / WebpackDevServer 代理吗?
- scala - 在 spark 数据框中使用案例类的好处
- c# - Python 中 MQTT over TLS 验证错误(C# 和 Mqtt.fx 中没有错误)
- mysql - 如何获取 MySql 选择/加入查询中每个名称的最新值?
- ionic-framework - 单击日历字段时想要禁用离子应用程序中的键盘
- python - Pyside - 使用 args 时无法连接信号
- ios - iOS Siri 快捷方式:我可以隐藏 Siri 上的操作按钮吗?
- php - 空数组在 Laravel 刀片中不会显示为空
- javascript - 如何使用 jQuery 或 JavaScript 添加元素?