首页 > 解决方案 > 如何设计:具有多线程信号的单个阻塞工作者

问题描述

我有一个工人,它在运行时会收集所有可用的工作并对其进行批处理。由于它不应该轮询可用的工作,它必须在某个地方阻塞,然后运行并返回阻塞。

同时,我有一些向系统添加工作的操作,有时比工作人员可以处理的要快得多。工作不会保存在内存中,因此不可能有工作队列。因此,每个操作都应该向工人发出至少有一些工作可用的信号。

1 Worker                N Producers
  ___                       ____
 /   \                     /    \
|    WAIT <------------ SIGNAL   |
|     |                   |      |
^     V                   ^      V
|     |                   |      |
|    WORK <- DATABASE <- ADD     |
 \___/                     \____/

我还没有找到用标准工具来表达这一点的方法,我需要它至少可以在 C#、Swift 和 Typescript 中实现。我检查的所有并发 API 都必须平衡,因此不允许有比等待更多的信号。类似地,我不想使用队列,因为它可能会变得太大并且不能很好地解决问题。

问题

在不深入详细代码的情况下,我应该使用什么类型、类、方法或算法来实现这个场景?

有什么方法可以结合现有的并发 api,如锁、信号量、原子类型来实现我的目标?

为什么没有一个信号量可以发出比等待更多次的信号(比如等待会清除所有额外的信号)?

标签: c#swiftmultithreadingconcurrencyproducer-consumer

解决方案


import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class consumeprodue<BlockingQue> {
    private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
    public static void main(String[] args) {
        Thread p = new Thread() {
            public void run() {
                for (int i = 0; i < 10; i++) {
                    try {
                        System.out.println(getName() + " produced :" + i);
                        queue.put(i);
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            }

        };
        Thread c = new Thread() {

            public void run() {

                try {
                    while (true) {
                        System.out.println(getName() + "consume :" + queue.take());
                    }

                } catch (InterruptedException e) {

                    e.printStackTrace();
                }
            }

        };
        Thread d = new Thread() {

            public void run() {

                try {
                    while (true) {
                        System.out.println(getName() + "consume------- :" + queue.take());
                    }

                } catch (InterruptedException e) {

                    e.printStackTrace();
                }
            }

        };
        d.start();
        p.start();
        c.start();

    }

}

推荐阅读