Joining two CSV files linked by an id column in Hadoop

Problem Description

I'm new to Hadoop and I don't know how to join two CSV files together. Here are my two CSV files:

order_dataset.csv

 order_id   order_approved_at    order_delivered_customer_date
---------- -------------------  -------------------------------
     1     2017-10-02 19:55:00       2017-10-04 04:39:00
     2     2017-01-26 14:16:31       2017-02-02 14:08:10
     3     2018-06-09 03:13:12       2018-06-19 12:05:52

order_review_dataset.csv

 order_id   customer_id     review_score
---------- -------------  --------------
     1          12              3
     2          23              4
     3          93              5

I want a result file that looks like this:

 delivery_time (days)   avg_review_score 
---------------------- ------------------ 
          1                    3.03
          2                    4.5
          3                    3.76

So far I have computed the delivery time. What I don't know is how to bring in the review_score from the second CSV file. Here is my code:

// Imports shared by the three classes below.
import java.io.IOException;
import java.text.*;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.StreamSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Question {

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("Usage: Question <input path>");
            System.exit(-1);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Job Title");
        job.setJarByClass(Question.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path("./output/question3");
        FileOutputFormat.setOutputPath(job, outputPath);
        // Delete any previous output so the job can be rerun.
        outputPath.getFileSystem(conf).delete(outputPath, true);

        job.setMapperClass(QuestionMapper.class);
        job.setReducerClass(QuestionReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

public class QuestionMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text jourDelaiLivraison = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Skip the header line (the record at byte offset 0).
        if (key.get() == 0)
            return;

        String ligne = value.toString();
        String[] tokens = ligne.split(",");

        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            Date order_approved_at = formatter.parse(tokens[4]);
            Date order_delivered_customer_date = formatter.parse(tokens[6]);

            // The order must have been delivered, and the dates must be consistent.
            if (tokens[2].equals("delivered") && order_approved_at.compareTo(order_delivered_customer_date) < 0) {
                long diff = order_delivered_customer_date.getTime() - order_approved_at.getTime();
                String delai = String.valueOf(TimeUnit.DAYS.convert(diff, TimeUnit.MILLISECONDS));
                jourDelaiLivraison.set(delai);
                // Write inside the if-block: writing outside it would re-emit
                // the previous row's (stale) delivery time for filtered rows.
                context.write(jourDelaiLivraison, new IntWritable(1));
            }
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }
}

public class QuestionReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private Map<Text, Integer> delaiNoteSatisfaction;

    @Override
    public void setup(Context context) {
        this.delaiNoteSatisfaction = new HashMap<>();
    }

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) {
        // Count the orders that share this delivery time.
        int count = StreamSupport.stream(values.spliterator(), false)
                .mapToInt(IntWritable::get)
                .sum();

        // Hadoop reuses the key instance between calls, so store a copy.
        delaiNoteSatisfaction.put(new Text(key), count);
    }

    @Override
    public void cleanup(Context context) {
        // Emit the results sorted numerically (not lexically) by delivery time.
        List<Text> keyList = new ArrayList<>(delaiNoteSatisfaction.keySet());
        keyList.sort(Comparator.comparingInt((Text t) -> Integer.parseInt(t.toString())));
        keyList.forEach(key -> {
            try {
                context.write(key, new IntWritable(delaiNoteSatisfaction.get(key)));
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

Tags: java, csv, hadoop, merge

Solution
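
One common way to combine the two files is a reduce-side join keyed on order_id: give each CSV file its own mapper, have both emit order_id as the key with a tagged value, and match the two sides in a single reducer. The sketch below is a minimal illustration under a few assumptions, not a drop-in answer: it reuses the column indices from the question's mapper (status in tokens[2], order_approved_at in tokens[4], order_delivered_customer_date in tokens[6] of order_dataset.csv) and assumes order_id and review_score sit in tokens[0] and tokens[2] of order_review_dataset.csv. The class names (JoinJob, OrderMapper, ReviewMapper, JoinReducer) are made up for the example.

// In addition to the imports listed with the question's code:
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class JoinJob {

    // Emits (order_id, "D:<delivery days>") for delivered orders.
    public static class OrderMapper extends Mapper<LongWritable, Text, Text, Text> {
        private final DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            if (key.get() == 0) return;                 // skip the header line
            String[] tokens = value.toString().split(",");
            try {
                Date approved  = formatter.parse(tokens[4]);
                Date delivered = formatter.parse(tokens[6]);
                if (tokens[2].equals("delivered") && approved.before(delivered)) {
                    long days = TimeUnit.DAYS.convert(
                            delivered.getTime() - approved.getTime(), TimeUnit.MILLISECONDS);
                    context.write(new Text(tokens[0]), new Text("D:" + days));
                }
            } catch (ParseException e) {
                // Malformed row: skip it rather than fail the whole job.
            }
        }
    }

    // Emits (order_id, "R:<review score>").
    public static class ReviewMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            if (key.get() == 0) return;                 // skip the header line
            String[] tokens = value.toString().split(",");
            context.write(new Text(tokens[0]), new Text("R:" + tokens[2]));
        }
    }

    // Matches the "D:" and "R:" values that share an order_id and
    // emits (delivery days, review score).
    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String days = null, score = null;
            for (Text v : values) {
                String s = v.toString();
                if (s.startsWith("D:")) days = s.substring(2);
                else if (s.startsWith("R:")) score = s.substring(2);
            }
            // Keep only orders present in both files (an inner join).
            if (days != null && score != null) {
                context.write(new Text(days), new Text(score));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "join orders and reviews");
        job.setJarByClass(JoinJob.class);
        // Each input file gets its own mapper; args: <orders csv> <reviews csv> <output dir>
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, OrderMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, ReviewMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}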
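
The join's output contains one (delivery_time, review_score) line per order, so a second, much simpler job can group by delivery time and average the scores to produce the table from the question. Again a hedged sketch: AvgMapper and AvgReducer are illustrative names, and it assumes the first job used the default TextOutputFormat, which separates key and value with a TAB. The driver wiring mirrors the question's, with DoubleWritable as the output value class.

// In addition to the imports above:
import org.apache.hadoop.io.DoubleWritable;

// Reads the "days<TAB>score" lines produced by the join job.
public class AvgMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] parts = value.toString().split("\t");
        context.write(new Text(parts[0]), new DoubleWritable(Double.parseDouble(parts[1])));
    }
}

// Averages the review scores for each delivery time.
public class AvgReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
    @Override
    public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        double sum = 0;
        int count = 0;
        for (DoubleWritable v : values) {
            sum += v.get();
            count++;
        }
        context.write(key, new DoubleWritable(sum / count));
    }
}

As a design note: if order_review_dataset.csv is small enough to fit in memory, a map-side join would avoid the first shuffle entirely. Ship the reviews file to every mapper with job.addCacheFile(...), load it into a HashMap in setup(), and look up each order's score while streaming the orders file.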

