首页 > 解决方案 > 如何在 Clojure 中实现并行逻辑或提前终止

问题描述

我想定义一个谓词,将一些带有相应输入的谓词(它们可以作为惰性调用序列给出)作为输入,并行运行它们并计算结果的逻辑或,以这样的方式,当谓词调用终止返回true时,整个计算也终止(返回true)。

除了提供时间优化之外,这还有助于避免在某些情况下不终止(某些谓词调用可能不会终止)。实际上,将非终止解释为第三个undefined值,这个谓词模拟了Kleene 的 K3 逻辑中的 or 操作 (初始中心Kleene 代数中的连接)。

这里为 Haskell 家族提供了类似的东西。在 Clojure 中是否有任何(最好是简单的)方法可以做到这一点?

编辑:我决定在阅读评论后添加一些说明。

(a) 首先,线程池耗尽后发生的事情并不重要。我认为为我们的需要创建一个足够大的线程池是一个合理的约定。

(b) 最关键的要求是谓词调用开始并行运行,一旦谓词调用终止返回true,所有其他运行的线程都会被中断。预期的行为是:

换句话说,它的行为类似于false< undefined<给出的 3 元素格中的连接trueundefined表示非终止。

(c) 并行的 or 应该能够将许多谓词和许多谓词输入(每个都对应一个谓词)作为输入。但如果将惰性序列作为输入会更好。然后,命名并行或pany(对于“并行任何”),我们可以进行如下调用:

作为最后的评论,我认为要求诸如pany对偶pall或构建这种早期终止并行缩减的机制是很自然的,以便易于实现甚至内置在像 Clojure 这样的面向并行的语言中。

标签: clojureparallel-processingfunctional-programminglogicterminate

解决方案


我将根据归约函数来定义我们的谓词。实际上,我们可以重新实现所有 Clojure 迭代函数来支持这种并行操作,但我将仅使用 reduce 作为示例。

我将定义一个计算函数。我会用同一个,但没有什么能阻止你拥有很多。如果累积 1000,则该函数为“真”。

(defn computor [acc val]
        (let [new (+' acc val)] (if (> new 1000) (reduced new) new)))

(reduce computor 0 (range))
;; =>
1035

(reduce computor 0 (range Long/MIN_VALUE 0))
;; =>
;; ...this is a proxy for a non-returning computation

;; wrap these up in a form suitable for application of reduction
(def predicates [[computor 0 (range)] 
                 [computor 0 (range Long/MIN_VALUE 0)]])

现在让我们来看看这个。我想在每次计算中迈出一步,如果其中一个计算完成,我想返回它。实际上,一次一步使用 pmap 非常慢 - 工作单元太小,不值得线程化。在这里,在继续之前,我已经更改了每个工作单元的 1000 次迭代。您可能会根据您的工作量和一个步骤的成本来调整它。

(defn p-or-reducer* [reductions]
        (let [splits (map #(split-at 1000 %) reductions) ;; do at least 1000 iterations per cycle
              complete (some #(if (empty? (second %)) (last (first %))) splits)]
          (or complete (recur (map second splits)))))

然后我将其包装在驱动程序中。

(defn p-or [s]
  (p-or-reducer* (map #(apply reductions %) s)))

(p-or predicates)
;; =>
1035

在哪里插入 CPU 并行度?p-or-reducer* 中的 s/map/pmap/ 应该这样做。我建议只并行化第一个操作,因为这将驱动减少序列进行计算。

(defn p-or-reducer* [reductions]
        (let [splits (pmap #(split-at 1000 %) reductions) ;; do at least 1000 iterations per cycle
              complete (some #(if (empty? (second %)) (last (first %))) splits)]
          (or complete (recur (map second splits)))))

(def parallelism-tester (conj (vec (repeat 40000 [computor 0 (range Long/MIN_VALUE 0)]))
                             [computor 0 (range)]))

(p-or parallelism-tester) ;; terminates even though the first 40K predicates will not

定义一个高性能的通用版本是非常困难的。在不知道每次迭代的成本的情况下,很难得出有效的并行策略——如果一次迭代需要 10 秒,那么我们可能一次只走一步。如果需要 100ns,那么我们需要一次采取许多步骤。


推荐阅读