首页 > 解决方案 > 每个运行一个无限循环的异步线程

问题描述

我正在实现一个包含不同任务并且都实现了 Runnable 的程序。例如,有一个任务在数据库上工作并将一些元组发送到同步的共享内存,随后,有另一个线程检查共享内存并将消息发送到队列。此外,这两个线程迭代了一个无限的while循环。

我已经使用 fixedThreadPool 来执行这些线程。

问题是有时程序控制保留在第一个运行线程中,而第二个线程永远没有机会进入其运行状态。

这是一个与我类似的示例代码:

public class A implements Runnable {
    @Override
    public void run() {
        while(true) {
        //do something
        }
    }
}
public class B implements Runnable {
    @Override
    public void run() {
        while(true) {
        //do something
        }
    }
}
public class Driver {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        A a = new A();
        executorService.execute(a);
        B b = new B();
        executorService.execute(b);
    }
}

我还做了一些棘手的事情,让第一个线程在短时间运行后休眠一次。结果,它使第二个线程找到运行的机会。但是这个问题有什么好的解决方案吗?你认为问题出在哪里?

标签: javamultithreadingthreadpool

解决方案


这是生产者/消费者模式的一个很好的例子。有很多方法可以实现这一点。这是一个使用wait/notify模式的简单实现。

public class A implements Runnable {
    private Queue<Integer> queue;
    private int maxSize;

    public A(Queue<Integer> queue, int maxSize) {
        super();
        this.queue = queue;
        this.maxSize = maxSize;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (queue) {
                while (queue.size() == maxSize) {
                    try {
                        System.out.println("Queue is full, " + "Producer thread waiting for "
                                + "consumer to take something from queue");
                        queue.wait();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }

                Random random = new Random();
                int i = random.nextInt();
                System.out.println("Producing value : " + i);
                queue.add(i);
                queue.notifyAll();
            }

        }
    }

}

public class B implements Runnable {
    private Queue<Integer> queue;

    public B(Queue<Integer> queue) {
        super();
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (queue) {
                while (queue.isEmpty()) {
                    System.out.println("Queue is empty," + "Consumer thread is waiting"
                            + " for producer thread to put something in queue");
                    try {
                        queue.wait();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }

                }
                System.out.println("Consuming value : " + queue.remove());
                queue.notifyAll();
            }

        }

    }

}

这里很热,我们设置了一些东西。

public class ProducerConsumerTest {

    public static void main(String[] args) {
        Queue<Integer> buffer = new LinkedList<>();
        int maxSize = 10;

        Thread producer = new Thread(new A(buffer, maxSize));
        Thread consumer = new Thread(new B(buffer));

        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(producer);
        executorService.submit(consumer);
    }

}

在这种情况下,Queue它充当共享内存。您可以将其替换为适合您需要的任何其他数据结构。这里的诀窍是您必须仔细协调线程之间的关系。这就是您上面的实现所缺乏的。


推荐阅读