Producer-consumer with semaphores gets stuck

Problem description

I am trying to build a producer-consumer using only semaphores, with the following code:

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;

public class Application {

    public static int id = 0;

    public static void main(String[] args) {

        Semaphore producerSem = new Semaphore(1);
        Semaphore consumerSem = new Semaphore(1);
        Queue<Integer> line = new LinkedList<>();

        Runnable produce = () -> {
            try {
                producerSem.acquire();
                System.out.println(Thread.currentThread().getName() + " producing");
                while (line.size() > 10) continue;
                Thread.sleep(2000);
                line.offer(id);
                System.out.println(Thread.currentThread().getName() + " produced thing with id: " + id);
                id++;
                System.out.println(Thread.currentThread().getName() + " finished producing");
                producerSem.release();

            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        Runnable consume = () -> {
            try {
                consumerSem.acquire();
                System.out.println(Thread.currentThread().getName() + " consuming");
                while (line.size() < 1) continue;
                int product = line.remove();
                System.out.println(Thread.currentThread().getName() + " consumed thing with id: " + product);
                Thread.sleep(3000);
                System.out.println(Thread.currentThread().getName() + " finished consuming");
                consumerSem.release();
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        for (int i = 0; i < 100; i++) {
            new Thread(consume, "Consumer - " + i).start();
            new Thread(produce, "Producer - " + i).start();
        }
    }
}

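It gets stuck, and I can't debug it: it reaches id 10 and stops, so it looks as if one element has been removed but the program cannot make further progress. With a breakpoint set anywhere, even at the end of either task, it works fine.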

Tags: java, multithreading, producer-consumer

Solution


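I think writing thread-safe code using only semaphores is a good exercise.

The original most likely hangs because the producer and the consumer share the `LinkedList` without any common synchronization: the two busy-wait loops read `line.size()` with no happens-before relationship to the other side's writes, so a spinning thread may never observe the change. Pausing at a breakpoint alters the timing enough to hide the problem.

You should protect three things:

  • `id` is a plain global integer, so only one thread may write to it at a time.
  • Adding jobs to the queue and removing them from it must also be protected.
  • Although the queue's own protection could be used to check availability, a more elegant way to wait for new jobs is a synchronized counter (a semaphore). That way the queue is never locked just to check whether it is empty.

The modified code could be: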
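The three protections listed above can also be packaged into one small class. This is only a minimal sketch and not the answer's code: the class name `JobLine` and its methods `nextId`, `put`, and `take` are hypothetical, and it uses `acquireUninterruptibly()` to keep the example short, which means it ignores thread interruption.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Semaphore;

// Hypothetical helper illustrating the three protections; all names are assumptions.
class JobLine {
    private int id = 0;
    private final Semaphore idMutex = new Semaphore(1);     // guards `id`

    private final Queue<Integer> jobs = new ArrayDeque<>();
    private final Semaphore queueMutex = new Semaphore(1);  // guards `jobs`

    private final Semaphore available = new Semaphore(0);   // counts queued jobs

    // Hands out a fresh id; only one thread at a time touches `id`.
    int nextId() {
        idMutex.acquireUninterruptibly();
        try {
            return id++;
        } finally {
            idMutex.release();
        }
    }

    // Stores a job, then signals that one more job can be taken.
    void put(int job) {
        queueMutex.acquireUninterruptibly();
        try {
            jobs.add(job);
        } finally {
            queueMutex.release();
        }
        available.release();
    }

    // Blocks while no job is queued, then removes the oldest job.
    int take() {
        available.acquireUninterruptibly();
        queueMutex.acquireUninterruptibly();
        try {
            return jobs.remove();
        } finally {
            queueMutex.release();
        }
    }

    public static void main(String[] args) {
        JobLine line = new JobLine();
        line.put(line.nextId());
        line.put(line.nextId());
        System.out.println(line.take()); // prints 0
        System.out.println(line.take()); // prints 1
    }
}
```

Because `take` waits on `available` before it touches `queueMutex`, an idle consumer never holds the queue lock while waiting, which is the point of the third item.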

// Members of the `Application` class. Needs java.util.LinkedList, java.util.Queue,
// java.util.concurrent.Semaphore and java.util.concurrent.ThreadLocalRandom.

// our `id` counter...
private static int id = 0;
// ...and its semaphore for thread safety
private static Semaphore idSemaphore = new Semaphore(1);

// job queue...
private static Queue<Integer> lineJobs = new LinkedList<>();
// ...and its semaphore for thread safety
private static Semaphore lineSemaphore = new Semaphore(1);

// thread-safe counter of available jobs (consumers wait on it)
private static Semaphore lineSemaphoreAvailable = new Semaphore(0);

public static void main(String[] args) {


    Runnable produce = () -> {

        // we will set `myId` only once
        final int myId;

        // create a new id
        try {
            // wait for permission to touch the global (unsynchronized) integer `id`
            idSemaphore.acquire();
        } catch (InterruptedException e) {
            // nothing was acquired, so there is nothing to release
            System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
            return;
        }
        try {
            myId = id++;
        } finally {
            // release only after a successful acquire
            idSemaphore.release();
        }

        System.out.printf("%s: producing job #%d...%n", Thread.currentThread().getName(), myId);

        // producing job...
        try {
            Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
        } catch (InterruptedException e) {
            System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
            return;
        }

        // store the job
        try {
            // wait for permission to touch the global (unsynchronized) queue `lineJobs`
            lineSemaphore.acquire();
        } catch (InterruptedException e) {
            // nothing was acquired, so there is nothing to release
            System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
            return;
        }
        try {
            lineJobs.add(myId);

            // notify consumers that a new job is available to take
            lineSemaphoreAvailable.release();

            System.out.printf("%s: job #%d ready to do!%n", Thread.currentThread().getName(), myId);
        } finally {
            lineSemaphore.release();
        }
    };

    Runnable consume = () -> {

        final int myId;

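        // get a job
        try {
            // wait for a new job to become available
            lineSemaphoreAvailable.acquire();
        } catch (InterruptedException e) {
            System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
            return;
        }
        try {
            // wait for permission to touch the global (unsynchronized) queue `lineJobs`
            lineSemaphore.acquire();
        } catch (InterruptedException e) {
            // give the job permit back so another consumer can take the job
            lineSemaphoreAvailable.release();
            System.out.printf("%s: INTERRUPTED!%n", Thread.currentThread().getName());
            return;
        }
        try {
            myId = lineJobs.remove();
            System.out.printf("%s: job #%d adquired%n", Thread.currentThread().getName(), myId);
        } finally {
            lineSemaphore.release();
        }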

        System.out.printf("%s: job #%d finished!%n", Thread.currentThread().getName(), myId);
    };

    for (int i = 0; i < 100; i++) {
        new Thread(consume, "Consumer - " + i).start();
        new Thread(produce, "Producer - " + i).start();
    }
}

Output:

Producer - 0: producing job #0...
Producer - 1: producing job #1...
...
Producer - 46: producing job #45...
Producer - 45: producing job #44...
Producer - 9: job #9 ready to do!
Producer - 44: producing job #46...
Producer - 47: producing job #47...
Producer - 48: producing job #48...
Producer - 49: producing job #49...
Consumer - 0: job #9 adquired
Consumer - 0: job #9 finished!
Producer - 14: job #14 ready to do!
Consumer - 1: job #14 adquired
Consumer - 1: job #14 finished!
Producer - 50: producing job #50...
Producer - 51: producing job #51...
...
Producer - 72: producing job #72...
Producer - 39: job #39 ready to do!
Consumer - 2: job #39 adquired
Consumer - 2: job #39 finished!
Producer - 73: producing job #73...
Producer - 74: producing job #74...
Producer - 75: producing job #75...
Producer - 10: job #10 ready to do!
Producer - 40: job #40 ready to do!
Consumer - 3: job #10 adquired
Consumer - 3: job #10 finished!
Producer - 76: producing job #76...
Producer - 77: producing job #77...
...
Producer - 82: producing job #82...
Producer - 83: producing job #83...
Producer - 84: producing job #84...
Consumer - 4: job #40 adquired
Consumer - 4: job #40 finished!
Producer - 85: producing job #85...
Producer - 86: producing job #86...
Producer - 71: job #71 ready to do!
Producer - 87: producing job #87...
Consumer - 5: job #71 adquired
Consumer - 5: job #71 finished!
Producer - 88: producing job #88...
Producer - 92: producing job #92...
Producer - 91: producing job #91...
...
Producer - 96: producing job #95...
Producer - 98: producing job #98...
Producer - 99: producing job #99...
Producer - 84: job #84 ready to do!
Consumer - 6: job #84 adquired
Consumer - 6: job #84 finished!
...
Consumer - 97: job #63 adquired
Consumer - 97: job #63 finished!
Producer - 90: job #90 ready to do!
Consumer - 98: job #90 adquired
Consumer - 98: job #90 finished!
Producer - 87: job #87 ready to do!
Consumer - 99: job #87 adquired
Consumer - 99: job #87 finished!

Process finished with exit code 0
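
The question's code tried to cap the queue at about ten items, which the modified code no longer does. If that limit matters, one common way to restore it with semaphores alone is a second counting semaphore that tracks free slots. The following is a sketch under that assumption, not part of the original answer: the class name `BoundedLine`, its methods, and the capacity of 10 are hypothetical choices for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Semaphore;

// Hypothetical bounded job line; all names are assumptions made for this sketch.
class BoundedLine {
    private final Queue<Integer> jobs = new ArrayDeque<>();
    private final Semaphore mutex = new Semaphore(1);      // guards `jobs`
    private final Semaphore available = new Semaphore(0);  // counts queued jobs
    private final Semaphore freeSlots;                     // counts remaining capacity

    BoundedLine(int capacity) {
        freeSlots = new Semaphore(capacity);
    }

    private void store(int job) {
        mutex.acquireUninterruptibly();
        try {
            jobs.add(job);
        } finally {
            mutex.release();
        }
        available.release();
    }

    // Blocks while the line is full.
    void put(int job) throws InterruptedException {
        freeSlots.acquire();
        store(job);
    }

    // Non-blocking variant: returns false when the line is full.
    boolean tryPut(int job) {
        if (!freeSlots.tryAcquire()) return false;
        store(job);
        return true;
    }

    // Blocks while the line is empty, then frees one slot for producers.
    int take() throws InterruptedException {
        available.acquire();
        int job;
        mutex.acquireUninterruptibly();
        try {
            job = jobs.remove();
        } finally {
            mutex.release();
        }
        freeSlots.release();
        return job;
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedLine line = new BoundedLine(10);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) line.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        for (int i = 0; i < 100; i++) sum += line.take();
        producer.join();
        System.out.println(sum); // prints 4950
    }
}
```

The producer never spins on `size()`: when the line is full it simply sleeps inside `freeSlots.acquire()` until a consumer releases a slot.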
