首页 > 解决方案 > Java parallelStream java.util.concurrent.RejectedExecutionException:线程限制超出替换阻塞的工作人员

问题描述

我正在使用 Hazelcast Jet 对大量元素(约 4.8 亿)进行计算。

我开始使用包含起始数据的 IMap Sink。我正在使用

 Collections2.permutations(initialPermutation).parallelStream().forEach(set -> {
        Permutation permutation = new Permutation(set);
        permutations.put(permutation.toString(), permutation);
    });

其中 Permutation 是一个简单的数据类,而 set 是一个整数列表。运行此并行流时,出现以下异常:

java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker

而且我不知道如何解决它。

标签: javahazelcasthazelcast-imaphazelcast-jet

解决方案


您的错误在于使用parallelStream()来执行阻塞工作。Streams API 用于计算负载,但您需要通过网络负载。Streams API 的实现具有检测其线程池中的线程何时进入阻塞状态并将其替换为新线程的代码。但是,这只发生在一个阈值内,然后您就会遇到异常。

相反,为了在 Hazelcast 中实现最佳吞吐量,请使用批处理:将其拆分为块,然后使用 with 推送到地图map.setAll(chunk)

使用此答案中的代码将您Stream<Set<Integer>>转换为Stream<List<Set<Integer>>,如下所示(chunkify()在链接的答案中定义):

chunkify(Collections2.permutations(initialPermutation)).stream().forEach(chunk -> {
    Map<String, String> chunkAsMap = chunk.stream().map(Permutation::new).map(Object::toString).collect(toMap(x -> x, x -> x));
    permutations.setAll(chunkAsMap);
});

推荐阅读