hadoop - Hadoop:Redcuer 没有发出正确的计算
问题描述
我有以下 Reducer 类(MapReduce 作业的一部分),它应该计算score = POS /(-1*sum(NEGs))
.
其中POS
是 1 个正数,NEGs
是 2 个负数。总是这样。
例如,如果映射器的输入是:
<A, A> -15.0
<A, A> 2.0
<A, A> -15.0
预期的输出将是:
<A, A> 0.06666666666666667
但是,它infinity
为每个输出记录输出!
<A, A> Infinity
在调试时,如果我添加语句以在 while 循环内发出值:
score.set(val);
context.write(key, score);
,它可以很好地打印结果,但会重复除法。所以我得到以下信息:
<A, A> -15.0
<A, A> 2.0
<A, A> -15.0
<A, A> 0.06666666666666667 # correct calculation (2/30)
<A, A> 0.0022222222222222222 # Not sure why it divids twice by 30 (2/30/30)!!
这是MyReducer
课
private static class MyReducer extends
Reducer<Pair, DoubleWritable, Pair, DoubleWritable> {
private DoubleWritable score = new DoubleWritable();
int counter = 0;
@Override
public void reduce(Pair key, Iterable<DoubleWritable> values, Context context)
throws IOException, InterruptedException {
Iterator<DoubleWritable> iter = values.iterator();
double nor = 0.0;
double don = 0.0;
double val;
while (iter.hasNext()) {
val = iter.next().get();
if (val < 0)
don += val*-1;
else
nor = val;
//uncomment for debugging!
//score.set(val);
//context.write(key, score);
}
score.set(nor / don);
context.write(key, score);
}
谁能解释为什么
- 如果我在 while 循环中没有发出任何东西,则发出无穷大
- 除以分母两次?
谢谢!
解决方案
当然,在 Java 中表现滑稽的双精度数绝非罕见,但在这种特殊情况下,并不是双精度数的怪异方式,至于它们在 Hadoop 术语中的兼容性。
首先也是最重要的,这种类型的归约计算对于仅在作业的归约阶段而不是在组合阶段(如果有的话)中使用至关重要。如果您已将此 reduce 计算设置为也作为组合器实现,您可以考虑取消设置此设置。这不是一个经验法则,但是 MapReduce 作业中存在很多错误,人们无法完全弄清楚为什么 reducer 会得到奇怪的数据或连续执行两次计算(就像你指出的那样出去)。
但是,问题的可能罪魁祸首是,为了获得安全的双类型除法,您确实需要使用类型转换来获得正确的双类型结果。
为了展示这一点,我使用了一个基于您的输入数据并存储在\input
目录中的输入示例。每个唯一键都有一个正数和两个负数作为值(String
为了简单起见,这里将键设置为),如下所示:
Α -15.0
Α 2.0
Α -15.0
Β -10.0
Β 9.0
Β -12.0
C -7.0
C 1.0
C -19.0
D -5.0
D 18.0
D -5.0
E -6.0
E 6.0
E -6.0
然后使用显式类型转换来计算每个分数,如下面的代码所示:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.*;
import java.io.IOException;
import java.util.*;
import java.nio.charset.StandardCharsets;
public class ScoreComp
{
/* input: <Character, Number>
* output: <Character, Number>
*/
public static class Map extends Mapper<Object, Text, Text, DoubleWritable>
{
public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
String record = value.toString();
String[] parts = record.split(" "); // just split the lines into key and value
// create key-value pairs from each line
context.write(new Text(parts[0]), new DoubleWritable(Double.parseDouble(parts[1])));
}
}
/* input: <Character, Number>
* output: <Character, Score>
*/
public static class Reduce extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
{
public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
{
double pos = 0.0;
double neg = 0.0;
// for every value of a unique key...
for(DoubleWritable value : values)
{
// retrieve the positive number and calculate the sum of the two negative numbers
if(value.get() < 0)
neg += value.get();
else
pos = value.get();
}
// calculate the score based on the values of each key (using explicit type casting)
double result = (double) pos / (-1 * neg);
// create key-value pairs for each key with its score
context.write(key, new DoubleWritable(result));
}
}
public static void main(String[] args) throws Exception
{
// set the paths of the input and output directories in the HDFS
Path input_dir = new Path("input");
Path output_dir = new Path("scores");
// in case the output directory already exists, delete it
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
if(fs.exists(output_dir))
fs.delete(output_dir, true);
// configure the MapReduce job
Job scorecomp_job = Job.getInstance(conf, "Score Computation");
scorecomp_job.setJarByClass(ScoreComp.class);
scorecomp_job.setMapperClass(Map.class);
scorecomp_job.setReducerClass(Reduce.class);
scorecomp_job.setMapOutputKeyClass(Text.class);
scorecomp_job.setMapOutputValueClass(DoubleWritable.class);
scorecomp_job.setOutputKeyClass(Text.class);
scorecomp_job.setOutputValueClass(DoubleWritable.class);
FileInputFormat.addInputPath(scorecomp_job, input_dir);
FileOutputFormat.setOutputPath(scorecomp_job, output_dir);
scorecomp_job.waitForCompletion(true);
}
}
您可以看到/scores
目录中 MapReduce 作业的结果在数学上是有意义的(通过 HDFS 浏览器截取的屏幕截图):
推荐阅读
- flutter - 连接服务协议出错:无法连接到http://127.0.0.1:58463/vWu5Xiui7n4=/
- java - Java代码显示到下一个闰年的年数
- python - 用新的数据框更新数据框,覆盖
- button - 如何根据页面语言更改按钮位置?
- r - 使用 ggplot 按百分比显示直方图
- python - 如何让使用正则表达式匹配年份和月份?
- angular - 如何销毁以编程方式注入动态创建的角度组件的服务?
- css - Tailwind CSS:溢出隐藏不隐藏完整翻译
- openshift - AWS 上的 Openshift 自动允许以 root 用户身份部署容器
- python - sklearn 中的 ccp_alpha 查找 optmila 值