java - Using two CSV files linked by an id column in Hadoop
Problem description
I'm new to Hadoop, and I don't know how to join two CSV files. Here are my two CSV files:
order_dataset.csv
order_id    order_approved_at      order_delivered_customer_date
----------  ---------------------  -------------------------------
1           2017-10-02 19:55:00    2017-10-04 04:39:00
2           2017-01-26 14:16:31    2017-02-02 14:08:10
3           2018-06-09 03:13:12    2018-06-19 12:05:52
order_review_dataset.csv
order_id    customer_id    review_score
----------  -------------  --------------
1           12             3
2           23             4
3           93             5
I would like a result file like this:
delivery_time (days)    avg_review_score
----------------------  ------------------
1                       3.03
2                       4.5
3                       3.76
So far, I have computed the delivery time. I don't know how to bring in review_score from the second CSV file. Here is my code:
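// Question.java (job driver)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Question {
    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("Usage: Question <input path>");
            System.exit(-1);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Job Title");
        job.setJarByClass(Question.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path("./output/question3");
        FileOutputFormat.setOutputPath(job, outputPath);
        // Remove any previous output so the job does not fail on an existing directory.
        outputPath.getFileSystem(conf).delete(outputPath, true);
        job.setMapperClass(QuestionMapper.class);
        job.setReducerClass(QuestionReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}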
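// QuestionMapper.java (emits (delivery time in days, 1) for each delivered order)
import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class QuestionMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final Text jourDelaiLivraison = new Text();
    private final DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Skip the header line (byte offset 0 of the file).
        if (key.get() == 0)
            return;
        String ligne = value.toString();
        String[] tokens = ligne.split(",");
        try {
            Date order_approved_at = formatter.parse(tokens[4]);
            Date order_delivered_customer_date = formatter.parse(tokens[6]);
            // The order must be delivered and the dates must be consistent.
            if (tokens[2].equals("delivered") && order_approved_at.compareTo(order_delivered_customer_date) < 0) {
                long diff = order_delivered_customer_date.getTime() - order_approved_at.getTime();
                String delai = String.valueOf(TimeUnit.DAYS.convert(diff, TimeUnit.MILLISECONDS));
                jourDelaiLivraison.set(delai);
                // Write only for valid rows; writing outside this block would
                // emit an empty or stale key for rows that fail the check.
                context.write(jourDelaiLivraison, new IntWritable(1));
            }
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }
}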
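// QuestionReducer.java (counts orders per delivery time, writes them sorted numerically)
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.StreamSupport;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class QuestionReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private Map<Text, Integer> delaiNoteSatisfaction;

    @Override
    public void setup(Context context) {
        this.delaiNoteSatisfaction = new HashMap<>();
    }

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) {
        int count = StreamSupport.stream(values.spliterator(), false)
                .mapToInt(IntWritable::get)
                .sum();
        // Hadoop reuses the key object between calls, so store a copy.
        delaiNoteSatisfaction.put(new Text(key), count);
    }

    @Override
    public void cleanup(Context context) {
        List<Text> keyList = new ArrayList<>(delaiNoteSatisfaction.keySet());
        // Sort keys as numbers rather than as strings.
        keyList.sort(Comparator.comparingInt((Text t) -> Integer.parseInt(t.toString())));
        keyList.forEach(key -> {
            try {
                context.write(key, new IntWritable(delaiNoteSatisfaction.get(key)));
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}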
Solution
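No answer was recorded for this question. One common way to approach it is a reduce-side join: each input gets its own mapper that emits order_id as the key and a tagged value (for example "O" plus the delivery days for orders, "R" plus the score for reviews), the reducer pairs the values that share an order_id, and a second aggregation averages the scores per delivery time. In Hadoop, the two mappers can be attached to one job with MultipleInputs.addInputPath. The sketch below is only an illustration of that logic in plain Java, without Hadoop, so it can be run directly. The class JoinSketch, its method names, the "O"/"R" tags, and the simplified three-column comma-separated layout (taken from the sample tables in the question rather than from the real files) are all assumptions.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Hadoop-free sketch of a reduce-side join on order_id.
// Assumes the simplified column layout shown in the question's sample tables.
class JoinSketch {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // "Order mapper": emits (order_id, "O" + whole days between approval and delivery).
    static String[] mapOrder(String line) {
        String[] t = line.split(",");
        LocalDateTime approved = LocalDateTime.parse(t[1], FMT);
        LocalDateTime delivered = LocalDateTime.parse(t[2], FMT);
        long days = Duration.between(approved, delivered).toDays();
        return new String[] {t[0], "O" + days};
    }

    // "Review mapper": emits (order_id, "R" + review_score).
    static String[] mapReview(String line) {
        String[] t = line.split(",");
        return new String[] {t[0], "R" + t[2]};
    }

    // Joins both inputs on order_id, then averages review_score per delivery time.
    static Map<Long, Double> averageScoreByDeliveryDays(List<String> orders,
                                                        List<String> reviews) {
        // Stand-in for the shuffle phase: group tagged values by order_id.
        Map<String, List<String>> grouped = new HashMap<>();
        for (String line : orders) {
            String[] kv = mapOrder(line);
            grouped.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        for (String line : reviews) {
            String[] kv = mapReview(line);
            grouped.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }

        // "Join reducer": for each order_id, pair the delivery days with its scores,
        // accumulating {sum of scores, number of scores} per delivery time.
        Map<Long, long[]> sumCount = new TreeMap<>();
        for (List<String> values : grouped.values()) {
            Long days = null;
            List<Integer> scores = new ArrayList<>();
            for (String v : values) {
                if (v.charAt(0) == 'O') {
                    days = Long.parseLong(v.substring(1));
                } else {
                    scores.add(Integer.parseInt(v.substring(1)));
                }
            }
            if (days == null) {
                continue; // review without a matching order: dropped (inner join)
            }
            for (int score : scores) {
                long[] acc = sumCount.computeIfAbsent(days, k -> new long[2]);
                acc[0] += score;
                acc[1]++;
            }
        }

        // Second aggregation: average score per delivery time, sorted by days.
        Map<Long, Double> result = new TreeMap<>();
        sumCount.forEach((d, acc) -> result.put(d, (double) acc[0] / acc[1]));
        return result;
    }

    public static void main(String[] args) {
        List<String> orders = Arrays.asList(
                "1,2017-10-02 19:55:00,2017-10-04 04:39:00",
                "2,2017-01-26 14:16:31,2017-02-02 14:08:10",
                "3,2018-06-09 03:13:12,2018-06-19 12:05:52");
        List<String> reviews = Arrays.asList("1,12,3", "2,23,4", "3,93,5");
        // For the sample rows from the question this prints {1=3.0, 6=4.0, 10=5.0}
        System.out.println(averageScoreByDeliveryDays(orders, reviews));
    }
}
```

In a real job the grouping step is done by Hadoop's shuffle, and the averaging would typically run as a second MapReduce job keyed by delivery time, since the join is keyed by order_id. The mapper in the question reads columns 2, 4 and 6 of the real file, so the column indices above would need adjusting to match it.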