java - Java parallelStream java.util.concurrent.RejectedExecutionException:线程限制超出替换阻塞的工作人员
问题描述
我正在使用 Hazelcast Jet 对大量元素(约 4.8 亿)进行计算。
我开始使用包含起始数据的 IMap Sink。我正在使用
Collections2.permutations(initialPermutation).parallelStream().forEach(set -> {
Permutation permutation = new Permutation(set);
permutations.put(permutation.toString(), permutation);
});
其中 Permutation 是一个简单的数据类,而 set 是一个整数列表。运行此并行流时,出现以下异常:
java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
而且我不知道如何解决它。
解决方案
您的错误在于使用parallelStream()
来执行阻塞工作。Streams API 用于计算负载,但您需要通过网络负载。Streams API 的实现具有检测其线程池中的线程何时进入阻塞状态并将其替换为新线程的代码。但是,这只发生在一个阈值内,然后您就会遇到异常。
相反,为了在 Hazelcast 中实现最佳吞吐量,请使用批处理:将其拆分为块,然后使用 with 推送到地图map.setAll(chunk)
。
使用此答案中的代码将您Stream<Set<Integer>>
转换为Stream<List<Set<Integer>>
,如下所示(chunkify()
在链接的答案中定义):
chunkify(Collections2.permutations(initialPermutation)).stream().forEach(chunk -> {
Map<String, String> chunkAsMap = chunk.stream().map(Permutation::new).map(Object::toString).collect(toMap(x -> x, x -> x));
permutations.setAll(chunkAsMap);
});
推荐阅读
- ruby-on-rails - Rails:我如何重构我的控制器参数代码
- reactjs - 在将我的 axios 代码转换为 react-query 时,我得到了“太多的重新渲染”
- angular - 在 Azure 上部署 Angular 应用程序时出现“/”应用程序中的服务器错误
- c# - RestSharp Post Json 对象
- javascript - 使用 JavaScript 下载图像
- c# - SWIG C# 使用 Director =“1”传递二进制数据
- nginx - nginx docker compose 未启动并运行
- javascript - 成功验证后将表单数据发送到 .php 文件的 Ajax 代码
- python - 没有root权限时如何使用pgadmin python
- r - 如何在 Shiny 中对多个表进行排序