java - 如何在 HDFS 中找到文件之间的对称差异?
问题描述
我在 hdfs 中有 2 个文件: /my/path/in/hdfs/part-r-* (大约 1000 个部分,每个 ~10000 行)和 /my/another/path/in/hdfs/part-r-* (相同尺寸)。第一个文件包含以下形式的数据:
id1 111
id6 212
id3 984
等等。第二个是这样的:
999 id8
15 id4
93 id1
我想找到第一个文件中没有出现在第二个文件中的所有 id,反之亦然。有什么简单的方法吗?
解决方案
我必须承认我怀疑这种计算是否适合 MapReduce 的严格范式,纯粹基于该过程的复杂性和计算量(即使您说在您的案例中输入的两个文件大小相同) ,所以我认为这是一个很好的案例,可以在保持简单的同时找到偷工减料的方法。
首先,为了摆脱额外的 IO 脂肪,您可能希望将这两个文件放在一个目录中(\input
为了简单起见,我们在这里说)只是为了绕过多输入混乱。之后,只需一项 MapReduce 作业就可以更轻松地完成工作。
在 Map 阶段,您需要做的就是将两个文件中的 ID 设置为键,并将它们出现的“文件名”设置为它们的值(这是一种找到对称差异的安全方法,同时继续概括可能是一个 ID可以在一个文件中多次看到)。那些“文件名”实际上并不需要是实际的文件名,您可以只放置字符串A
并B
表示该特定行中的该特定 ID 分别是在第一个或第二个文件中找到的。
在 Reduce 阶段,您可以将引用单个键/ID 的所有值放入一个集合中,该集合包含您放入HashSet
其中的所有唯一值。这意味着对于每个 reducer(也就是每个 ID),HashSet
创建 a 来放置A
和B
Strings 的几个实例,只存储这些实例中的一个。所以:
- 仅在第一个文件中看到的 ID 将包含一个
HashSet
集合A
, - 仅在第二个文件中看到的 ID 将包含一个
HashSet
集合,其中仅包含一个集合B
, - 在两个文件中都可以看到的 ID 将包含一个
HashSet
集合,其中包含A
和B
(也就是文件的交集,您不需要)。
有了它,您可以简单地检查每个 ID HashSet
,并只写上面描述的前两个选项中的那些。
这种类型的工作可能看起来像这样(这里的 Reduce 函数实际上并不需要在键值对中有值,所以我只是放了一个空String
以使事情更简单):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.*;
import java.io.IOException;
import java.util.*;
import java.nio.charset.StandardCharsets;
public class SymDiff
{
/* input: <byte_offset, line_of_dataset>
* output: <ID, file>
*/
public static class Map extends Mapper<LongWritable, Text, Text, Text>
{
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
String[] line = value.toString().split(" "); // split each line to two columns
// if the first column consists of integers, put the ID from the 2nd column as the key
// and set "B" as the value to imply that the particular ID was found on the second file
// else, put the ID from the first column as the key
// and set "A" as the value to imply that the particular ID was found on the first file
if(line[0].matches("\\d+")) // (lazy way to see if the first string is an int without throwing an exception)
context.write(new Text(line[1]), new Text("B"));
else
context.write(new Text(line[0]), new Text("A"));
}
}
/* input: <ID, file>
* output: <ID, "">
*/
public static class Reduce extends Reducer<Text, Text, Text, Text>
{
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
{
HashSet<String> list_of_files = new HashSet<String>();
// store all the instances of "A" and "B" for each ID in a HashSet with unique values
for(Text value : values)
list_of_files.add(value.toString());
// only write the IDs which they values only contain "A" or "B" (and not both) on their set
if(list_of_files.contains("A") && !list_of_files.contains("B") || (!list_of_files.contains("A") && list_of_files.contains("B")))
context.write(key, new Text(""));
}
}
public static void main(String[] args) throws Exception
{
// set the paths of the input and output directories in the HDFS
Path input_dir = new Path("input");
Path output_dir = new Path("output");
// in case the output directory already exists, delete it
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
if(fs.exists(output_dir))
fs.delete(output_dir, true);
// configure the MapReduce job
Job job = Job.getInstance(conf, "Symmetric Difference");
job.setJarByClass(SymDiff.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, input_dir);
FileOutputFormat.setOutputPath(job, output_dir);
job.waitForCompletion(true);
}
}
您可以在下面的屏幕截图中看到所需的输出:
推荐阅读
- json - jq - 如何遍历具有不同名称的嵌套键?
- python - yfinance (ticker).info 没有返回任何东西
- python - 我的代码在我的计算机上完美运行,但在服务器上执行时却不行?PYTHON
- django - CIEmail 或 CIText 的 Django postgresql ArrayField 默认为 callable 在运行测试时引发错误
- c - gcc 结构赋值警告缺少初始化器周围的大括号
- spring-cloud-dataflow - 查找 Spring Cloud Dataflow 服务器运行时的日志文件
- java - 使用 Curl 制作 android 登录表单
- c# - C# - 为高度并发的方法设置 CancellationTokens 的正确方法
- python-3.x - 如何将用户的消息分配到 discord.py 中的变量中?
- javascript - 如果键值对于该节点是唯一的并且已提供,如何将节点值更新为用户在嵌套 JSON 结构中提供的值?