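java - Processing multiple files using threads
Problem description
I have a file on which I need to run a word-count function (based on MapReduce), but using threads. I split the file into several small files, then loop over the small files and count the occurrences of each word with a Reduce() function. How can I implement the thread's run() method so that it works together with the Reduce function?
Here is my code: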
public class WordCounter implements Runnable {
    private String Nom;
    protected static int Chunks = 1 ;

    public WordCounter (String n) {
        Nom = n;
    }

    public void split () throws IOException
    {
        File source = new File(this.Nom);
        int maxRows = 100;
        int i = 1;
        try(Scanner sc = new Scanner(source)){
            String line = null;
            int lineNum = 1;
            File splitFile = new File(this.Nom+i+".txt");
            FileWriter myWriter = new FileWriter(splitFile);
            while (sc.hasNextLine()) {
                line = sc.nextLine();
                if(lineNum > maxRows){
                    Chunks++;
                    myWriter.close();
                    lineNum = 1;
                    i++;
                    splitFile = new File(this.Nom+i+".txt");
                    myWriter = new FileWriter(splitFile);
                }
                myWriter.write(line+"\n");
                lineNum++;
            }
            myWriter.close();
        }
    }

    public void Reduce() throws IOException
    {
        ArrayList<String> words = new ArrayList<String>();
        ArrayList<Integer> count = new ArrayList<Integer>();
        for (int i = 1; i < Chunks; i++) {
            //create the input stream (to receive the text)
            FileInputStream fin = new FileInputStream(this.getNom()+i+".txt");
            //go through the text with a scanner
            Scanner sc = new Scanner(fin);
            while (sc.hasNext()) {
                //Get the next word
                String nextString = sc.next();
                //Determine if the string exists in words
                if (words.contains(nextString)) {
                    int index = words.indexOf(nextString);
                    count.set(index, count.get(index)+1);
                }
                else {
                    words.add(nextString);
                    count.add(1);
                }
            }
            sc.close();
            fin.close();
        }
        // Creating a File object that represents the disk file.
        FileWriter myWriter = new FileWriter(new File(this.getNom()+"Result.txt"));
        for (int i = 0; i < words.size(); i++) {
            myWriter.write(words.get(i)+ " : " +count.get(i) +"\n");
        }
        myWriter.close();
        //delete the small files
        deleteFiles();
    }

    public void deleteFiles()
    {
        File f= new File("");
        for (int i = 1; i <= Chunks; i++) {
            f = new File(this.getNom()+i+".txt");
            f.delete();
        }
    }
}
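Solution
It is better to use Callable rather than the Runnable interface: a Callable returns a value, so you can retrieve the data from each task.
To fix your code, you can do more or less the following: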
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;

public class WordCounter {
    // 5 is the number of concurrent threads.
    private static ExecutorService threadPool = Executors.newFixedThreadPool(5);

    public Map<String, Integer> count(String filename)
            throws IOException, InterruptedException, ExecutionException {
        int chunks = splitFileInChunks(filename);
        List<Future<Report>> reports = new ArrayList<Future<Report>>();
        for (int i = 1; i <= chunks; i++) {
            Callable<Report> callable = new ReduceCounter(filename + i + ".txt");
            Future<Report> future = threadPool.submit(callable);
            reports.add(future);
        }
        Map<String, Integer> finalMap = new HashMap<>();
        for (Future<Report> future : reports) {
            // get() blocks until the corresponding task has finished
            Map<String, Integer> map = future.get().getWords();
            for (Map.Entry<String, Integer> entry : map.entrySet()) {
                int oldCnt = finalMap.get(entry.getKey()) != null ? finalMap.get(entry.getKey()) : 0;
                finalMap.put(entry.getKey(), entry.getValue() + oldCnt);
            }
        }
        // return a map with the key being the word and the value the counter for that word
        return finalMap;
    }

    // this method doesn't need to be run on the separate thread
    private int splitFileInChunks(String filename) throws IOException { .... }
}

public class Report {
    Map<String, Integer> words = new HashMap<>();
    // ... getter, setter, constructor etc
}

public class ReduceCounter implements Callable<Report> {
    String filename;

    public ReduceCounter(String filename) { this.filename = filename; }

    public Report call() {
        // store the values in a Map<String, Integer> since it's easier that way
        Map<String, Integer> myWordsMap = new HashMap<String, Integer>();
        // here add the logic from your Reduce method, without the for loop iteration
        // you should add logic to read only the file named with the value from "filename"
        return new Report(myWordsMap);
    }
}
Note that you could skip the Report class and work with Future<Map<String, Integer>> directly, but I used Report to make it easier to understand.
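The variant without Report could look roughly like the sketch below. It is only an illustration under a few assumptions: the class names ChunkWordCount and ParallelWordCountSketch and the method countAll are hypothetical, the chunk files are assumed to exist already, and words are split on whitespace (as Scanner.next() does in the question). The pool is created per call and shut down afterwards, which is one possible design, not the only one.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical task: counts the words of a single chunk file.
class ChunkWordCount implements Callable<Map<String, Integer>> {
    private final Path chunk;

    public ChunkWordCount(Path chunk) { this.chunk = chunk; }

    @Override
    public Map<String, Integer> call() throws IOException {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : Files.readAllLines(chunk)) {
            // Split on runs of whitespace; skip the empty token of blank lines.
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }
}

class ParallelWordCountSketch {
    // Submits one task per chunk, then merges the partial maps.
    public static Map<String, Integer> countAll(List<Path> chunks, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Map<String, Integer>>> futures = new ArrayList<>();
            for (Path chunk : chunks) {
                futures.add(pool.submit(new ChunkWordCount(chunk)));
            }
            Map<String, Integer> total = new HashMap<>();
            for (Future<Map<String, Integer>> future : futures) {
                // get() blocks until that chunk has been counted.
                future.get().forEach((word, n) -> total.merge(word, n, Integer::sum));
            }
            return total;
        } finally {
            pool.shutdown(); // let the worker threads exit
        }
    }

    public static void main(String[] args) throws Exception {
        List<Path> chunks = new ArrayList<>();
        for (String name : args) {
            chunks.add(Paths.get(name));
        }
        System.out.println(countAll(chunks, 5));
    }
}
```

If a task fails (for example because a chunk file is missing), future.get() throws an ExecutionException whose cause is the original exception, so the error reaches the calling thread.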
Update: a Runnable version, as requested by the asker
public class WordCounter {
    public Map<String, Integer> count(String filename) throws IOException, InterruptedException {
        int chunks = splitFileInChunks(filename);
        List<ReduceCounter> counters = new ArrayList<>();
        List<Thread> reducerThreads = new ArrayList<>();
        for (int i = 1; i <= chunks; i++) {
            ReduceCounter rc = new ReduceCounter(filename + i + ".txt");
            Thread t = new Thread(rc);
            counters.add(rc);
            reducerThreads.add(t);
            t.start();
        }
        // next wait for the threads to finish processing
        for (Thread t : reducerThreads) {
            t.join();
        }
        // now grab the results from each of them and merge them
        Map<String, Integer> finalMap = new HashMap<>();
        for (ReduceCounter cnt : counters) {
            for (Map.Entry<String, Integer> entry : cnt.getWords().entrySet()) {
                finalMap.merge(entry.getKey(), entry.getValue(), Integer::sum);
            }
        }
        return finalMap;
    }

    // splitFileInChunks(...) is the same method as in the Callable version
}
The reducer class should look like this:
public class ReduceCounter implements Runnable {
    String filename;
    Map<String, Integer> words = new HashMap<>();

    public ReduceCounter(String filename) { this.filename = filename; }

    public void run() {
        // store the values in the "words" map
        // here add the logic from your Reduce method, without the for loop iteration
        // also read only the file named with the value from "filename"
    }

    public Map<String, Integer> getWords() { return words; }
}
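One way the run() body might be filled in is sketched below. The class name ChunkCounterSketch is hypothetical, and the counting logic is a simplified take on the Reduce method from the question (a Map instead of two parallel lists). Since run() cannot throw checked exceptions, the sketch wraps IOException in an UncheckedIOException, which is just one possible choice.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;

// Hypothetical Runnable that counts the words of one chunk file.
class ChunkCounterSketch implements Runnable {
    private final String filename;
    private final Map<String, Integer> words = new HashMap<>();

    public ChunkCounterSketch(String filename) { this.filename = filename; }

    @Override
    public void run() {
        // Scanner.next() returns whitespace-separated tokens, as in the question.
        try (Scanner sc = new Scanner(Paths.get(filename))) {
            while (sc.hasNext()) {
                words.merge(sc.next(), 1, Integer::sum);
            }
        } catch (IOException e) {
            // run() cannot declare IOException, so rethrow it unchecked.
            throw new UncheckedIOException(e);
        }
    }

    // Call this only after the thread running this task has been joined.
    public Map<String, Integer> getWords() { return words; }
}
```

Reading getWords() after t.join() is safe without extra synchronization, because join() guarantees that everything the thread wrote is visible to the caller. Note that an exception thrown inside run() is not propagated to the thread that called start(); with this approach a failed chunk would simply yield an incomplete map, which is one reason the Callable version is easier to get right.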