首页 > 解决方案 > 从 InputStream 到并行 Stream

问题描述

我得到一个包含多个元素的 InputStream,它们被 Stream 以串行方式扫描、解析、迭代(与它们在 中的顺序相同InputStream),然后保存在数据库中。这工作正常。

现在,我正在尝试以并行方式迭代 Stream Stream<T>.parallel(),因此当一个线程被阻塞持久化时,其他线程仍然可以扫描 InputStream 并持久化。

然后,我尝试将结果Stream<MyElement>Stream<T>.parallel(). 为了检查并行化是否有效,我在流中添加了一个添加随机延迟的 map 函数。我期待结果元素以随机顺序打印。

但结果不是预期的。元素仍按文件顺序显示。

有没有办法正确地并行迭代这个流?

public class FromInputStreamToParallelStream {    

    public static Stream<MyElement> getStream(InputStream is) {
        try (var scanner = new Scanner(is)) {
            return scanner//
                    .useDelimiter("DELIMITER")
                    .tokens()
                    .parallel() 
                    .map(MyElementParser::parse);
        }
    }

    @Test
    public void test() throws IOException  {
        try (InputStream in = Files.newInputStream(Paths.get("my-file.xml"));) {
            getStream(in)
                    .map(FromInputStreamToParallelStream::sleepRandom) 
                    .forEach(System.out::println);
        }
    }

    private static MyElement sleepRandom(MyElement element) {
        var randomNumber = new Random().nextInt(10);
        System.out.println("wait. " + randomNumber);

        try {
            TimeUnit.SECONDS.sleep(randomNumber);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return element;
    }
}

我想我需要实现自己的Spliterator<T>.

提前致谢。

标签: javajava-stream

解决方案


推荐阅读