首页 > 解决方案 > 如何更改我的辅助函数以便收集并行处理任务的结果

问题描述

我编写了这个辅助函数,这样我就可以轻松地并行处理一个列表,并且只有在所有工作完成后才继续执行代码。当您不需要返回结果时,它可以很好地工作。

(我知道每次都创建新池并不是最佳实践,它可以很容易地移出,但我想让示例保持简单。)

fun recursiveAction(action: () -> Unit): RecursiveAction {
    return object : RecursiveAction() {
        override fun compute() {
            action()
        }
    }
}

fun <T> List<T>.parallelForEach(parallelSize: Int, action: (T) -> Unit) {
    ForkJoinPool(parallelSize).invoke(recursiveAction {
        this.parallelStream().forEach { action(it) }
    })
}

示例使用:

val myList: List<SomeClass> [...]
val parallelSize: Int = 8

myList.parallelForEach(parallelSize) { listElement ->
   //Some task here
}

当您想将结果收集回列表时,有什么方法可以制作类似的辅助构造?

我知道我必须使用 RecursiveTask 而不是 RecursiveAction,但我无法像上面那样编写一个辅助函数来包装它。

我想这样使用它:

val myList: List<SomeClass> [...]
val parallelSize: Int = 8

val result: List<SomeClass> = myList.parallelForEach(parallelSize) { listElement ->
   //Some task here
}

或者,有没有更简单的方法可以一起完成这件事?

标签: kotlinparallel-processingforkjoinpool

解决方案


JeffMurdockReddit上回答

fun <T> recursiveTask(action: () -> T): RecursiveTask<T> {
    return object : RecursiveTask<T>() {
        override fun compute(): T {
            return action()
        }
    }
}

fun <T, E> List<T>.parallelForEach(parallelSize: Int, action: (T) -> E): List<E> {
    val pool = ForkJoinPool(parallelSize)
    val result = mutableListOf<ForkJoinTask<E>>()
    for (item in this) {
        result.add(pool.submit(recursiveTask {
            action(item)
        }))
    }
    return result.map { it.join() }
}

fun main(args: Array<String>) {
    val list = listOf(1, 2, 3)
    list.parallelForEach(3) { it + 2 }.forEach { println(it) }
}

推荐阅读