首页 > 解决方案 > 如何在 Common Lisp 中同时运行多个阻塞循环。[将 cl-async 与线程池中的队列相结合]

问题描述

我是编程新手,也是 Common Lisp 的新手。我正在尝试解决以下问题:

有流源(S1、S2、S3)、两个处理器(P1、P2)(处理器我不是指 CPU 处理器,而是处理功能/子系统)和四个流目的地(D1、D2、D3、 D4)。通过流,我的意思是数据连续出现,但可能是间歇性的。所以读/写操作可能会阻塞。来自流的数据需要聚合,然后根据它们是什么,由其中一个处理器处理。每个处理器(从其输入聚合)产生四种子聚合,然后将它们发送到上述四个目的地之一。

我有这个问题的伪代码是这样的:

三个循环,S1、S2、S3 各一个,用于数据聚合和调度。对于特定的源(比如 S2)伪代码是:

 loop-2 : read from S2,
            when sufficient data aggregate, 
                if aggregate is kind-1 send to P1 
                  else send to P2.

两个循环,一个用于 P1、P2,用于处理和解聚合。对于特定的处理器(比如 P1)伪代码是:

In a loop for P1:   take aggregate of kind-1,
                       process it to produce one or some of four sub-aggregates.
                          case: sub-aggregate 1 -> serialize and send/write to D1,
                                sub-aggregate 2 -> serialize and send/write to D2,
                                sub-aggregate 3 -> serialize and send/write to D3,
                                sub-aggregate 4 -> serialize and send/write to D4.

在处理器和目的地之间,我想使用另一个通道/队列,但没有提到让我的问题对自己来说更简单。

据我了解,大约有十个阻塞点。三,如果数据源中不可用(S1、S2、S3);当处理器(P1,P2)的数据不足时,三个,当目标(D1,D2,D3,D4)没有准备好写入时,四个。

所以我的问题是:如何同时运行所有这些循环(同时是这个词?)而不会相互阻塞;以及程序中正在做的其他事情。

先前的工作:我已经查看了这个SO 问题,来自cl-async 的轮询器通知器。查看Cliki-Concurrency,我还看到了threading-queue。更不用说cl-flowgreen-threads 了。CL-flow 提到它是“Common Lisp 中异步非阻塞并发的库”。有没有办法,我可以使用带有 cl-async 或 cl-flow 的队列库?例如,flow-concurrently似乎是正确的。但是它可以与 cl-async 的绿色线程一起使用来运行非终止循环吗?

我的难点:我试图阅读我上面提到的每个链接并尽可能多地理解。但是,老实说,我什至无法弄清楚从哪里/如何开始。谢谢你的帮助。

编辑:2020 年 2 月 28 日。

我尝试像这样使用 cl-flow。初始化流系统后


(ql:quickload "simple-flow-dispatcher")
(ql:quickload "cl-muth")
(ql:quickload "cl-flow")
(ql:quickload "bt-semaphore")

(use-package :cl-flow :bt-semaphore)

(defvar *output* *standard-output*)

(defun print-error (e)
  (format *output* "~A" e))

(defvar *dispatcher* (simple-flow-dispatcher:make-simple-dispatcher
                      :threads 4
                      :error-handler #'print-error))

(defun run-flow (flow)
  (run *dispatcher* flow))

我使用了以下功能:


(defun hello-world (n)
  (sleep n)
  (format t (concatenate 'string (gap n) "Loop-" (write-to-string n) "~%")))


(defun gap(n)
  (if (= n 0)
      ""
      (concatenate 'string "         " (gap (- n 1)))))

(defun loop-hello (n)
  (loop
     (hello-world n)))

(defun test-flow ()
  (run-flow 
     (flow:concurrently
         (flow:asynchronously
          (loop-hello 3))
         (flow:asynchronously
          (loop-hello 2))
         (flow:asynchronously
         (loop-hello 1)))))

hello-world 是一个简单的函数,我加入了函数 gap 来在文本前插入一些空格。正如所言,我期待三列交错的打印行(同时,异步等),但我只得到一个(第一个循环-hello)列。

我目前的理解如下,请指正:循环和睡眠都是同步语句,所以执行被阻塞。所以我搜索了循环和睡眠的异步版本。我想知道我是否可以使用 cl-async 来做到这一点。

与此同时,我试图简化我的问题。昨天,我也偶然发现了这个问题及其答案。我试图理解它。

此外,由于我使用伪代码发布了我的原始问题,我想知道我希望(比如说)一个库为我提供功能(即使它现在不存在)。

我尝试在 google 中搜索异步代码,大多数结果显示 javascript 库。到目前为止,我只能模糊地掌握两个。Python,Clojure(Java)。写一个从 q1 异步读取的小函数,处理读取 v1 到 v2 的值,然后写入 q2(q1 和 q2 是队列)。对我来说,似乎我可以这样写(说):

async def p1(x):
    #compute v2 from v1 and return v2

async def keep_doing_p1(q1, q2):
    while True:
        v1 = await q1.pop()
        v2 = p1(v1) # function p1 defined above
        await q2.push(v2)

当然,必须有合适的 q1 和 q2 实现。在 Clojure(Java) 中,适当地使用 core.async 库:

(def q1 (async/chan))
(def q2 (async/chan))

(defn p1 [v1]
  ;;; Need a transducer version
  ;;; compute v2 from v1 and return)

(async/go
   (-->
       (async/< q1)
       (p1)
       (async/q2)))

我上面发布的代码可能是错误的。我是根据我(很可能)肤浅的理解写的。我没有 Python、Clojure(Java) 经验。这些例子也不是煽动任何火焰战争。我认为所有语言都可以很好、强大、方便,并迎合不同的人类程序员。我想解决 CL 中的问题。我是一个资深的人(按年龄),但一个初级的,实际上是一个新手(在编程方面)。我真的很喜欢用 CL 写它。并不是说我已经或即将成为一个伟大的程序员。但是有了CL,我觉得,天啊,我连程序都可以写了!

不管怎样,所以回到问题上来。所以我想要一个库来提供如下语法/功能:

(cc-forever q1 q2 p1
  (push q2 (p1 (pop q1))))

;;; many other such cc-forever, operating concurrently

你可能已经猜到了,cc-forever 的意思是(ConCurrently-run FOREVER)。

所以我正在寻找制作这样一个函数/库的帮助。如果我的思维过程仍然很复杂,请提出更好/更简单的替代方案。谢谢。

稍后将再次发布我的尝试。

编辑:2020 年 5 月 27 日

我将继续探索 cl-async 和 cl-flow。我最近遇到了在 cl-async 之上实现的 with-green-thread 宏。作为一个新手,我面临着双重问题:这些库使用不同的语法,并且可能使用它在语义上做不同的事情。目前,我对 with-green-thread 和 cl-flow: 反复感到鼓舞。尽管有时代码会引发错误(可能是我对某些 quicklisp 问题或其他问题的处理不当)。会继续发帖。如果有任何反馈,将很有帮助。

标签: multithreadingconcurrencycommon-lispevent-loopgreen-threads

解决方案


推荐阅读