首页 > 解决方案 > 如何在 HDFS 中找到文件之间的对称差异?

问题描述

我在 hdfs 中有 2 个文件: /my/path/in/hdfs/part-r-* (大约 1000 个部分,每个 ~10000 行)和 /my/another/path/in/hdfs/part-r-* (相同尺寸)。第一个文件包含以下形式的数据:

id1 111
id6 212
id3 984

等等。第二个是这样的:

999 id8
15 id4
93 id1

我想找到第一个文件中没有出现在第二个文件中的所有 id,反之亦然。有什么简单的方法吗?

标签: javahadoophdfs

解决方案


我必须承认我怀疑这种计算是否适合 MapReduce 的严格范式,纯粹基于该过程的复杂性和计算量(即使您说在您的案例中输入的两个文件大小相同) ,所以我认为这是一个很好的案例,可以在保持简单的同时找到偷工减料的方法。

首先,为了摆脱额外的 IO 脂肪,您可能希望将这两个文件放在一个目录中(\input为了简单起见,我们在这里说)只是为了绕过多输入混乱。之后,只需一项 MapReduce 作业就可以更轻松地完成工作。

在 Map 阶段,您需要做的就是将两个文件中的 ID 设置为键,并将它们出现的“文件名”设置为它们的值(这是一种找到对称差异的安全方法,同时继续概括可能是一个 ID可以在一个文件中多次看到)。那些“文件名”实际上并不需要是实际的文件名,您可以只放置字符串AB表示该特定行中的该特定 ID 分别是在第一个或第二个文件中找到的。

在 Reduce 阶段,您可以将引用单个键/ID 的所有值放入一个集合中,该集合包含您放入HashSet其中的所有唯一值。这意味着对于每个 reducer(也就是每个 ID),HashSet创建 a 来放置ABStrings 的几个实例,只存储这些实例中的一个。所以:

  • 仅在第一个文件中看到的 ID 将包含一个HashSet集合A
  • 仅在第二个文件中看到的 ID 将包含一个HashSet集合,其中仅包含一个集合B
  • 在两个文件中都可以看到的 ID 将包含一个HashSet集合,其中包含AB(也就是文件的交集,您不需要)。

有了它,您可以简单地检查每个 ID HashSet,并只写上面描述的前两个选项中的那些。

这种类型的工作可能看起来像这样(这里的 Reduce 函数实际上并不需要在键值对中有值,所以我只是放了一个空String以使事情更简单):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.*;
import java.io.IOException;
import java.util.*;
import java.nio.charset.StandardCharsets;

public class SymDiff 
{
    /* input:  <byte_offset, line_of_dataset>
     * output: <ID, file>
     */
    public static class Map extends Mapper<LongWritable, Text, Text, Text> 
    {
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
        {
            String[] line = value.toString().split(" ");    // split each line to two columns
            
            // if the first column consists of integers, put the ID from the 2nd column as the key
            // and set "B" as the value to imply that the particular ID was found on the second file
            // else, put the ID from the first column as the key
            // and set "A" as the value to imply that the particular ID was found on the first file
            if(line[0].matches("\\d+"))     // (lazy way to see if the first string is an int without throwing an exception)
                context.write(new Text(line[1]), new Text("B"));
            else
                context.write(new Text(line[0]), new Text("A"));
        }
    }

    /* input: <ID, file>
     * output: <ID, "">
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text>
    {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException 
        {
            HashSet<String> list_of_files = new HashSet<String>();

            // store all the instances of "A" and "B" for each ID in a HashSet with unique values
            for(Text value : values)
                list_of_files.add(value.toString());
            
            // only write the IDs which they values only contain "A" or "B" (and not both) on their set 
            if(list_of_files.contains("A") && !list_of_files.contains("B") || (!list_of_files.contains("A") && list_of_files.contains("B")))
                context.write(key, new Text(""));
        }
    }


    public static void main(String[] args) throws Exception
    {
        // set the paths of the input and output directories in the HDFS
        Path input_dir = new Path("input");
        Path output_dir = new Path("output");

        // in case the output directory already exists, delete it
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(output_dir))
            fs.delete(output_dir, true);

        // configure the MapReduce job
        Job job = Job.getInstance(conf, "Symmetric Difference");
        job.setJarByClass(SymDiff.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);    
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, input_dir);
        FileOutputFormat.setOutputPath(job, output_dir);
        job.waitForCompletion(true);
    }
}

您可以在下面的屏幕截图中看到所需的输出:

在此处输入图像描述


推荐阅读