clojure - 如何在 Clojure 中实现并行逻辑或提前终止
问题描述
我想定义一个谓词,将一些带有相应输入的谓词(它们可以作为惰性调用序列给出)作为输入,并行运行它们并计算结果的逻辑或,以这样的方式,当谓词调用终止返回true
时,整个计算也终止(返回true
)。
除了提供时间优化之外,这还有助于避免在某些情况下不终止(某些谓词调用可能不会终止)。实际上,将非终止解释为第三个undefined
值,这个谓词模拟了Kleene 的 K3 逻辑中的 or 操作
(初始中心Kleene 代数中的连接)。
这里为 Haskell 家族提供了类似的东西。在 Clojure 中是否有任何(最好是简单的)方法可以做到这一点?
编辑:我决定在阅读评论后添加一些说明。
(a) 首先,线程池耗尽后发生的事情并不重要。我认为为我们的需要创建一个足够大的线程池是一个合理的约定。
(b) 最关键的要求是谓词调用开始并行运行,一旦谓词调用终止返回true
,所有其他运行的线程都会被中断。预期的行为是:
- 如果有谓词调用返回
true
:并行或返回true
- else 如果有一个谓词调用不终止:并行或不终止
- else:并行或返回
false
换句话说,它的行为类似于false
< undefined
<给出的 3 元素格中的连接true
,undefined
表示非终止。
(c) 并行的 or 应该能够将许多谓词和许多谓词输入(每个都对应一个谓词)作为输入。但如果将惰性序列作为输入会更好。然后,命名并行或pany
(对于“并行任何”),我们可以进行如下调用:
(pany (map (comp eval list) predicates inputs))
(pany (map (comp eval list) predicates (repeat input)))
(pany (map (comp eval list) (repeat predicate) inputs))
这相当于(pany (map predicate (unchunk inputs)))
作为最后的评论,我认为要求诸如pany
对偶pall
或构建这种早期终止并行缩减的机制是很自然的,以便易于实现甚至内置在像 Clojure 这样的面向并行的语言中。
解决方案
我将根据归约函数来定义我们的谓词。实际上,我们可以重新实现所有 Clojure 迭代函数来支持这种并行操作,但我将仅使用 reduce 作为示例。
我将定义一个计算函数。我会用同一个,但没有什么能阻止你拥有很多。如果累积 1000,则该函数为“真”。
(defn computor [acc val]
(let [new (+' acc val)] (if (> new 1000) (reduced new) new)))
(reduce computor 0 (range))
;; =>
1035
(reduce computor 0 (range Long/MIN_VALUE 0))
;; =>
;; ...this is a proxy for a non-returning computation
;; wrap these up in a form suitable for application of reduction
(def predicates [[computor 0 (range)]
[computor 0 (range Long/MIN_VALUE 0)]])
现在让我们来看看这个。我想在每次计算中迈出一步,如果其中一个计算完成,我想返回它。实际上,一次一步使用 pmap 非常慢 - 工作单元太小,不值得线程化。在这里,在继续之前,我已经更改了每个工作单元的 1000 次迭代。您可能会根据您的工作量和一个步骤的成本来调整它。
(defn p-or-reducer* [reductions]
(let [splits (map #(split-at 1000 %) reductions) ;; do at least 1000 iterations per cycle
complete (some #(if (empty? (second %)) (last (first %))) splits)]
(or complete (recur (map second splits)))))
然后我将其包装在驱动程序中。
(defn p-or [s]
(p-or-reducer* (map #(apply reductions %) s)))
(p-or predicates)
;; =>
1035
在哪里插入 CPU 并行度?p-or-reducer* 中的 s/map/pmap/ 应该这样做。我建议只并行化第一个操作,因为这将驱动减少序列进行计算。
(defn p-or-reducer* [reductions]
(let [splits (pmap #(split-at 1000 %) reductions) ;; do at least 1000 iterations per cycle
complete (some #(if (empty? (second %)) (last (first %))) splits)]
(or complete (recur (map second splits)))))
(def parallelism-tester (conj (vec (repeat 40000 [computor 0 (range Long/MIN_VALUE 0)]))
[computor 0 (range)]))
(p-or parallelism-tester) ;; terminates even though the first 40K predicates will not
定义一个高性能的通用版本是非常困难的。在不知道每次迭代的成本的情况下,很难得出有效的并行策略——如果一次迭代需要 10 秒,那么我们可能一次只走一步。如果需要 100ns,那么我们需要一次采取许多步骤。
推荐阅读
- php - Ajax 表单不上传文件或发送电子邮件
- c# - 最好检查 null 并避免 Null Ref Exception 或简单地捕获 Null Ref Exception
- python - PathLike 对象中的 __fspath__() 函数
- node.js - 如何在adonis js中的控制器之间共享代码
- r - 使用 R 创建计数器
- php - PHP 和网络 API
- javascript - 如何从对象数组创建反应列表组件?
- snmp - 每次对 MIB 安装文件进行更改时是否需要运行 ./configure?
- java - How to create a second/millisecond timer like phone?
- java - Z3 Java 应用程序开发