java - Java 并发得到不一致的结果。(带锁和 LongAdder)
问题描述
我正在做这些练习:
编写一个遍历目录树并为每个文件生成一个线程的程序。在线程中,计算文件中的字数,并在不使用锁的情况下更新声明为 public static long count = 0 的共享计数器;多次运行该程序。发生什么了?为什么?
使用锁来修复前面练习的程序。
使用 LongAdder 修复前面练习的程序。
我编写了以下程序,其中 CountWordThread 回答练习 1,CountWordLockThread 回答练习 2,CountWordLongAdderThread 回答练习 3。
Java 代码如下:
import java.io.*;
import java.util.*;
import java.nio.file.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
import java.util.stream.*;
import java.util.regex.*;
/**
 * Counts regex {@code \w+} matches ("words") in a set of files, creating one
 * {@link Thread} object per file, in three variants: an unsynchronized shared
 * counter, a counter guarded by a {@link ReentrantLock}, and a
 * {@link LongAdder}.
 *
 * <p>NOTE(review): all three {@code runThreads*} methods both call
 * {@code t.start()} and pass the same object to {@code executor.submit(t)},
 * so each file's {@code run()} body is executed twice, and only the executor's
 * copy is waited for. See the notes on those methods.
 */
public class ThreadedCountWord {
// Shared counter; incremented by CountWordThread (no synchronization) and by
// CountWordLockThread (while holding `lock`).
public long count = 0;
// Shared adder incremented by CountWordLongAdderThread.
LongAdder la = new LongAdder();
/**
 * Variant 1: increments the shared {@code count} once per regex match
 * without any synchronization.
 */
public class CountWordThread extends Thread {
private File f;
CountWordThread(File f) {
this.f = f;
}
/** Reads the file line by line and bumps {@code count} for every match. */
@Override
public void run() {
try {
// NOTE(review): this reader is never closed in this method.
BufferedReader br = new BufferedReader(new FileReader(f));
String line;
String pattern = "(\\w+)";
Pattern r = Pattern.compile(pattern);
while ((line = br.readLine()) != null) {
Matcher m = r.matcher(line);
while(m.find()) {
// Unsynchronized read-modify-write on a shared field: concurrent
// increments from other threads can be lost.
count ++;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
// Guards `count` in CountWordLockThread.
ReentrantLock lock = new ReentrantLock();
/**
 * Variant 2: increments the shared {@code count} once per regex match while
 * holding {@code lock}.
 */
public class CountWordLockThread extends Thread {
private File f;
CountWordLockThread(File f) {
this.f = f;
}
/** Reads the file line by line and bumps {@code count} under the lock. */
@Override
public void run() {
try {
// NOTE(review): this reader is never closed in this method.
BufferedReader br = new BufferedReader(new FileReader(f));
String line;
String pattern = "(\\w+)";
Pattern r = Pattern.compile(pattern);
while ((line = br.readLine()) != null) {
Matcher m = r.matcher(line);
while(m.find()) {
// It's important to wrap your code into a
// try/finally block to ensure unlocking in case
// of exceptions.
lock.lock();
try {
count++;
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
 * Variant 3: calls {@code la.increment()} once per regex match.
 */
public class CountWordLongAdderThread extends Thread {
private File f;
CountWordLongAdderThread(File f) {
this.f = f;
}
/** Reads the file line by line and increments {@code la} for every match. */
@Override
public void run() {
try {
// NOTE(review): this reader is never closed in this method.
BufferedReader br = new BufferedReader(new FileReader(f));
String line;
String pattern = "(\\w+)";
Pattern r = Pattern.compile(pattern);
while ((line = br.readLine()) != null) {
Matcher m = r.matcher(line);
while(m.find()) {
la.increment();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
 * Resets {@code count}, runs a {@link CountWordThread} for every path in
 * {@code s}, waits up to 60 seconds for the executor, then prints
 * {@code count}.
 *
 * @param s paths of the files to count; consumed by this call
 */
public void runThreads(Stream<Path> s) {
// 1. this MAY get inconsistent results
try {
count = 0;
ExecutorService executor = Executors.newCachedThreadPool();
s.forEach(p -> {
CountWordThread t = new CountWordThread(p.toFile());
// NOTE(review): the task runs twice - once on its own thread via start(),
// and once more on a pool thread because the same object is submitted
// as a Runnable.
t.start();
executor.submit(t);
});
executor.shutdown();
// Waits only for the tasks submitted to the executor; the threads started
// directly with t.start() are never joined, so they may still be running
// when the count is printed below.
executor.awaitTermination(60, TimeUnit.SECONDS);
System.out.printf("(NoLock) count: %d\n", count);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
 * Resets {@code count}, runs a {@link CountWordLockThread} for every path in
 * {@code s}, waits up to 60 seconds for the executor, then prints
 * {@code count}.
 *
 * @param s paths of the files to count; consumed by this call
 */
public void runThreadsWithLock(Stream<Path> s) {
// 2. this SHOULD NOT generate in-consistent results
try {
count = 0;
ExecutorService executor = Executors.newCachedThreadPool();
s.forEach(p -> {
CountWordLockThread t = new CountWordLockThread(p.toFile());
// NOTE(review): same double execution as in runThreads - start() plus
// submit() of the same object.
t.start();
executor.submit(t);
});
executor.shutdown();
// Does not wait for the threads started with t.start(); the value printed
// below can therefore miss increments that have not happened yet.
executor.awaitTermination(60, TimeUnit.SECONDS);
System.out.printf("(Lock) count: %d\n", count);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
 * Runs a {@link CountWordLongAdderThread} for every path in {@code s}, waits
 * up to 60 seconds for the executor, prints {@code la.sum()} and then resets
 * {@code la}.
 *
 * @param s paths of the files to count; consumed by this call
 */
public void runThreadsWithLongAdder(Stream<Path> s) {
// 3. this SHOULD NOT generate in-consistent results
try {
count = 0;
ExecutorService executor = Executors.newCachedThreadPool();
s.forEach(p -> {
CountWordLongAdderThread t = new CountWordLongAdderThread(p.toFile());
// NOTE(review): same double execution as in runThreads - start() plus
// submit() of the same object.
t.start();
executor.submit(t);
});
executor.shutdown();
// Does not wait for the threads started with t.start(); sum() below can be
// taken while they are still incrementing.
executor.awaitTermination(60, TimeUnit.SECONDS);
System.out.printf("(LongAdder) count: %d\n", la.sum());
la.reset();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
 * Runs the selected variant 20 times over every readable regular file under
 * the current directory whose path ends with {@code .java}, using a fresh
 * {@code ThreadedCountWord} for each round.
 */
public static void main(String[] args) {
// run multi times
try {
for (int i = 0; i < 20; i ++) {
Path path = Paths.get(".");
// NOTE(review): the stream returned by Files.walk is never closed here.
Stream<Path> sp = Files.walk(path);
Stream<Path> s = sp.filter(p -> p.toString().endsWith(".java")
&& Files.isRegularFile(p)
&& Files.isReadable(p));
ThreadedCountWord tcw = new ThreadedCountWord();
// tcw.runThreads(s); // 1. this MAY get inconsistent results
tcw.runThreadsWithLock(s); // 2. this SHOULD NOT get inconsistent results
// tcw.runThreadsWithLongAdder(s); // 3. this SHOULD NOT get inconsistent results
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
几乎每次运行 2 或 3 时,我都会得到不一致的答案。我不知道为什么。
一个示例结果将是这样的:
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35815 <-- note this
(Lock) count: 35862
(Lock) count: 35862
对于练习 2,以及
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35826 <-- note this
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
对于练习 3。
你能帮我吗?
更新
在 @chrylis 的帮助下,我用以下代码更新了我的答案,该代码按预期运行(上面的代码得到错误答案的原因正是 @Ivan 所说的):
import java.io.*;
import java.util.*;
import java.nio.file.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
import java.util.stream.*;
import java.util.regex.*;
public class ThreadedCountWord {
public long count = 0;
LongAdder la = new LongAdder();
public class CountWordThread extends Thread {
private File f;
CountWordThread(File f) {
this.f = f;
}
@Override
public void run() {
try {
BufferedReader br = new BufferedReader(new FileReader(f));
String line;
String pattern = "(\\w+)";
Pattern r = Pattern.compile(pattern);
while ((line = br.readLine()) != null) {
Matcher m = r.matcher(line);
while(m.find()) {
count ++;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
ReentrantLock lock = new ReentrantLock();
public class CountWordLockThread extends Thread {
private File f;
CountWordLockThread(File f) {
this.f = f;
}
@Override
public void run() {
try {
BufferedReader br = new BufferedReader(new FileReader(f));
String line;
String pattern = "(\\w+)";
Pattern r = Pattern.compile(pattern);
while ((line = br.readLine()) != null) {
Matcher m = r.matcher(line);
while(m.find()) {
// It's important to wrap your code into a
// try/finally block to ensure unlocking in case
// of exceptions.
lock.lock();
try {
count++;
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class CountWordLongAdderThread extends Thread {
private File f;
CountWordLongAdderThread(File f) {
this.f = f;
}
@Override
public void run() {
try {
BufferedReader br = new BufferedReader(new FileReader(f));
String line;
String pattern = "(\\w+)";
Pattern r = Pattern.compile(pattern);
while ((line = br.readLine()) != null) {
Matcher m = r.matcher(line);
while(m.find()) {
la.increment();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public void runThreads(Stream<Path> s) {
// this MAY get inconsistent results
try {
count = 0;
ArrayList<Thread> ts = new ArrayList<>();
s.forEach(p -> {
CountWordThread t = new CountWordThread(p.toFile());
t.start();
ts.add(t);
});
ts.stream().forEach(t -> {
try {
t.join();
} catch (Exception e) {
e.printStackTrace();
}
});
System.out.printf("(NoLock) count: %d\n", count);
} catch (Exception e) {
e.printStackTrace();
}
}
public void runThreadsWithLock(Stream<Path> s) {
// this SHOULD NOT generate in-consistent results
try {
count = 0;
ArrayList<Thread> ts = new ArrayList<>();
s.forEach(p -> {
CountWordLockThread t = new CountWordLockThread(p.toFile());
t.start();
ts.add(t);
});
ts.stream().forEach(t -> {
try {
t.join();
} catch (Exception e) {
e.printStackTrace();
}
});
System.out.printf("(Lock) count: %d\n", count);
} catch (Exception e) {
e.printStackTrace();
}
}
public void runThreadsWithLongAdder(Stream<Path> s) {
// this SHOULD NOT generate in-consistent results
try {
count = 0;
ArrayList<Thread> ts = new ArrayList<>();
s.forEach(p -> {
CountWordLongAdderThread t = new CountWordLongAdderThread(p.toFile());
t.start();
ts.add(t);
});
ts.stream().forEach(t -> {
try {
t.join();
} catch (Exception e) {
e.printStackTrace();
}
});
System.out.printf("(LongAdder) count: %d\n", la.sum());
la.reset();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
// run multi times
try {
for (int i = 0; i < 20; i ++) {
Path path = Paths.get(".");
Stream<Path> sp = Files.walk(path);
Stream<Path> s = sp.filter(p -> p.toString().endsWith(".java")
&& Files.isRegularFile(p)
&& Files.isReadable(p));
ThreadedCountWord tcw = new ThreadedCountWord();
// tcw.runThreads(s); // this MAY get inconsistent results
// tcw.runThreadsWithLock(s); // this SHOULD NOT get inconsistent results
tcw.runThreadsWithLongAdder(s); // this SHOULD NOT get inconsistent results
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
解决方案
你把每个任务启动了两次:第一次是直接调用 t.start(),第二次是把它提交给执行器(executor.submit(t))。
而且因为你没有在 t.start() 之后调用 t.join()
来等待这些线程完成(awaitTermination 只等待提交给执行器的任务),
所以你可能会在所有工作完成之前就打印了计数值,从而得到不一致的结果。
推荐阅读
- python - 使用弹性搜索在 URL 中搜索子字符串。处理少量文件但处理大量文件失败
- sql-server - SQL Server 动态 Pivot 为每个类别返回一行
- sed - SED - 使用 $ 在行首而不是行尾插入字符串
- python - 有没有办法在 Python matplotlib 中以交互方式一一标记图像?
- ruby - 在 Ruby 中,在 class << self 中定义的方法中,为什么父类上定义的常量不能在没有 self 的情况下访问?
- json - IndexError:循环嵌套字典时列表索引超出范围
- javascript - 当多个元素之一退出视口时保持按钮隐藏
- python - 在 Python 中运行 Screen 后台程序的最佳方法是什么?
- javascript - 如何在用作其他网站 iframe 的小部件中使用 Google Analytics
- plaid - 无效的环境:“格子呢”使用开发模式时