kotlin - 如何更改我的辅助函数以便收集并行处理任务的结果
问题描述
我编写了这个辅助函数,这样我就可以轻松地并行处理一个列表,并且只有在所有工作完成后才继续执行代码。当您不需要返回结果时,它可以很好地工作。
(我知道每次都创建新池并不是最佳实践,它可以很容易地移出,但我想让示例保持简单。)
fun recursiveAction(action: () -> Unit): RecursiveAction {
return object : RecursiveAction() {
override fun compute() {
action()
}
}
}
fun <T> List<T>.parallelForEach(parallelSize: Int, action: (T) -> Unit) {
ForkJoinPool(parallelSize).invoke(recursiveAction {
this.parallelStream().forEach { action(it) }
})
}
示例使用:
val myList: List<SomeClass> [...]
val parallelSize: Int = 8
myList.parallelForEach(parallelSize) { listElement ->
//Some task here
}
当您想将结果收集回列表时,有什么方法可以制作类似的辅助构造?
我知道我必须使用 RecursiveTask 而不是 RecursiveAction,但我无法像上面那样编写一个辅助函数来包装它。
我想这样使用它:
val myList: List<SomeClass> [...]
val parallelSize: Int = 8
val result: List<SomeClass> = myList.parallelForEach(parallelSize) { listElement ->
//Some task here
}
或者,有没有更简单的方法可以一起完成这件事?
解决方案
fun <T> recursiveTask(action: () -> T): RecursiveTask<T> {
return object : RecursiveTask<T>() {
override fun compute(): T {
return action()
}
}
}
fun <T, E> List<T>.parallelForEach(parallelSize: Int, action: (T) -> E): List<E> {
val pool = ForkJoinPool(parallelSize)
val result = mutableListOf<ForkJoinTask<E>>()
for (item in this) {
result.add(pool.submit(recursiveTask {
action(item)
}))
}
return result.map { it.join() }
}
fun main(args: Array<String>) {
val list = listOf(1, 2, 3)
list.parallelForEach(3) { it + 2 }.forEach { println(it) }
}
推荐阅读
- javascript - 无法使用 IntlProvider 浅层渲染组件
- go - 有没有办法在 macOS 中方便地切换 go 版本?
- php - 如何在 laravel 查询中转换 SQL 查询?
- python - 将数据框拆分为子数据框并根据相关数据框填充内容?
- ansible - 在哪里放置全局自定义 ansible 过滤器
- spring-boot - Java Spring Boot 和 React Native 应用程序同时向 api 发送 id 和 object
- apache-kafka - 从 Angular 前端发布到 Apache Kafka 主题
- webots - 如何实现在 Webots Web Simulation 中编辑控制器的功能?
- docker - 无法按照 Docker 教程构建 docker 映像
- angular - Angular 应用程序和部署在 Docker 容器中的 Spring Boot API 之间的通信问题