首页 > 解决方案 > 如何从 Spring Webflux FilePart DataBuffer 中逐行读取

问题描述

我正在使用 Spring Webflux 上传文件。就我而言,我必须上传一个 CSV 文件。从我所做的搜索中可以收集到的是以下代码:

public Flux<String> getLines(Flux<FilePart> filePartFlux) {
    return filePartFlux.flatMap(filePart ->
            filePart.content().map(dataBuffer -> {
                byte[] bytes = new byte[dataBuffer.readableByteCount()];
                dataBuffer.read(bytes);
                DataBufferUtils.release(dataBuffer);
                return new String(bytes, StandardCharsets.UTF_8);
            })
            .map(this::processAndGetLinesAsList)
            .flatMapIterable(Function.identity());
}

但在此过程中,如果 dataBuffer 达到其容量,它将部分读取最后一行,因此如果我通过半值,将处理不正确的 csv。有什么方法可以让我确定文件是逐行读取的,而不是通量。

示例:从通量发出的数据缓冲区 1 包含:

<name,title,address
joe,engineer,straight way california,
mike,doc >

Flux 的数据缓冲区 2 包含:

<tor,hatlway street,
jeremy,plumber,newyork>

我目前想到的解决方案是维护一个堆栈,我将通过比较 CSV 拆分的标头长度与拆分的当前记录长度来推送不完整的记录,如果它更少,则从堆栈中弹出 emelemt 并加入两个字符串. 我希望有一种有效的方法来完成这一切。

标签: javaspring-webfluxreactive

解决方案


filePartFlux is Flux<FilePart>

val buffer = DataBufferUtils.join(filePartFlux).block()
val reader =  buffer.asInputStream(true).bufferedReader()
while(reader.ready()) {
    val line = reader.readLine()
    println("$line")
}

@Vishal jotshi,或者这样更好:

DataBufferUtils.join(filePartFlux).subscribe { 
 buffer -> buffer.asInputStream(true).bufferedReader().use {
 reader -> while(reader.ready()) {
    val line = reader.readLine()
    println("$line")
}

}


推荐阅读