首页 > 解决方案 > Java 并发得到不一致的结果。(带锁和 LongAdder)

问题描述

我正在做这些练习:

  1. 编写一个遍历目录树并为每个文件生成一个线程的程序。在线程中,计算文件中的字数,并在不使用锁的情况下更新声明为 public static long count = 0 的共享计数器;多次运行该程序。发生什么了?为什么?

  2. 使用锁来修复前面练习的程序。

  3. 使用 LongAdder 修复前面练习的程序。

我编写了以下程序,其中

  1. CountWordThread回答练习 1,
  2. CountWordLockThread回答练习 2,并且
  3. CountWordLongAdderThread回答练习 3。

Java 代码如下:

import java.io.*;
import java.util.*;
import java.nio.file.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
import java.util.stream.*;
import java.util.regex.*;

/**
 * Counts the words (matches of the regex {@code \w+}) in a set of files,
 * using one worker object per file and a counter shared by all workers.
 *
 * <p>Three worker flavours are provided, one per exercise: an unsynchronized
 * increment of {@link #count}, an increment of {@link #count} guarded by
 * {@link #lock}, and an increment of the {@link LongAdder} {@link #la}.
 *
 * <p>NOTE(review): in this listing every worker is both started directly
 * ({@code t.start()}) and submitted to an executor ({@code executor.submit(t)}),
 * so its {@code run()} body is executed twice, and only the executor's
 * executions are awaited before the total is printed.
 */
public class ThreadedCountWord {


    // Shared total, incremented by CountWordThread and CountWordLockThread.
    public long count = 0;
    // Shared total used by CountWordLongAdderThread.
    LongAdder la = new LongAdder();

    /**
     * Exercise 1 worker: counts the words of one file and increments the
     * shared {@code count} field without any synchronization.
     */
    public class CountWordThread extends Thread {
        private File f;
        CountWordThread(File f) {
            this.f = f;
        }

        /**
         * Reads the file line by line and increments {@code count} once per
         * regex match. Any exception is printed and otherwise ignored.
         */
        @Override
        public void run() {
            try {
                // NOTE(review): this reader is never closed in this method.
                BufferedReader br = new BufferedReader(new FileReader(f));
                String line;
                String pattern = "(\\w+)";
                Pattern r = Pattern.compile(pattern);
                while ((line = br.readLine()) != null) {
                    Matcher m = r.matcher(line);
                    while(m.find()) {
                        // Unsynchronized read-modify-write on the shared field.
                        count ++;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Guards the increments of count performed by CountWordLockThread.
    ReentrantLock lock = new ReentrantLock();

    /**
     * Exercise 2 worker: counts the words of one file and increments the
     * shared {@code count} field while holding {@code lock}.
     */
    public class CountWordLockThread extends Thread {
        private File f;
        CountWordLockThread(File f) {
            this.f = f;
        }

        /**
         * Reads the file line by line and, for each regex match, acquires
         * {@code lock}, increments {@code count}, and releases the lock.
         * Any exception is printed and otherwise ignored.
         */
        @Override
        public void run() {
            try {
                // NOTE(review): this reader is never closed in this method.
                BufferedReader br = new BufferedReader(new FileReader(f));
                String line;
                String pattern = "(\\w+)";
                Pattern r = Pattern.compile(pattern);
                while ((line = br.readLine()) != null) {
                    Matcher m = r.matcher(line);
                    while(m.find()) {
                        // It's important to wrap your code into a
                        // try/finally block to ensure unlocking in case
                        // of exceptions.
                        lock.lock();
                        try {
                            count++;
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            lock.unlock();
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

    /**
     * Exercise 3 worker: counts the words of one file and calls
     * {@code la.increment()} once per word.
     */
    public class CountWordLongAdderThread extends Thread {
        private File f;
        CountWordLongAdderThread(File f) {
            this.f = f;
        }

        /**
         * Reads the file line by line and increments the shared adder once
         * per regex match. Any exception is printed and otherwise ignored.
         */
        @Override
        public void run() {
            try {
                // NOTE(review): this reader is never closed in this method.
                BufferedReader br = new BufferedReader(new FileReader(f));
                String line;
                String pattern = "(\\w+)";
                Pattern r = Pattern.compile(pattern);
                while ((line = br.readLine()) != null) {
                    Matcher m = r.matcher(line);
                    while(m.find()) {
                        la.increment();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


    /**
     * Exercise 1 driver: resets {@code count}, creates one CountWordThread
     * per path in {@code s}, waits up to 60 seconds for the executor, then
     * prints {@code count}.
     *
     * @param s paths of the files to count; consumed by this call
     */
    public void runThreads(Stream<Path> s) {
        // 1. this MAY get inconsistent results
        try {
            count = 0;
            ExecutorService executor = Executors.newCachedThreadPool();
            s.forEach(p -> {
                    CountWordThread t = new CountWordThread(p.toFile());
                    // NOTE(review): the worker is started as its own thread
                    // here AND handed to the executor below, so run() is
                    // executed twice for the same file.
                    t.start();
                    executor.submit(t);
                });
            executor.shutdown();
            // Waits only for the tasks submitted to the executor; the threads
            // started with t.start() above are never joined.
            executor.awaitTermination(60, TimeUnit.SECONDS);
            System.out.printf("(NoLock) count: %d\n", count);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    /**
     * Exercise 2 driver: resets {@code count}, creates one
     * CountWordLockThread per path in {@code s}, waits up to 60 seconds for
     * the executor, then prints {@code count}.
     *
     * @param s paths of the files to count; consumed by this call
     */
    public void runThreadsWithLock(Stream<Path> s) {
        // 2. this SHOULD NOT generate in-consistent results
        try {
            count = 0;
            ExecutorService executor = Executors.newCachedThreadPool();
            s.forEach(p -> {
                    CountWordLockThread t = new CountWordLockThread(p.toFile());
                    // NOTE(review): started directly and also submitted, so
                    // run() is executed twice for the same file.
                    t.start();
                    executor.submit(t);
                });
            executor.shutdown();
            // Waits only for the executor's tasks; the directly started
            // threads may still be incrementing count when it is printed.
            executor.awaitTermination(60, TimeUnit.SECONDS);
            System.out.printf("(Lock) count: %d\n", count);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Exercise 3 driver: creates one CountWordLongAdderThread per path in
     * {@code s}, waits up to 60 seconds for the executor, prints
     * {@code la.sum()}, and then resets the adder.
     *
     * @param s paths of the files to count; consumed by this call
     */
    public void runThreadsWithLongAdder(Stream<Path> s) {
        // 3. this SHOULD NOT generate in-consistent results
        try {
            count = 0;
            ExecutorService executor = Executors.newCachedThreadPool();
            s.forEach(p -> {
                    CountWordLongAdderThread t = new CountWordLongAdderThread(p.toFile());
                    // NOTE(review): started directly and also submitted, so
                    // run() is executed twice for the same file.
                    t.start();
                    executor.submit(t);
                });
            executor.shutdown();
            // Waits only for the executor's tasks; the directly started
            // threads may still be calling la.increment() afterwards.
            executor.awaitTermination(60, TimeUnit.SECONDS);
            System.out.printf("(LongAdder) count: %d\n", la.sum());
            la.reset();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    /**
     * Runs the selected exercise 20 times over every readable regular
     * {@code .java} file found under the current directory, using a fresh
     * ThreadedCountWord instance for each run.
     */
    public static void main(String[] args) {
        // run multi times
        try {
            for (int i = 0; i < 20; i ++) {
                Path path = Paths.get(".");
                // NOTE(review): the stream returned by Files.walk is not closed here.
                Stream<Path> sp = Files.walk(path);
                Stream<Path> s = sp.filter(p -> p.toString().endsWith(".java")
                                           && Files.isRegularFile(p)
                                           && Files.isReadable(p));
                ThreadedCountWord tcw = new ThreadedCountWord();
                // tcw.runThreads(s); // 1. this MAY get inconsistent results
                tcw.runThreadsWithLock(s); // 2. this SHOULD NOT get inconsistent results
                // tcw.runThreadsWithLongAdder(s); // 3. this SHOULD NOT get inconsistent results
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

几乎每次运行 2 或 3 时,我都会得到不一致的答案。我不知道为什么。

示例输出如下:

(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35815 <-- note this
(Lock) count: 35862
(Lock) count: 35862

对于练习 2,以及

(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35826 <-- note this
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862

对于练习 3。

你能帮我吗?

更新

在 @chrylis 的帮助下,我把代码更新为下面的版本,它能按预期运行。(上面的代码得到错误结果的原因正如 @Ivan 所说。)

import java.io.*;
import java.util.*;
import java.nio.file.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
import java.util.stream.*;
import java.util.regex.*;

/**
 * Counts the words (matches of the regex {@code \w+}) in a set of files,
 * spawning one thread per file and accumulating into shared state.
 *
 * <p>Three worker flavours are provided, one per exercise:
 * <ul>
 *   <li>{@link CountWordThread} increments {@link #count} with no
 *       synchronization (deliberately racy);</li>
 *   <li>{@link CountWordLockThread} increments {@link #count} while holding
 *       {@link #lock};</li>
 *   <li>{@link CountWordLongAdderThread} increments the {@link LongAdder}
 *       {@link #la}.</li>
 * </ul>
 *
 * <p>Each {@code runThreads*} method starts every worker exactly once and
 * joins all of them before printing the total.
 */
public class ThreadedCountWord {

    /** Word pattern, compiled once; {@link Pattern} instances are immutable and shareable. */
    private static final Pattern WORD = Pattern.compile("(\\w+)");

    /** Shared total used by {@link CountWordThread} and {@link CountWordLockThread}. */
    public long count = 0;

    /** Shared total used by {@link CountWordLongAdderThread}. */
    LongAdder la = new LongAdder();

    /** Guards the increments of {@link #count} done by {@link CountWordLockThread}. */
    ReentrantLock lock = new ReentrantLock();

    /**
     * Reads {@code f} line by line and invokes {@code onWord} once per word.
     * The reader is closed when the scan ends, whether normally or not.
     * An I/O failure is reported on stderr and ends the scan of this file.
     *
     * @param f      file to scan
     * @param onWord action to run for every word found
     */
    private static void forEachWord(File f, Runnable onWord) {
        try (BufferedReader br = new BufferedReader(new FileReader(f))) {
            String line;
            while ((line = br.readLine()) != null) {
                Matcher m = WORD.matcher(line);
                while (m.find()) {
                    onWord.run();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts every worker, then blocks until all of them have finished.
     * If the calling thread is interrupted while waiting, the wait is
     * resumed (so the totals are complete when this returns) and the
     * interrupt status is restored before returning.
     *
     * @param workers threads that have not been started yet
     */
    private static void startAndJoin(List<? extends Thread> workers) {
        for (Thread t : workers) {
            t.start();
        }
        boolean interrupted = false;
        for (Thread t : workers) {
            boolean joined = false;
            while (!joined) {
                try {
                    t.join();
                    joined = true;
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /** Exercise 1 worker: adds the word count of one file to {@code count} without synchronization. */
    public class CountWordThread extends Thread {
        private final File f;

        CountWordThread(File f) {
            this.f = f;
        }

        @Override
        public void run() {
            // Intentionally unsynchronized read-modify-write (exercise 1).
            forEachWord(f, () -> count++);
        }
    }

    /** Exercise 2 worker: adds the word count of one file to {@code count} under {@code lock}. */
    public class CountWordLockThread extends Thread {
        private final File f;

        CountWordLockThread(File f) {
            this.f = f;
        }

        @Override
        public void run() {
            forEachWord(f, () -> {
                lock.lock();
                try {
                    count++;
                } finally {
                    // Always release, even if the increment were to throw.
                    lock.unlock();
                }
            });
        }
    }

    /** Exercise 3 worker: adds the word count of one file to the shared {@code LongAdder}. */
    public class CountWordLongAdderThread extends Thread {
        private final File f;

        CountWordLongAdderThread(File f) {
            this.f = f;
        }

        @Override
        public void run() {
            forEachWord(f, la::increment);
        }
    }

    /**
     * Exercise 1 driver: counts with unsynchronized increments and prints the
     * total. The printed value MAY differ between runs because the increments race.
     *
     * @param s paths of the files to count; consumed (but not closed) by this call
     */
    public void runThreads(Stream<Path> s) {
        try {
            count = 0;
            List<CountWordThread> workers =
                s.map(p -> new CountWordThread(p.toFile())).collect(Collectors.toList());
            startAndJoin(workers);
            System.out.printf("(NoLock) count: %d\n", count);
        } catch (UncheckedIOException e) {
            // A lazily walked directory stream reports I/O errors this way.
            e.printStackTrace();
        }
    }

    /**
     * Exercise 2 driver: counts with lock-protected increments and prints the
     * total. After this returns, {@code count} holds the total word count.
     *
     * @param s paths of the files to count; consumed (but not closed) by this call
     */
    public void runThreadsWithLock(Stream<Path> s) {
        try {
            count = 0;
            List<CountWordLockThread> workers =
                s.map(p -> new CountWordLockThread(p.toFile())).collect(Collectors.toList());
            startAndJoin(workers);
            System.out.printf("(Lock) count: %d\n", count);
        } catch (UncheckedIOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Exercise 3 driver: counts with a {@link LongAdder}, prints the total,
     * and then resets the adder so the instance can be reused.
     *
     * @param s paths of the files to count; consumed (but not closed) by this call
     */
    public void runThreadsWithLongAdder(Stream<Path> s) {
        try {
            // Not used by this variant; kept so count still reads 0 afterwards.
            count = 0;
            List<CountWordLongAdderThread> workers =
                s.map(p -> new CountWordLongAdderThread(p.toFile())).collect(Collectors.toList());
            startAndJoin(workers);
            System.out.printf("(LongAdder) count: %d\n", la.sum());
            la.reset();
        } catch (UncheckedIOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Runs the LongAdder variant 20 times over every readable regular
     * {@code .java} file under the current directory. Call
     * {@code runThreads(s)} or {@code runThreadsWithLock(s)} instead to try
     * exercise 1 or 2.
     */
    public static void main(String[] args) {
        try {
            for (int i = 0; i < 20; i++) {
                // Files.walk holds open directory handles until the stream is closed.
                try (Stream<Path> sp = Files.walk(Paths.get("."))) {
                    Stream<Path> s = sp.filter(p -> p.toString().endsWith(".java")
                                               && Files.isRegularFile(p)
                                               && Files.isReadable(p));
                    ThreadedCountWord tcw = new ThreadedCountWord();
                    tcw.runThreadsWithLongAdder(s);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

标签: java, concurrency, java.util.concurrent

解决方案


你把每个任务启动了两次:第一次是直接调用 t.start(),第二次是把它提交给执行器(executor.submit(t))。而且,由于你在 t.start() 之后没有调用 t.join() 来等待线程结束,executor.awaitTermination() 只会等待提交给执行器的那一份任务,因此你可能在所有工作完成之前就打印了计数值,从而得到不一致的结果。


推荐阅读