java - 如何将 JSON 文件读入 avro mapreduce?
问题描述
我正在尝试使用 avro 为 hadoop 编写 mapreduce 作业。目标是使用 avro 加载 json 文件,然后在其上运行一些 reducer。
因为我想在 mapreduce 中减少它并且它应该高效运行,所以我不想在运行作业之前将磁盘上的 json 文件转换为 avro 文件。
不幸的是,我收到以下错误:
Error: java.lang.ClassCastException: class org.apache.hadoop.io.Text cannot be cast to class org.apache.avro.mapred.AvroKey (org.apache.hadoop.io.Text and org.apache.avro.mapred.AvroKey are in unnamed module of loader 'app'
WordCount.java
package mz;
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.StringTokenizer;
public class WordCount extends Configured implements Tool {
public static class TokenizerMapper extends
Mapper<AvroKey<Review>, Text, Text, IntWritable> {
private final static IntWritable ONE = new IntWritable(1);
@Override
public void map(AvroKey<Review> key, Text value, Context context)
throws IOException, InterruptedException {
String category = key.datum().getCategory().toString();
String reviewText = key.datum().getReviewText().toString();
StringTokenizer itr = new StringTokenizer(reviewText);
while (itr.hasMoreTokens()) {
context.write(new Text(category + ":" + itr.nextToken()), ONE);
}
}
}
public static class CountReducer extends
Reducer<Text, IntWritable, AvroKey<CharSequence>, AvroValue<Integer>> {
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
context.write(new AvroKey<>(key.toString()), new AvroValue<>(sum));
}
}
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: mz.MapReduceColorCount <input path> <output path>");
return -1;
}
Job job = new Job(getConf());
job.setJarByClass(WordCount.class);
job.setJobName("Color Count");
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapperClass(TokenizerMapper.class);
AvroJob.setInputKeySchema(job, Review.getClassSchema());
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
job.setReducerClass(CountReducer.class);
AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.INT));
return (job.waitForCompletion(true) ? 0 : 1);
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new WordCount(), args);
System.exit(res);
}
}
评论.avsc
{
"type" : "record",
"name" : "Review",
"namespace" : "mz",
"fields" : [ {
"name" : "reviewerID",
"type" : "string"
}, {
"name" : "asin",
"type" : "string"
}, {
"name" : "reviewerName",
"type" : "string"
}, {
"name" : "helpful",
"type" : {
"type" : "array",
"items" : "long"
}
}, {
"name" : "reviewText",
"type" : "string"
}, {
"name" : "overall",
"type" : "double"
}, {
"name" : "summary",
"type" : "string"
}, {
"name" : "unixReviewTime",
"type" : "long"
}, {
"name" : "reviewTime",
"type" : "string"
}, {
"name" : "category",
"type" : "string"
} ]
}
解决方案
KeyValueTextInputFormat 要求 Map 输入为 (Text, Text) 元组,默认情况下,这些元组在文件中按制表符分隔。
因此,尝试使用 AvroKey 类读取输入是不正确的,如果需要,您将需要使用不同的 InputFormat。否则,你说你正在阅读 JSON 文件,所以你的映射器输入不会是 Avro,它实际上是 Text
此外,我强烈建议使用 Spark 来读取/写入 JSON 和 Avro,因为 MapReduce 没有强大的 JSON 支持
推荐阅读
- javascript - THREE.Matrix4: .makePerspective() has been redefined and has a new signature. Please check the docs
- python - 如何在 Tensorflow 中计算整个数据集的统计数据(总和、均值、方差等)
- openscenegraph - 无法在 FBX 文件中嵌入纹理
- sharepoint - SharePoint WSP 部署未完成,因为 SharePoint 更新
- fortran - 带有格式化换行符的单个 Fortran WRITE 语句?
- html - 应用 xsl 时遇到问题:选择
- hdf5 - 如何在 HDF5 中存储标记的联合
- c# - 为什么 GridFS 查询在 C# 中返回空值/默认值
- javascript - 如何避免在更改和单击之间触发错误功能
- python - 如何使用 Oauth2.0 解码不透明的访问令牌