java - 每个运行一个无限循环的异步线程
问题描述
我正在实现一个包含不同任务并且都实现了 Runnable 的程序。例如,有一个任务在数据库上工作并将一些元组发送到同步的共享内存,随后,有另一个线程检查共享内存并将消息发送到队列。此外,这两个线程迭代了一个无限的while循环。
我已经使用 fixedThreadPool 来执行这些线程。
问题是有时程序控制保留在第一个运行线程中,而第二个线程永远没有机会进入其运行状态。
这是一个与我类似的示例代码:
public class A implements Runnable {
@Override
public void run() {
while(true) {
//do something
}
}
}
public class B implements Runnable {
@Override
public void run() {
while(true) {
//do something
}
}
}
public class Driver {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
A a = new A();
executorService.execute(a);
B b = new B();
executorService.execute(b);
}
}
我还做了一些棘手的事情,让第一个线程在短时间运行后休眠一次。结果,它使第二个线程找到运行的机会。但是这个问题有什么好的解决方案吗?你认为问题出在哪里?
解决方案
这是生产者/消费者模式的一个很好的例子。有很多方法可以实现这一点。这是一个使用wait/notify
模式的简单实现。
public class A implements Runnable {
private Queue<Integer> queue;
private int maxSize;
public A(Queue<Integer> queue, int maxSize) {
super();
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run() {
while (true) {
synchronized (queue) {
while (queue.size() == maxSize) {
try {
System.out.println("Queue is full, " + "Producer thread waiting for "
+ "consumer to take something from queue");
queue.wait();
} catch (Exception ex) {
ex.printStackTrace();
}
}
Random random = new Random();
int i = random.nextInt();
System.out.println("Producing value : " + i);
queue.add(i);
queue.notifyAll();
}
}
}
}
public class B implements Runnable {
private Queue<Integer> queue;
public B(Queue<Integer> queue) {
super();
this.queue = queue;
}
@Override
public void run() {
while (true) {
synchronized (queue) {
while (queue.isEmpty()) {
System.out.println("Queue is empty," + "Consumer thread is waiting"
+ " for producer thread to put something in queue");
try {
queue.wait();
} catch (Exception ex) {
ex.printStackTrace();
}
}
System.out.println("Consuming value : " + queue.remove());
queue.notifyAll();
}
}
}
}
这里很热,我们设置了一些东西。
public class ProducerConsumerTest {
public static void main(String[] args) {
Queue<Integer> buffer = new LinkedList<>();
int maxSize = 10;
Thread producer = new Thread(new A(buffer, maxSize));
Thread consumer = new Thread(new B(buffer));
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(producer);
executorService.submit(consumer);
}
}
在这种情况下,Queue
它充当共享内存。您可以将其替换为适合您需要的任何其他数据结构。这里的诀窍是您必须仔细协调线程之间的关系。这就是您上面的实现所缺乏的。
推荐阅读
- python - Pandas - 在列中找到某个值后选择行
- influxdb - InfluxDB 2.0 如何找出正在执行的查询数量
- rust - Rust 社区中迭代结构的字段并更改循环中的另一个字段的标准方法是什么?
- python - 可以在 odoo 视图中显示当前的 partner.id
- python - 如何从兄弟类配置 tkinter wiget 中的文本?
- python - Django - 如何根据从模板中选择下拉菜单中选择的选项更改视图
- c++ - constexpr 变量 'params' 必须由常量表达式初始化
- sql - 在 ibm db2 中创建数据库时出错
- flutter - Flutter - onDragStarted 时获取 Draggable 触摸点详细信息
- ios - Azure Devops Pipeline MacOS VM 映像似乎没有任何 iOS 模拟器