Java OutputStream that incrementally processes text

Problem description

I want to incrementally process the text written to an OutputStream as it is written.

For example, suppose we have this program:

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;

public class Streaming {

    // Writes file, incrementally, to OutputStream.
    static void dump(File file, OutputStream out) throws IOException {
        // Implementation omitted
    }

    static int sum = 0;
    public static void main(String[] args) throws IOException {
        Charset charSet = Charset.defaultCharset(); // Interpret the file as having this encoding.
        dump(new File("file.txt"), new OutputStream() {
            @Override
            public void write(int b) throws IOException {
                // Add b to bytes already read,
                // Determine if we have reached the end of the token (using
                //   the default encoding),
                // And parse the token and add it to `sum`
            }
        });
        System.out.println("Sum: " + sum);
    }
}

Suppose file.txt is a text file containing a space-delimited list of ints. In this program, I wish to find the sum of the ints in file.txt, accumulating the sum in the sum variable. I would like to avoid building up a String that is millions of characters long.

I'm interested in a way that I can accomplish this using the dump function, which writes the contents of a file to an output stream. I'm not interested in reading the file in another way (e.g. creating a Scanner for file.txt and repeatedly calling nextInt on the scanner). I'm imposing this restriction because I'm using a library that has an API similar to dump, where the client must provide an OutputStream, and the library subsequently writes a lot of text to the output stream.

How can I implement the write method to correctly perform the steps as outlined? I would like to avoid doing the tokenization by hand, since utilities like Scanner are already capable of doing tokenization, and I want to be able to handle any encoding of text (as specified by charSet). However, I can't use Scanner directly, because there's no way of checking (in a non-blocking way) if a token is available:

    public static void main(String[] args) throws IOException {
        Charset charSet = Charset.defaultCharset();
        PipedInputStream in = new PipedInputStream();
        try (Scanner sc = new Scanner(in, charSet)) {
            dump(new File("file.txt"), new PipedOutputStream(in) {
                @Override
                public void write(byte[] b, int off, int len) throws IOException {
                    super.write(b, off, len);
                    // This call eventually never returns: `hasNextInt` blocks
                    // when no complete token is available, and the only thread
                    // that could write more data is this one.
                    if (sc.hasNextInt()) {
                        sum += sc.nextInt();
                    }
                }
            });
        }
        System.out.println("Sum: " + sum);
        System.out.println(charSet);
    }

Is there a non-blocking utility that can perform the tokenization for me as data is written to the output stream?

Tags: java, outputstream

Solution


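If I understand your question correctly, FilterOutputStream is what you want to subclass. DigestOutputStream extends FilterOutputStream and does something similar to what you want: it monitors the bytes as they pass through and hands them to a different class for processing.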
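As a small illustration of that pattern, the sketch below pushes a few bytes through a DigestOutputStream and shows that they reach the wrapped stream while also being fed to a MessageDigest. The class name DigestDemo, the writeThrough helper, and the sample text are assumptions made for this example only; a ByteArrayOutputStream stands in for the real destination.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class DigestDemo {

    // Hypothetical helper: writes data to destination through a
    // DigestOutputStream and returns the SHA-256 digest of those bytes.
    static byte[] writeThrough(byte[] data, OutputStream destination) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            try (DigestOutputStream out = new DigestOutputStream(destination, md)) {
                // Each byte is forwarded to destination and also fed to md.
                out.write(data, 0, data.length);
            }
            return md.digest();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ByteArrayOutputStream destination = new ByteArrayOutputStream();
        byte[] digest = writeThrough("1 2 3".getBytes(StandardCharsets.UTF_8), destination);
        System.out.println("forwarded: " + destination.toString());
        System.out.println("digest bytes: " + digest.length);
    }
}
```

The caller only ever sees an ordinary OutputStream; the observation happens as a side effect of each write, which is the same shape the summing stream needs.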

One solution that comes to mind is for the FilterOutputStream to pass the bytes to a PipedOutputStream connected to a PipedInputStream, which a different thread reads in order to compute the sum:

PipedOutputStream sumSink = new PipedOutputStream();
// Connect the pipe before any writer can run: writing to an
// unconnected PipedOutputStream throws an IOException.
PipedInputStream source = new PipedInputStream(sumSink);

Callable<Long> sumCalculator = new Callable<Long>() {
    @Override
    public Long call() {

        long sum = 0;

        // This runs on another thread, so blocking in hasNextInt() is fine.
        try (Scanner scanner = new Scanner(source, charSet)) {
            while (scanner.hasNextInt()) {
                sum += scanner.nextInt();
            }
        }

        return sum;
    }
};
Future<Long> sumTask = ForkJoinPool.commonPool().submit(sumCalculator);

OutputStream dest = getTrueDestinationOutputStream();
dest = new FilterOutputStream(dest) {
    // write(byte[]) is not overridden: FilterOutputStream implements it
    // by calling write(b, 0, b.length), which is overridden below.

    @Override
    public void write(int b)
    throws IOException {
        out.write(b);
        sumSink.write(b);
    }

    @Override
    public void write(byte[] b,
                      int offset,
                      int len)
    throws IOException {
        // Write to the wrapped stream directly. super.write(b, offset, len)
        // calls write(int) once per byte, which would copy every byte
        // into sumSink a second time.
        out.write(b, offset, len);
        sumSink.write(b, offset, len);
    }

    @Override
    public void flush()
    throws IOException {
        super.flush();
        sumSink.flush();
    }

    @Override
    public void close()
    throws IOException {
        try {
            super.close();
        } finally {
            // Closing the pipe signals end of input to the reader.
            sumSink.close();
        }
    }
};

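try {
    dump(file, dest);
} finally {
    // The stream must be closed once dump is done; otherwise the reading
    // thread never sees end of input and sumTask.get() blocks forever.
    dest.close();
}

long sum = sumTask.get();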
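The snippet above is a fragment: dump, charSet, and getTrueDestinationOutputStream are defined elsewhere. A self-contained sketch of the same pipe-plus-thread approach might look like the following. The class name PipedSum, the sumOf helper, the String-based dump stand-in, and the ByteArrayOutputStream destination are all assumptions made for illustration. It also departs from the snippet in two ways: it uses a dedicated single-thread executor instead of the common pool, and it writes to the wrapped stream through the inherited `out` field.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PipedSum {

    // Hypothetical stand-in for the library call: writes text in 3-byte chunks.
    static void dump(String text, Charset charSet, OutputStream out) throws IOException {
        byte[] bytes = text.getBytes(charSet);
        for (int i = 0; i < bytes.length; i += 3) {
            out.write(bytes, i, Math.min(3, bytes.length - i));
        }
    }

    // Sums the whitespace-delimited ints in text while dump() is writing them.
    static long sumOf(String text, Charset charSet) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            PipedOutputStream sumSink = new PipedOutputStream();
            // Connect the pipe before anything is written to it.
            PipedInputStream source = new PipedInputStream(sumSink);

            // The Scanner may block freely here; it has its own thread.
            Future<Long> sumTask = executor.submit(() -> {
                long sum = 0;
                try (Scanner scanner = new Scanner(source, charSet.name())) {
                    while (scanner.hasNextInt()) {
                        sum += scanner.nextInt();
                    }
                }
                return sum;
            });

            // Tee every write into the pipe as well as the real destination.
            try (OutputStream dest = new FilterOutputStream(new ByteArrayOutputStream()) {
                @Override
                public void write(int b) throws IOException {
                    out.write(b);
                    sumSink.write(b);
                }

                @Override
                public void write(byte[] b, int off, int len) throws IOException {
                    out.write(b, off, len);
                    sumSink.write(b, off, len);
                }

                @Override
                public void close() throws IOException {
                    try {
                        super.close();
                    } finally {
                        sumSink.close(); // signals end of input to the Scanner
                    }
                }
            }) {
                dump(text, charSet, dest);
            }
            // dest is closed at this point, so the reader can finish.
            return sumTask.get();
        } catch (IOException | InterruptedException | ExecutionException e) {
            throw new IllegalStateException("summing failed", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sum: " + sumOf("10 20 30 40", StandardCharsets.UTF_8));
    }
}
```

The Scanner still blocks, but only on its own thread, so the writer is held up only when the pipe's buffer fills. If the reader stops early (for example at a token that is not an int), closing the Scanner closes the pipe and later writes fail with an IOException, so a production version would need to decide how to handle that case.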
