Test Paper: Data Cleansing

gothic-death 2019-11-13 21:55

Result file field description:

Ip: 106.39.41.166 (city)

Date: 10/Nov/2016:00:01:02 +0800 (date)

Day: 10 (day)

Traffic: 54 (traffic)

Type: video (type: video or article)

Id: 8701 (id of the video or article)

Test requirements:

1. Data cleansing: clean the data as specified and load the cleansed data into a Hive database.

Two-stage cleansing:

1) Stage one: extract the required fields from the raw log.

ip:    199.30.25.88

time:  10/Nov/2016:00:01:03 +0800

traffic:  62

article: article/11325

video: video/3235

2) Stage two: refine the extracted fields.

ip ---> city (cityIP)

date--> time:2016-11-10 00:01:03

day: 10

traffic:62

type:article/video

id:11325

3) Hive table schema:

create table data(
    ip string,
    time string,
    day string,
    traffic bigint,
    type string,
    id string
)
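Since the cleaning job below writes its records as comma-separated lines, the table also needs a matching field delimiter before that output can be loaded. A minimal sketch of such a variant plus a load statement, assuming the cleaned file has been left on HDFS (the path and table name are placeholders only):

create table data(
    ip string,
    time string,
    day string,
    traffic bigint,
    type string,
    id string
)
row format delimited fields terminated by ','
stored as textfile;

-- load the cleaned output produced by the MapReduce job below (placeholder path)
load data inpath '/out/part-r-00000' into table data;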

 

 

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class QX {
    // Extracts the raw fields (ip, time, traffic, request) from one log line.
    public static class LogParser {
        public String[] parse(String line) {
            String ip = parseIP(line);
            String time = parseTime(line);
            String traffic = parseTraffic(line);
            String con = parseCon(line);
            return new String[] {ip, time, traffic, con};
        }
        // The client IP is everything before the "- -" separator.
        private String parseIP(String line) {
            return line.split("- -")[0].trim();
        }
        // The timestamp sits between "[" and "+0800]".
        private String parseTime(String line) {
            final int first = line.indexOf("[");
            final int last = line.indexOf("+0800]");
            return line.substring(first + 1, last).trim();
        }
        // The traffic (bytes) is the 10th space-separated field.
        private String parseTraffic(String line) {
            return line.split(" ")[9];
        }
        // The requested resource (e.g. "video/3235") is the 12th space-separated field.
        private String parseCon(String line) {
            return line.split(" ")[11];
        }
    }
    public static class Map extends Mapper<LongWritable, Text, LongWritable, Text> {
        LogParser logParser = new LogParser();
        Text outputValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // Skip malformed lines that do not have enough fields.
            if (line.split(" ").length < 25) {
                return;
            }
            final String[] parsed = logParser.parse(line);
            String con = line.split(" ")[11];   // requested resource, e.g. "video/3235"
            // Keep only video and article requests.
            String type;
            if (con.contains("video")) {
                type = "video";
            } else if (con.contains("article")) {
                type = "article";
            } else {
                return;
            }
            // The id is the part after the last "/", up to an optional "?" or ".".
            int a = con.lastIndexOf("/");
            int b = 0;
            if (con.contains("?")) {
                b = con.lastIndexOf("?");
            } else if (con.contains(".")) {
                b = con.lastIndexOf(".");
            }
            String id;
            if (b > a) {
                id = con.substring(a + 1, b);
            } else {
                id = con.substring(a + 1, con.length() - 1);
            }
            // Output: ip,time,day,traffic,type,id (day is taken from the first two characters of the time).
            outputValue.set(parsed[0] + "," + parsed[1] + "," + parsed[1].substring(0, 2) + ","
                    + parsed[2] + "," + type + "," + id);
            context.write(key, outputValue);
        }
    }
    public static class Reduce extends Reducer<LongWritable, Text, Text, NullWritable> {
        // Emit every cleaned record; the file-offset key is dropped.
        @Override
        protected void reduce(LongWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(value, NullWritable.get());
            }
        }
    }
    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new String[2];
        otherArgs[0] = "hdfs://192.168.100.10:9000/log.log";
        otherArgs[1] = "hdfs://192.168.100.10:9000/out";
        Job job = Job.getInstance(conf, "SHQX");
        job.setJarByClass(QX.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // Map and reduce emit different key/value types, so both must be declared.
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
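Note that the cleaning job above writes the timestamp exactly as it is parsed from the log (e.g. 10/Nov/2016:00:01:03) and takes the first two characters as the day, while stage two of the requirements asks for the 2016-11-10 00:01:03 form. A minimal sketch of that conversion, assuming the log's English month abbreviations; the convertTime helper is illustrative and not part of the original code:

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;

public class TimeFormat {
    // Hypothetical helper: "10/Nov/2016:00:01:03" -> "2016-11-10 00:01:03"
    // (the "+0800" zone has already been stripped by LogParser.parseTime).
    static String convertTime(String raw) throws ParseException {
        SimpleDateFormat in = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
        SimpleDateFormat out = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return out.format(in.parse(raw));
    }

    public static void main(String[] args) throws ParseException {
        System.out.println(convertTime("10/Nov/2016:00:01:03"));
    }
}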

2. Data processing:

· Top 10 most-visited videos/articles by number of accesses (video/article)

· Top 10 most popular courses by city (ip)

· Top 10 most popular courses by traffic (traffic)
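The Paixu sorting job below reads a part-r-00000 file that has already been aggregated, with the count in the third tab-separated column. A minimal sketch of such a counting stage, assuming the comma-separated output of the cleaning job as input and "type \t id \t count" output lines; the class name and HDFS paths are placeholders only:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CountById {
    // Emits (type \t id, 1) for every cleaned record "ip,time,day,traffic,type,id".
    public static class CountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text outKey = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            if (fields.length < 6) {
                return;
            }
            outKey.set(fields[4] + "\t" + fields[5]);   // type \t id
            context.write(outKey, ONE);
        }
    }

    // Sums the accesses per (type, id); output lines look like "video\t3235\t117".
    public static class CountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "CountById");
        job.setJarByClass(CountById.class);
        job.setMapperClass(CountMap.class);
        job.setCombinerClass(CountReduce.class);
        job.setReducerClass(CountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Placeholder paths: input is the cleaned data, output feeds the sorting job.
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/test2/out/clean"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/test2/out/traffic/2"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Summing traffic per course instead of counting accesses follows the same pattern, with the traffic field replacing the constant 1.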

 

package com.test.two.dao;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Paixu {

    // Filled by the reducer with the top-10 names, counts and extra columns,
    // so main() (or a later export step) can read them after the job finishes.
    public static List<String> Names = new ArrayList<String>();
    public static List<String> Values = new ArrayList<String>();
    public static List<String> Texts = new ArrayList<String>();

    // Reverses the natural IntWritable order so the largest keys (counts) come first.
    public static class Sort extends WritableComparator {
        public Sort() {
            super(IntWritable.class, true);
        }
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            return -a.compareTo(b);
        }
    }
    // Input lines look like "name \t text \t count"; the count becomes the key so that
    // the shuffle sorts the records, and "name \t text" becomes the value.
    public static class Map extends Mapper<Object, Text, IntWritable, Text> {
        private static Text Name = new Text();
        private static IntWritable num = new IntWritable();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String arr[] = line.split("\t");
            if (!arr[0].startsWith(" ")) {
                num.set(Integer.parseInt(arr[2]));
                Name.set(arr[0] + "\t" + arr[1]);
                context.write(num, Name);
            }
        }
    }
    // Keys arrive in descending order (see Sort), so the first ten values seen are the
    // top 10; they are cached in the static lists, and every record is still written out.
    public static class Reduce extends Reducer<IntWritable, Text, Text, IntWritable> {
        int i = 0;

        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                if (i < 10) {
                    i = i + 1;
                    String arr[] = val.toString().split("\t");
                    Texts.add(arr[1]);
                    Names.add(arr[0]);
                    Values.add(key.toString());
                }
                context.write(val, key);
            }
        }
    }

  
    
 
    
    public static int run() throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf, "OneSort");
        job.setJarByClass(Paixu.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setSortComparatorClass(Sort.class);
        // Map emits (IntWritable, Text); the reducer emits (Text, IntWritable).
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Input: the already-counted statistics; output: the sorted top list.
        Path in = new Path("hdfs://localhost:9000/test2/out/traffic/2/part-r-00000");
        Path out = new Path("hdfs://localhost:9000/test2/out/traffic/3");
        FileInputFormat.addInputPath(job, in);
        fs.delete(out, true);   // remove a previous output directory if it exists
        FileOutputFormat.setOutputPath(job, out);
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        run();
        // Print the collected top-10 names.
        for (String n : Names) {
            System.out.println(n);
        }
    }
}
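Paixu relies on the shuffle itself to do the ranking: the custom Sort comparator reverses the natural IntWritable order so the reducer sees the largest counts first, and the first ten values it encounters are collected into the static Names/Values/Texts lists for later use, for example printing or exporting to MySQL.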

 

 

 

 

3. Data visualization: import the statistics into a MySQL database and present them graphically.
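A minimal sketch of the MySQL import step, assuming a table top10(name varchar, value int) and the MySQL Connector/J driver on the classpath; the connection URL, credentials, and table name are placeholders, and the lists are the ones filled by Paixu's reducer:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.List;

public class MysqlExport {
    // Hypothetical helper: writes the collected top-10 pairs into MySQL.
    public static void save(List<String> names, List<String> values) throws Exception {
        String url = "jdbc:mysql://localhost:3306/test?useSSL=false&characterEncoding=utf8";
        try (Connection conn = DriverManager.getConnection(url, "root", "password");
             PreparedStatement ps = conn.prepareStatement(
                     "insert into top10(name, value) values(?, ?)")) {
            for (int i = 0; i < names.size(); i++) {
                ps.setString(1, names.get(i));
                ps.setInt(2, Integer.parseInt(values.get(i)));
                ps.addBatch();
            }
            ps.executeBatch();
        }
    }
}

The rows can then be read by any charting front end (e.g. a simple web page) to produce the graphical view the requirement asks for.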
