java - MapReduce line frequency of words
Problem description
I am currently working on a Hadoop project in Java. My goal is to make a MapReduce job that counts the line frequency of every word. That is, instead of outputting the exact number of times a word occurs in the input file, it should only count the number of lines the word appears in. If a word occurs more than once in a line, it should be counted only once, since we are just counting how many lines it occurs in. I have a basic word-count MapReduce job working, which I will post below, but I am a bit lost on how to count only the line frequency of each word instead of the full word count. Any help would be greatly appreciated, thanks a lot.
MapWordCount
public class MapWordCount extends Mapper<LongWritable, Text, Text, IntWritable>
{
    private Text wordToken = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        // dividing the line into tokens, splitting on punctuation and digits
        StringTokenizer tokens = new StringTokenizer(value.toString(), "[_|$#0123456789<>\\^=\\[\\]\\*/\\\\,;,.\\-:()?!\"']");
        while (tokens.hasMoreTokens())
        {
            wordToken.set(tokens.nextToken());
            context.write(wordToken, new IntWritable(1));
        }
    }
}
ReduceWordCount
public class ReduceWordCount extends Reducer<Text, IntWritable, Text, IntWritable>
{
    private IntWritable count = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        int valueSum = 0;
        for (IntWritable val : values)
        {
            valueSum += val.get();
        }
        count.set(valueSum);
        context.write(key, count);
    }
}
Driver code
public class WordCount
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (pathArgs.length < 2)
        {
            System.err.println("MR Project Usage: wordcount <input-path> [...] <output-path>");
            System.exit(2);
        }

        Job wcJob = Job.getInstance(conf, "MapReduce WordCount");
        wcJob.setJarByClass(WordCount.class);
        wcJob.setMapperClass(MapWordCount.class);
        wcJob.setCombinerClass(ReduceWordCount.class);
        wcJob.setReducerClass(ReduceWordCount.class);
        wcJob.setOutputKeyClass(Text.class);
        wcJob.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < pathArgs.length - 1; ++i)
        {
            FileInputFormat.addInputPath(wcJob, new Path(pathArgs[i]));
        }
        FileOutputFormat.setOutputPath(wcJob, new Path(pathArgs[pathArgs.length - 1]));
        System.exit(wcJob.waitForCompletion(true) ? 0 : 1);
    }
}
Solution
Things are surprisingly simple for this use case of Hadoop's MapReduce, because Hadoop tends to read its input documents line by line, even when FileInputFormat explicitly specifies the input data format of the MR job (this goes well beyond the scope of your question, but you can look here and here for information about mappers and file splits in Hadoop).
Since each mapper instance will receive one line as its input, the only things you need to worry about are:
1. splitting the text into words (after cleaning them of punctuation and loose spaces, converting them all to lowercase, etc.),
2. getting rid of duplicates so that only the unique words of the given line remain, and
3. writing each unique word as a key with 1 as its value, classic WordCount style.
For step 2 you can use a HashSet, which (as you may have guessed) is a Java data structure that keeps only unique elements and ignores duplicates: load every token into it, then iterate over it to write the key-value pairs and send them to the reducer instances.
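To see the dedup step of the approach in isolation, here is a minimal sketch (the sample line is just for illustration) of how a HashSet silently drops the repeated tokens of a single line:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class DedupExample
{
    public static void main(String[] args)
    {
        // "hello" and "world" each appear more than once in this line
        String[] tokens = "hello world hello how are you world".split(" ");

        // the HashSet keeps exactly one copy of each distinct word
        Set<String> unique = new HashSet<>(Arrays.asList(tokens));

        System.out.println(unique.size()); // prints 5: hello, world, how, are, you
    }
}
```

Writing each element of the set with a value of 1 then guarantees the reducer's sum equals the number of lines the word appeared in.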
An application of this type could look like the following (I changed the way you tokenize the text in the map function, since your tokenizer didn't seem to split out each individual word, but only to split between punctuation marks):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.*;
public class LineFreq
{
    public static class MapWordCount extends Mapper<LongWritable, Text, Text, IntWritable>
    {
        private Text wordToken = new Text();
        private static final IntWritable one = new IntWritable(1);

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            // dividing the line into clean, lowercase tokens
            String[] tokens = value.toString()
                    .replaceAll("\\d+", "")         // get rid of numbers...
                    .replaceAll("[^a-zA-Z ]", " ")  // get rid of punctuation...
                    .toLowerCase()                  // turn every letter to lowercase...
                    .trim()                         // trim the leading/trailing spaces...
                    .replaceAll("\\s+", " ")        // collapse runs of whitespace...
                    .split(" ");

            // set to hold only the unique words of this line (no duplicates)
            Set<String> wordSet = new HashSet<>();
            for (String word : tokens)
                wordSet.add(word);
            wordSet.remove(""); // an empty line would otherwise yield a single empty token

            // write each unique word as having one occurrence in this particular line
            for (String word : wordSet)
            {
                wordToken.set(word);
                context.write(wordToken, one);
            }
        }
    }

    public static class ReduceWordCount extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        private IntWritable count = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int valueSum = 0;
            for (IntWritable val : values)
                valueSum += val.get();
            count.set(valueSum);
            context.write(key, count);
        }
    }

    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        String[] pathArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (pathArgs.length < 2)
        {
            System.err.println("MR Project Usage: wordcount <input-path> [...] <output-path>");
            System.exit(2);
        }

        Job wcJob = Job.getInstance(conf, "MapReduce WordCount");
        wcJob.setJarByClass(LineFreq.class);
        wcJob.setMapperClass(MapWordCount.class);
        wcJob.setCombinerClass(ReduceWordCount.class);
        wcJob.setReducerClass(ReduceWordCount.class);
        wcJob.setOutputKeyClass(Text.class);
        wcJob.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < pathArgs.length - 1; ++i)
        {
            FileInputFormat.addInputPath(wcJob, new Path(pathArgs[i]));
        }
        FileOutputFormat.setOutputPath(wcJob, new Path(pathArgs[pathArgs.length - 1]));
        System.exit(wcJob.waitForCompletion(true) ? 0 : 1);
    }
}
So we can test it with the following document as input:
hello world! hello! how are you, world?
i am fine! world world world! hello to you too!
what a wonderful world!
amazing world i must say, indeed
and confirm through the output (one count per line of occurrence, keys sorted by Hadoop) that the word frequencies are indeed computed line-wise:
a	1
am	1
amazing	1
are	1
fine	1
hello	2
how	1
i	2
indeed	1
must	1
say	1
to	1
too	1
what	1
wonderful	1
world	4
you	2
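To sanity-check the line-frequency logic without spinning up a Hadoop cluster, the mapper's cleanup-and-dedup pipeline plus the reducer's sum can be replayed in plain Java (no Hadoop classes, the class name LineFreqCheck is just for this sketch):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class LineFreqCheck
{
    // replicates the mapper's cleanup + per-line dedup, then sums one count per line per word
    static Map<String, Integer> lineFrequencies(List<String> lines)
    {
        Map<String, Integer> freq = new TreeMap<>(); // TreeMap mimics Hadoop's sorted keys
        for (String line : lines)
        {
            String[] tokens = line
                    .replaceAll("\\d+", "")
                    .replaceAll("[^a-zA-Z ]", " ")
                    .toLowerCase()
                    .trim()
                    .replaceAll("\\s+", " ")
                    .split(" ");
            Set<String> unique = new HashSet<>(Arrays.asList(tokens));
            unique.remove(""); // guard against empty lines producing an empty token
            for (String word : unique)
                freq.merge(word, 1, Integer::sum); // one count per line, like the reducer's sum
        }
        return freq;
    }

    public static void main(String[] args)
    {
        List<String> input = Arrays.asList(
                "hello world! hello! how are you, world?",
                "i am fine! world world world! hello to you too!",
                "what a wonderful world!",
                "amazing world i must say, indeed");
        lineFrequencies(input).forEach((word, count) -> System.out.println(word + "\t" + count));
    }
}
```

Running this on the sample document should agree with the job's output, e.g. world appears in all four lines, so it maps to 4 even though it occurs seven times in total.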