Processing multiple files using threads

Problem description

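I have a file on which I need to run a word-count function (based on MapReduce), but using threads. I split the file into several small files, then loop over the small files and count the word occurrences with the Reduce() function. How can I implement the threads' run() method so that it works together with the Reduce function?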

Here is my code:

public class WordCounter implements Runnable {

    private String Nom;
    protected static int Chunks = 1;

    public WordCounter(String n) {
        Nom = n;
    }

    public void split() throws IOException {
        File source = new File(this.Nom);
        int maxRows = 100;
        int i = 1;

        try (Scanner sc = new Scanner(source)) {
            String line = null;
            int lineNum = 1;

            File splitFile = new File(this.Nom + i + ".txt");
            FileWriter myWriter = new FileWriter(splitFile);

            while (sc.hasNextLine()) {
                line = sc.nextLine();

                if (lineNum > maxRows) {
                    Chunks++;
                    myWriter.close();
                    lineNum = 1;
                    i++;
                    splitFile = new File(this.Nom + i + ".txt");
                    myWriter = new FileWriter(splitFile);
                }

                myWriter.write(line + "\n");
                lineNum++;
            }

            myWriter.close();
        }
    }

    public void Reduce() throws IOException {
        ArrayList<String> words = new ArrayList<String>();
        ArrayList<Integer> count = new ArrayList<Integer>();

        for (int i = 1; i < Chunks; i++) {
            // create the input stream (receives the text)
            FileInputStream fin = new FileInputStream(this.getNom() + i + ".txt");
            // go through the text with a scanner
            Scanner sc = new Scanner(fin);

            while (sc.hasNext()) {
                // get the next word
                String nextString = sc.next();

                // determine if the string exists in words
                if (words.contains(nextString)) {
                    int index = words.indexOf(nextString);
                    count.set(index, count.get(index) + 1);
                } else {
                    words.add(nextString);
                    count.add(1);
                }
            }
            sc.close();
            fin.close();
        }

        // creating a File object that represents the disk file
        FileWriter myWriter = new FileWriter(new File(this.getNom() + "Result.txt"));
        for (int i = 0; i < words.size(); i++) {
            myWriter.write(words.get(i) + " : " + count.get(i) + "\n");
        }
        myWriter.close();

        // delete the small files
        deleteFiles();
    }

    public void deleteFiles() {
        File f = new File("");
        for (int i = 1; i <= Chunks; i++) {
            f = new File(this.getNom() + i + ".txt");
            f.delete();
        }
    }

}

Tags: java, multithreading, file

Solution


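It is better to use Callable rather than the Runnable interface: a Callable returns a value, so you can retrieve each thread's result through a Future.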
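As a minimal sketch of that difference (the class name CallableVsRunnable and the helper runAndGet are made up for illustration, they are not part of the question's code): a Runnable has to park its result in shared state, while a Callable hands it back through the Future.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CallableVsRunnable {

    // Hypothetical helper: runs the task on a worker thread and returns its result.
    static int runAndGet(Callable<Integer> task) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(task);
            return future.get(); // blocks until the task has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // A Runnable returns nothing, so the result has to go into shared state.
        int[] holder = new int[1];
        Thread t = new Thread(() -> holder[0] = "a b c".split(" ").length);
        t.start();
        t.join();
        System.out.println("Runnable result: " + holder[0]); // prints "Runnable result: 3"

        // A Callable returns its result directly through the Future.
        int n = runAndGet(() -> "a b c".split(" ").length);
        System.out.println("Callable result: " + n); // prints "Callable result: 3"
    }
}
```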

So, to fix your code, you could do more or less the following:

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// each public class below belongs in its own .java file
public class WordCounter {
    // 5 is the number of concurrent threads; call threadPool.shutdown() once all work is done
    private static ExecutorService threadPool = Executors.newFixedThreadPool(5);

    public Map<String, Integer> count(String filename)
            throws IOException, InterruptedException, ExecutionException {
        int chunks = splitFileInChunks(filename);
        List<Future<Report>> reports = new ArrayList<>();

        // submit one task per chunk file
        for (int i = 1; i <= chunks; i++) {
            Callable<Report> callable = new ReduceCounter(filename + i + ".txt");
            Future<Report> future = threadPool.submit(callable);
            reports.add(future);
        }

        // get() blocks until the task has finished; then merge its partial counts
        Map<String, Integer> finalMap = new HashMap<>();
        for (Future<Report> future : reports) {
            Map<String, Integer> map = future.get().getWords();
            for (Map.Entry<String, Integer> entry : map.entrySet()) {
                finalMap.merge(entry.getKey(), entry.getValue(), Integer::sum);
            }
        }
        // return a map with the key being the word and the value the counter for that word
        return finalMap;
    }

    // this method doesn't need to be run on a separate thread
    private int splitFileInChunks(String filename) throws IOException { .... }
}

public class Report {
    private final Map<String, Integer> words;

    public Report(Map<String, Integer> words) { this.words = words; }

    public Map<String, Integer> getWords() { return words; }
}

public class ReduceCounter implements Callable<Report> {
    String filename;

    public ReduceCounter(String filename) { this.filename = filename; }

    @Override
    public Report call() {
        // store the values in a Map<String, Integer> since it's easier that way
        Map<String, Integer> myWordsMap = new HashMap<>();
        // here add the logic from your Reduce method, without the for loop iteration
        // you should add logic to read only the file named with the value from "filename"

        return new Report(myWordsMap);
    }
}

Note that you could skip the Report class and return a Future<Map<String,Integer>> instead, but I used Report to make it easier to follow.
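For illustration, here is a rough sketch of that Report-free variant with the per-file counting filled in. It assumes Java 11 or later; the names ChunkWordCount, countWords and countAll are invented for this sketch, and the pool size of 4 is arbitrary. The chunk files are passed in as a list rather than produced by the splitting step.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ChunkWordCount {

    // Counts whitespace-separated tokens, like the Scanner loop in Reduce().
    static Map<String, Integer> countWords(Scanner sc) {
        Map<String, Integer> counts = new HashMap<>();
        while (sc.hasNext()) {
            counts.merge(sc.next(), 1, Integer::sum);
        }
        return counts;
    }

    // Submits one task per chunk file and merges the partial maps.
    static Map<String, Integer> countAll(List<Path> chunkFiles)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(4); // arbitrary size
        try {
            List<Future<Map<String, Integer>>> futures = new ArrayList<>();
            for (Path chunk : chunkFiles) {
                // call() may throw, so the IOException from Scanner needs no wrapping
                Callable<Map<String, Integer>> task = () -> {
                    try (Scanner sc = new Scanner(chunk)) {
                        return countWords(sc);
                    }
                };
                futures.add(pool.submit(task));
            }
            Map<String, Integer> total = new HashMap<>();
            for (Future<Map<String, Integer>> f : futures) {
                // get() blocks until that chunk is done and rethrows task failures
                f.get().forEach((word, n) -> total.merge(word, n, Integer::sum));
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // two small temporary files stand in for the chunk files
        Path a = Files.createTempFile("chunk", ".txt");
        Path b = Files.createTempFile("chunk", ".txt");
        Files.writeString(a, "to be or not");
        Files.writeString(b, "to be");
        System.out.println(countAll(List.of(a, b)));
        Files.delete(a);
        Files.delete(b);
    }
}
```

Each task builds its own map, so the worker threads share no mutable state; the merge happens on the calling thread only.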

Update: a version using Runnable, as the user requested

public class WordCounter {
    public Map<String, Integer> count(String filename) throws IOException, InterruptedException {
        // splitFileInChunks(...) is the same method as in the Callable version
        int chunks = splitFileInChunks(filename);
        List<ReduceCounter> counters = new ArrayList<>();
        List<Thread> reducerThreads = new ArrayList<>();

        for (int i = 1; i <= chunks; i++) {
            ReduceCounter rc = new ReduceCounter(filename + i + ".txt");
            Thread t = new Thread(rc);
            counters.add(rc);
            reducerThreads.add(t);
            t.start();
        }
        // next wait for the threads to finish processing
        for (Thread t : reducerThreads) {
            t.join();
        }
        // now grab the results from each of them and merge them
        Map<String, Integer> finalMap = new HashMap<>();
        for (ReduceCounter cnt : counters) {
            for (Map.Entry<String, Integer> entry : cnt.getWords().entrySet()) {
                finalMap.merge(entry.getKey(), entry.getValue(), Integer::sum);
            }
        }
        return finalMap;
    }
}

The reducer class should look like this:

public class ReduceCounter implements Runnable {
    String filename;
    Map<String, Integer> words = new HashMap<>();

    public ReduceCounter(String filename) { this.filename = filename; }

    @Override
    public void run() {
        // store the values in the "words" map
        // here add the logic from your Reduce method, without the for loop iteration
        // also, read only the file named with the value from "filename"

    }

    public Map<String, Integer> getWords() { return words; }
}
