java - Java OutputStream that incrementally processes text
Problem description
I want to incrementally process the text written to an OutputStream
as it is written.
For example, suppose we have this program:
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;

public class Streaming {
    // Writes file, incrementally, to OutputStream.
    static void dump(File file, OutputStream out) throws IOException {
        // Implementation omitted
    }

    static int sum = 0;

    public static void main(String[] args) throws IOException {
        Charset charSet = Charset.defaultCharset(); // Interpret the file as having this encoding.
        dump(new File("file.txt"), new OutputStream() {
            @Override
            public void write(int b) throws IOException {
                // Add b to bytes already read,
                // Determine if we have reached the end of the token (using
                // the default encoding),
                // And parse the token and add it to `sum`
            }
        });
        System.out.println("Sum: " + sum);
    }
}
Suppose file.txt is a text file containing a space-delimited list of ints. In this program, I wish to find the sum of the ints in file.txt, accumulating the sum in the sum variable. I would like to avoid building up a String that is millions of characters long.
I'm interested in a way that I can accomplish this using the dump function, which writes the contents of a file to an output stream. I'm not interested in reading the file in another way (e.g. creating a Scanner for file.txt and repeatedly calling nextInt on the scanner). I'm imposing this restriction because I'm using a library that has an API similar to dump, where the client must provide an OutputStream, and the library subsequently writes a lot of text to the output stream.
How can I implement the write method to correctly perform the steps as outlined? I would like to avoid doing the tokenization by hand, since utilities like Scanner are already capable of doing tokenization, and I want to be able to handle any encoding of text (as specified by charSet). However, I can't use Scanner directly, because there's no way of checking (in a non-blocking way) if a token is available:
public static void main(String[] args) throws IOException {
    Charset charSet = Charset.defaultCharset();
    PipedInputStream in = new PipedInputStream();
    try (Scanner sc = new Scanner(in, charSet)) {
        dump(new File("file.txt"), new PipedOutputStream(in) {
            @Override
            public void write(byte[] b, int off, int len) throws IOException {
                super.write(b, off, len);
                // This can hang forever: `hasNextInt` blocks until a complete
                // int token is available, and the writer (this same thread)
                // cannot supply more bytes while it is blocked here.
                if (sc.hasNextInt()) {
                    sum += sc.nextInt();
                }
            }
        });
    }
    System.out.println("Sum: " + sum);
    System.out.println(charSet);
}
Is there a non-blocking utility that can perform the tokenization for me as data is written to the output stream?
Solution
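If I understand your question correctly, FilterOutputStream is what you want to subclass. DigestOutputStream extends FilterOutputStream and does something similar to what you want: it monitors the bytes as they pass through and hands them to a different class for processing.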
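To illustrate the DigestOutputStream pattern mentioned above, here is a minimal sketch. The class name DigestDemo and the helper digestWhileWriting are made up for illustration, and SHA-256 is an arbitrary choice of algorithm. The point is only that the bytes reach the real destination unchanged while a second object observes them.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class DigestDemo {
    // Hypothetical helper: writes data to dest through a DigestOutputStream
    // and returns the SHA-256 digest of everything that passed through.
    static byte[] digestWhileWriting(byte[] data, OutputStream dest)
            throws IOException, NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("SHA-256");
        try (DigestOutputStream out = new DigestOutputStream(dest, md)) {
            out.write(data); // forwarded to dest and fed to md
        }
        return md.digest();
    }

    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream dest = new ByteArrayOutputStream();
        byte[] digest = digestWhileWriting(
                "1 2 3".getBytes(StandardCharsets.UTF_8), dest);
        System.out.println(dest.size() + " bytes forwarded, digest has "
                + digest.length + " bytes");
    }
}
```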
One solution that comes to mind is for the FilterOutputStream to pass the bytes to a PipedOutputStream connected to a PipedInputStream, which another thread reads in order to compute the sum:
PipedOutputStream sumSink = new PipedOutputStream();
// Connect the pipe before any bytes are written; writing to an
// unconnected PipedOutputStream throws an IOException.
PipedInputStream source = new PipedInputStream(sumSink);

Callable<Long> sumCalculator = new Callable<Long>() {
    @Override
    public Long call() throws IOException {
        long sum = 0;
        try (Scanner scanner = new Scanner(source, charSet)) {
            while (scanner.hasNextInt()) {
                sum += scanner.nextInt();
            }
        }
        return sum;
    }
};
Future<Long> sumTask = ForkJoinPool.commonPool().submit(sumCalculator);

OutputStream dest = getTrueDestinationOutputStream();
dest = new FilterOutputStream(dest) {
    @Override
    public void write(int b) throws IOException {
        out.write(b);
        sumSink.write(b);
    }

    // write(byte[]) is inherited: FilterOutputStream forwards it to
    // write(byte[], int, int), so overriding it as well would copy
    // the same bytes to sumSink more than once.
    @Override
    public void write(byte[] b, int offset, int len) throws IOException {
        // Write to `out` directly. FilterOutputStream's own implementation
        // calls write(int) for every byte, which would send each byte
        // to sumSink a second time.
        out.write(b, offset, len);
        sumSink.write(b, offset, len);
    }

    @Override
    public void flush() throws IOException {
        super.flush();
        sumSink.flush();
    }

    @Override
    public void close() throws IOException {
        try {
            super.close();
        } finally {
            sumSink.close();
        }
    }
};

dump(file, dest);
// Signal end of input in case dump did not close dest;
// otherwise the Scanner would wait for more bytes forever.
sumSink.close();
long sum = sumTask.get();
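If a second thread is undesirable, one alternative (not part of the answer above) is to decode the bytes incrementally with java.nio.charset.CharsetDecoder and split on whitespace by hand. The following is only a sketch under assumptions: the class name SummingOutputStream and its sum() method are hypothetical, tokens are assumed to be separated by whitespace, malformed input is replaced rather than reported, and a non-integer token throws NumberFormatException instead of ending the scan the way a Scanner.hasNextInt loop would.

```java
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

public class SummingOutputStream extends OutputStream {
    private final CharsetDecoder decoder;
    // Holds bytes not yet decoded (e.g. half of a multi-byte character).
    private final ByteBuffer bytes = ByteBuffer.allocate(8192);
    private final CharBuffer chars = CharBuffer.allocate(8192);
    // Holds only the current token, never the whole input.
    private final StringBuilder token = new StringBuilder();
    private long sum = 0;
    private boolean closed = false;

    public SummingOutputStream(Charset charset) {
        decoder = charset.newDecoder()
                .onMalformedInput(CodingErrorAction.REPLACE)
                .onUnmappableCharacter(CodingErrorAction.REPLACE);
    }

    @Override
    public void write(int b) {
        bytes.put((byte) b);
        decode(false);
    }

    @Override
    public void write(byte[] b, int off, int len) {
        while (len > 0) {
            int n = Math.min(len, bytes.remaining());
            bytes.put(b, off, n);
            off += n;
            len -= n;
            decode(false);
        }
    }

    @Override
    public void close() {
        if (closed) {
            return;
        }
        closed = true;
        decode(true);
        endToken(); // the last token may not be followed by whitespace
    }

    public long sum() {
        return sum;
    }

    // Decodes whatever complete characters are available and tokenizes them.
    private void decode(boolean endOfInput) {
        bytes.flip();
        CoderResult result;
        do {
            result = decoder.decode(bytes, chars, endOfInput);
            drainChars();
        } while (result.isOverflow());
        if (endOfInput) {
            do {
                result = decoder.flush(chars);
                drainChars();
            } while (result.isOverflow());
        }
        bytes.compact(); // keep any incomplete byte sequence for the next write
    }

    private void drainChars() {
        chars.flip();
        while (chars.hasRemaining()) {
            char c = chars.get();
            if (Character.isWhitespace(c)) {
                endToken();
            } else {
                token.append(c);
            }
        }
        chars.clear();
    }

    private void endToken() {
        if (token.length() > 0) {
            sum += Integer.parseInt(token.toString());
            token.setLength(0);
        }
    }

    public static void main(String[] args) {
        SummingOutputStream out = new SummingOutputStream(StandardCharsets.UTF_8);
        out.write("10 20 ".getBytes(StandardCharsets.UTF_8));
        out.write("3".getBytes(StandardCharsets.UTF_8));    // token split across writes
        out.write("0 -5".getBytes(StandardCharsets.UTF_8));
        out.close();
        System.out.println("Sum: " + out.sum()); // prints "Sum: 55"
    }
}
```

An instance of this class could be passed straight to dump, with sum() read after the stream is closed. The trade-off is that the tokenization rules are now hand-written, which the question hoped to avoid; the threaded Scanner approach keeps Scanner's parsing at the cost of a pipe and a second thread.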