首页 > 解决方案 > 停止生产者/消费者循环的惯用方法?

问题描述

为了获得一些良好的并发编程实践,我正在尝试使用 Clojure 的 core.async 库来实现生产者/消费者模式。一切运行良好,但我希望能够在某个时间点同时停止生产者和消费者。

我当前的代码看起来像这样......

(def c (a/chan 5))
(def alive (atom true))

(def producer (a/go-loop []
                (Thread/sleep 1000)
                (when @alive 
                  (a/>! c 1)
                  (recur))))

(def consumer (a/go-loop []
                (Thread/sleep 3000)
                (when @alive 
                  (println (a/<! c))
                  (recur))))
(do
  (reset! alive false)
  (a/<!! producer)
  (a/<!! consumer))

不幸的是,“do”块似乎偶尔会无限期地阻塞。我本质上希望能够阻止两个循环继续并阻塞,直到两个循环都退出。线程/睡眠代码用于模拟执行某些工作单元。

我怀疑停止生产者会导致消费者停车,因此挂起,尽管我不确定另一种方法,有什么想法吗?

标签: asynchronousclojurecore.async

解决方案


有关详细信息,请参阅 ClojureDocs。例子:

(let [c (chan 2) ]
  (>!! c 1)
  (>!! c 2)
  (close! c)
  (println (<!! c)) ; 1
  (println (<!! c)) ; 2
  ;; since we closed the channel this will return false(we can no longer add values)
  (>!! c 1))

对于您的问题,例如:

  (let [c        (a/chan 5)
        producer (a/go-loop [cnt 0]
                   (Thread/sleep 1000)
                   (let [put-result (a/>! c cnt)]
                     (println "put: " cnt put-result)
                     (when put-result
                       (recur (inc cnt)))))

        consumer (a/go-loop []
                   (Thread/sleep 3000)
                   (let [result (a/<! c)]
                     (when result
                       (println "take: " result)
                       (recur))))]
    (Thread/sleep 5000)
    (println "closing chan...")
    (a/close! c))

结果

put:  0 true
put:  1 true
take:  0
put:  2 true
put:  3 true
closing chan...
put:  4 false
take:  1
take:  2
take:  3

推荐阅读