java - 即使使用线程安全实现,如何修复异步 java 代码失败?
问题描述
我正在开发一个实现与此类似的代码库。当 count 的值增加时,我们遇到了一个线程无法与其他线程同步的问题,从而进入无限循环。
问题似乎来自后增量运算符的非原子行为。
您可以在此处找到代码Repl注意:您可能需要运行代码至少 3 次才能观察它。
我需要支持以线程安全的方式通过尽可能多的线程实现计数增量。
class Main {
static volatile Integer count = new Integer(0); //boxed integer is intentional to demonstrate mutable instance
static final void Log(Object o) {
System.out.println(o);
}
static synchronized void increaseCount(){
count++;
}
static synchronized Integer getCount(){
return count;
}
public static void main(String[] arg) throws InterruptedException {
new Thread(() -> {
while (getCount() != 60) {
increaseCount();
Log(count +" thread A");
}
}).start();
new Thread(() -> {
while (getCount() != 20) {
increaseCount();
Log(count +" thread B");
}
}).start();
new Thread(() -> {
while (getCount() != 50) {
increaseCount();
Log(count+" thread C");
}
}).start();
}
}
解决方案
如果许多线程正在递增一个共享计数器,则无法保证哪个线程会看到计数器的特定值。为了确保特定线程看到特定值,该线程必须看到计数器的每个值。然后你也可以只拥有一个线程,因为它们都彼此同步工作。
如果您想为计数器的每个值做一些工作,并对特定值进行特殊处理,并且您希望并行化该工作负载,则每个线程都需要准备好执行特殊处理。这是一个如何做到这一点的示例:
class Main {
private static class Worker implements Runnable {
private final AtomicInteger counter;
private final Set<Integer> triggers;
Worker(AtomicInteger counter, Set<Integer> triggers) {
this.counter = counter;
this.triggers = triggers;
}
public void run() {
String name = Thread.currentThread().getName();
while (!triggers.isEmpty()) {
int value = counter.getAndIncrement();
try { /* Simulate actually doing some work by sleeping a bit. */
long delay = (long) (-100 * Math.log(1 - ThreadLocalRandom.current().nextDouble()));
TimeUnit.MILLISECONDS.sleep(delay);
} catch (InterruptedException ex) {
break;
}
boolean triggered = triggers.remove(value);
if (triggered) {
System.out.println(name + " handled " + value);
} else {
System.out.println(name + " skipped " + value);
}
}
}
}
public static void main(String[] arg) throws InterruptedException {
AtomicInteger counter = new AtomicInteger();
Set<Integer> triggers = new ConcurrentSkipListSet<>();
triggers.add(60);
triggers.add(20);
triggers.add(50);
int concurrency = 4;
ExecutorService workers = Executors.newFixedThreadPool(concurrency);
for (int i = 0; i < concurrency; ++i) {
workers.execute(new Worker(counter, triggers));
}
workers.shutdown();
}
}
可以调整工作线程的数量,以便考虑到机器上的内核数量和实际工作负载(任务的 CPU 或 I/O 密集程度),它是有意义的。
在这种方法中,计数器的每个值仅由一个线程处理,哪个线程获得“哨兵”值并不重要。但是,当所有的标记值都被处理后,所有的线程都关闭了。counter
线程通过, 和一组“触发器”或它们需要处理的标记值相互协调。
推荐阅读
- python - 有没有办法像基于图像的示例一样调整 CNN 的非图像文件的大小?
- javascript - Fancybox 忽略拇指选项
- c# - 点网的 Splunk 配置
- sqlite - 未触发 TypeORM 挂钩,包括最小项目
- java - 在 VSCode 中编译 java 类
- javascript - 跳出 if 语句
- python - 在python中用于网络抓取的for循环
- wordpress - 我可以使用 add_action 或 add_filter 在 Wordpress 管理表视图 (WP_List_Table) 中显示是或否而不是 0 或 1?
- python - 在创建标签的循环中将元组转换为文本
- python - Python SQL:读取和执行 SQL 文件时出错