Chapter 10 Hive in Practice: Guli Video (谷粒影音)
10.1 Requirements
Compute the common metrics and various TopN metrics for the Guli video site:
--Top 10 videos by view count
--Top 10 video categories by popularity
--The categories of the Top 20 most-viewed videos
--The category rank of the videos related to the Top 50 most-viewed videos
--The Top 10 most popular videos in each category
--The Top 10 videos by traffic in each category
--The Top 10 users by number of uploaded videos, and the videos they uploaded
--The Top 10 most-viewed videos in each category
10.2 Project
10.2.1 Data Structure
1. Video table
Table 6-13 Video table
Field | Meaning | Description
video id | unique video id | 11-character string
uploader | video uploader | username (String) of the user who uploaded the video
age | video age | integer number of days the video has been on the platform
category | video category | the category assigned to the video at upload time
length | video length | video length as an integer
views | view count | number of times the video has been viewed
rate | video rating | out of 5 points
ratings | traffic | video traffic, integer
comments | comment count | integer number of comments on the video
related ids | related video ids | ids of related videos, at most 20
2. User table
Table 6-14 User table
Field | Meaning | Type
uploader | uploader username | string
videos | number of uploaded videos | int
friends | number of friends | int
10.2.2 ETL on the Raw Data
Inspecting the raw data shows that a video can belong to multiple categories, separated by "&" with a space on each side, and that a video can likewise have multiple related videos, separated by "\t". To make these multi-valued fields easy to work with during analysis, we first clean and restructure the data: keep "&" as the category separator but strip the surrounding spaces, and join the multiple related video ids with "&" as well.
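The cleaning rules above can be sketched as a standalone Java class. This is an illustrative re-implementation, not the project's ETLUtil shown below; the class and method names here are made up:

```java
public class EtlSketch {

    // Cleans one raw record per the three rules described above:
    // drop records with fewer than 9 fields, strip spaces from the
    // category field (index 3), and join the related ids (fields 9+)
    // with "&" instead of "\t".
    static String clean(String line) {
        String[] f = line.split("\t");
        if (f.length < 9) return null;           // rule 1: dirty record
        f[3] = f[3].replace(" ", "");            // rule 2: "People & Blogs" -> "People&Blogs"
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < 9; i++) {            // the first 9 fields stay tab-separated
            if (i > 0) out.append('\t');
            out.append(f[i]);
        }
        for (int i = 9; i < f.length; i++) {     // rule 3: related ids joined by "&"
            out.append(i == 9 ? "\t" : "&").append(f[i]);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Prints the record with "People&Blogs" and the related ids joined as "relA&relB"
        System.out.println(clean("id1\tuser\t630\tPeople & Blogs\t186\t10181\t3.49\t494\t257\trelA\trelB"));
    }
}
```

The output has exactly ten tab-separated fields, which is what the Hive table definitions in 10.3.1 expect.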
A quick test of the ETL utility:

import com.atlxl.ETLUtil;

/**
 * @author LXL
 * @create 2019-06-05 23:07
 */
public class Test {
    public static void main(String[] args) {
        System.out.println(ETLUtil.etlStr("SDNkMu8ZT68\tw00dy911\t630\tPeople & Blogs\t186\t10181\t3.49\t494\t257\trjnbgpPJUks\tkdjhkdfhsgksd"));
    }
}
The Maven pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.atlxl</groupId>
    <artifactId>guliETL</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
    </dependencies>
</project>
1. ETL: ETLUtil
package com.atlxl;

/**
 * @author LXL
 * @create 2019-06-05 22:27
 *
 * 1. Filter out dirty records
 * 2. Remove the spaces in the category field
 * 3. Replace the separator between related video ids
 */
public class ETLUtil {
    public static String etlStr(String line) {
        // Split the record into fields
        String[] split = line.split("\t");
        // 1. Filter dirty records: a valid record has at least 9 fields
        if (split.length < 9) return null;
        // 2. Remove the spaces in the category field
        split[3] = split[3].replaceAll(" ", "");
        // 3. Re-join the fields, separating the related video ids with "&"
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i < split.length; i++) {
            if (i < 9) {
                if (i == split.length - 1) {
                    sb.append(split[i]);
                } else {
                    sb.append(split[i]).append("\t");
                }
            } else {
                if (i == split.length - 1) {
                    sb.append(split[i]);
                } else {
                    sb.append(split[i]).append("&");
                }
            }
        }
        return sb.toString();
    }
}
Original version:
public class ETLUtil {
    public static String oriString2ETLString(String ori) {
        StringBuilder etlString = new StringBuilder();
        String[] splits = ori.split("\t");
        if (splits.length < 9) return null;
        splits[3] = splits[3].replace(" ", "");
        for (int i = 0; i < splits.length; i++) {
            if (i < 9) {
                if (i == splits.length - 1) {
                    etlString.append(splits[i]);
                } else {
                    etlString.append(splits[i] + "\t");
                }
            } else {
                if (i == splits.length - 1) {
                    etlString.append(splits[i]);
                } else {
                    etlString.append(splits[i] + "&");
                }
            }
        }
        return etlString.toString();
    }
}
2. ETL: Mapper
package com.atlxl;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author LXL
 * @create 2019-06-05 21:26
 */
public class ETlMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Read one line of input
        String line = value.toString();
        // 2. Clean the record
        String etlStr = ETLUtil.etlStr(line);
        // 3. Write out the cleaned record, skipping records filtered as dirty
        if (StringUtils.isBlank(etlStr)) {
            return;
        }
        k.set(etlStr);
        context.write(k, NullWritable.get());
    }
}
Original version:
import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.atguigu.util.ETLUtil;

public class VideoETLMapper extends Mapper<Object, Text, NullWritable, Text> {
    Text text = new Text();

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String etlString = ETLUtil.oriString2ETLString(value.toString());

        if (StringUtils.isBlank(etlString)) return;

        text.set(etlString);
        context.write(NullWritable.get(), text);
    }
}
3. ETL: Runner
package com.atlxl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author LXL
 * @create 2019-06-05 23:22
 */
public class ETLDriver implements Tool {

    private Configuration configuration;

    public int run(String[] args) throws Exception {
        // 1. Get the job instance
        Job job = Job.getInstance(getConf());
        // 2. Set the driver class
        job.setJarByClass(ETLDriver.class);
        // 3. Set the Mapper class
        job.setMapperClass(ETlMapper.class);
        // 4. Mapper output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 5. Final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // 6. Input and output paths; this is a map-only job, so no reducers
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setNumReduceTasks(0);
        // 7. Submit the job
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }

    public void setConf(Configuration conf) {
        configuration = conf;
    }

    public Configuration getConf() {
        return configuration;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new ETLDriver(), args);
        System.out.println(run);
    }
}
Original version:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class VideoETLRunner implements Tool {
    private Configuration conf = null;

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    @Override
    public int run(String[] args) throws Exception {
        conf = this.getConf();
        conf.set("inpath", args[0]);
        conf.set("outpath", args[1]);

        Job job = Job.getInstance(conf);

        job.setJarByClass(VideoETLRunner.class);

        job.setMapperClass(VideoETLMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setNumReduceTasks(0);

        this.initJobInputPath(job);
        this.initJobOutputPath(job);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    private void initJobOutputPath(Job job) throws IOException {
        Configuration conf = job.getConfiguration();
        String outPathString = conf.get("outpath");

        FileSystem fs = FileSystem.get(conf);

        Path outPath = new Path(outPathString);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        FileOutputFormat.setOutputPath(job, outPath);
    }

    private void initJobInputPath(Job job) throws IOException {
        Configuration conf = job.getConfiguration();
        String inPathString = conf.get("inpath");

        FileSystem fs = FileSystem.get(conf);

        Path inPath = new Path(inPathString);
        if (fs.exists(inPath)) {
            FileInputFormat.addInputPath(job, inPath);
        } else {
            throw new RuntimeException("Input directory does not exist on HDFS: " + inPathString);
        }
    }

    public static void main(String[] args) {
        try {
            int resultCode = ToolRunner.run(new VideoETLRunner(), args);
            if (resultCode == 0) {
                System.out.println("Success!");
            } else {
                System.out.println("Fail!");
            }
            System.exit(resultCode);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}
4. Run the ETL job (per the author's note, the original command below can be skipped here; the command actually used is in 10.3):
$ bin/yarn jar ~/softwares/jars/gulivideo-0.0.1-SNAPSHOT.jar \
com.atguigu.etl.ETLVideosRunner \
/gulivideo/video/2008/0222 \
/gulivideo/output/video/2008/0222
10.3 Preparation
Upload the data to the cluster:
[root@node01 datas]# hadoop fs -put user /
[root@node01 datas]# hadoop fs -put video /
Run the jar:
[root@node01 datas]# yarn jar etl.jar com.atlxl.ETLDriver /video/2008/0222 /output
10.3.1 Create Tables
Create the text-format tables gulivideo_ori and gulivideo_user_ori,
then the ORC tables gulivideo_orc and gulivideo_user_orc.
gulivideo_ori:
create table gulivideo_ori(
    videoId string,
    uploader string,
    age int,
    category array<string>,
    length int,
    views int,
    rate float,
    ratings int,
    comments int,
    relatedId array<string>)
row format delimited fields terminated by "\t"
collection items terminated by "&"
stored as textfile;
gulivideo_user_ori:
create table gulivideo_user_ori(
    uploader string,
    videos int,
    friends int)
row format delimited fields terminated by "\t"
stored as textfile;
The raw data will then be inserted into the ORC tables.
gulivideo_orc:
create table gulivideo_orc(
    videoId string,
    uploader string,
    age int,
    category array<string>,
    length int,
    views int,
    rate float,
    ratings int,
    comments int,
    relatedId array<string>)
clustered by (uploader) into 8 buckets
row format delimited fields terminated by "\t"
collection items terminated by "&"
stored as orc;
gulivideo_user_orc:
create table gulivideo_user_orc(
    uploader string,
    videos int,
    friends int)
row format delimited fields terminated by "\t"
stored as orc;
10.3.2 Load the ETL Output
gulivideo_ori:
load data inpath "/gulivideo/output/video/2008/0222" into table gulivideo_ori;
gulivideo_user_ori:
load data inpath "/gulivideo/user/2008/0903" into table gulivideo_user_ori;
10.3.3 Insert Data into the ORC Tables
gulivideo_orc:
insert into table gulivideo_orc select * from gulivideo_ori;
or:
insert overwrite table gulivideo_orc select * from gulivideo_ori;
gulivideo_user_orc:
insert into table gulivideo_user_orc select * from gulivideo_user_ori;
or:
insert overwrite table gulivideo_user_orc select * from gulivideo_user_ori;
10.4 Business Analysis
10.4.1 Top 10 Videos by View Count
Approach: use order by for a global sort on the views column and limit the output to the first 10 rows.
Final query:
select * from gulivideo_orc order by views desc limit 10;
This query may fail with an out-of-memory error; if it reports an error, check the logs. For memory-overflow problems, see the troubleshooting section in the next chapter.
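If the failure really is an out-of-memory error in the MapReduce containers, one common mitigation is to raise the container and JVM heap sizes for the current Hive session. The property names below are standard Hadoop 2.x settings, but the values are only an illustration and must be tuned to your cluster:

```sql
-- Illustrative values only; adjust to your cluster's capacity.
SET mapreduce.map.memory.mb=2048;
SET mapreduce.map.java.opts=-Xmx1536m;
SET mapreduce.reduce.memory.mb=4096;
SET mapreduce.reduce.java.opts=-Xmx3072m;
```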
Original version:
select videoId, uploader, age, category, length, views, rate, ratings, comments
from gulivideo_orc
order by views desc
limit 10;
10.4.2 Top 10 Categories by Popularity
Approach:
1) Count how many videos each category contains and show the 10 categories with the most videos.
2) Group by category and count the videoIds in each group.
3) In the current schema one video maps to one or more categories, so before grouping by category, the category array must be exploded (one row per category); then the count can be taken.
4) Finally, order by the popularity count and keep the first 10 rows.
a. Explode the category array:
select videoId, category_name from gulivideo_orc lateral view explode(category) category_t as category_name; -- t1
b. Count the videos in each category:
select category_name, count(*) hot from t1 group by category_name; -- t2
c. Top 10 categories by popularity:
select category_name, hot from t2 order by hot desc limit 10;
Final query:
SELECT category_name, hot
FROM (
    SELECT category_name, COUNT(*) AS hot
    FROM (
        SELECT videoId, category_name
        FROM gulivideo_orc LATERAL VIEW explode(category) category_t AS category_name
    ) t1
    GROUP BY category_name
) t2
ORDER BY hot DESC
LIMIT 10;
Alternatively:
select category_name as category, count(t1.videoId) as hot
from (
    select videoId, category_name
    from gulivideo_orc lateral view explode(category) t_catetory as category_name
) t1
group by t1.category_name
order by hot desc
limit 10;
Or:
SELECT category_name, COUNT(*) AS hot
FROM (
    SELECT videoId, category_name
    FROM gulivideo_orc LATERAL VIEW explode(category) category_t AS category_name
) t1
GROUP BY category_name
ORDER BY hot DESC
LIMIT 10;
10.4.3 Categories of the 20 Most-Viewed Videos, and the Number of Top-20 Videos per Category
Approach:
1) Find the full records of the 20 most-viewed videos, sorted in descending order.
2) Explode the category field of these 20 records (one row per category).
3) Query the category names and how many Top-20 videos fall into each category.
a. Top 20 videos by view count, with their categories:
select videoId, views, category from gulivideo_orc order by views desc limit 20; -- t1
b. Explode the categories of the Top 20:
select videoId, category_name from t1 lateral view explode(category) category_t as category_name; -- t2
c. Deduplicate:
select distinct videoId, category_name from t2;
Final query:
select category_name as category, count(t2.videoId) as hot_with_views
from (
    select videoId, category_name
    from (
        select * from gulivideo_orc order by views desc limit 20
    ) t1 lateral view explode(category) t_catetory as category_name
) t2
group by category_name
order by hot_with_views desc;
10.4.4 Category Rank of the Videos Related to the Top 50 Most-Viewed Videos
Approach:
1) Find the full records of the 50 most-viewed videos (each record includes the video's related ids); call this temporary table t1.
t1: the 50 most-viewed videos
select * from gulivideo_orc order by views desc limit 50;
2) Explode the relatedId column of those 50 records; call this temporary table t2.
t2: the related video ids, one per row
select explode(relatedId) as videoId from t1;
3) Inner join the related video ids with the gulivideo_orc table.
t5: two columns, one the category and the other the related video ids found above
select videoId, category_name
from (
    select distinct(t2.videoId), t3.category
    from t2 inner join gulivideo_orc t3 on t2.videoId = t3.videoId
) t4 lateral view explode(category) t_catetory as category_name;
4) Group by category, count the videos in each group, then rank the categories.
a. Top 50 videos by view count, with their related ids:
select videoId, views, category, relatedId from gulivideo_orc order by views DESC limit 50; -- t1
b. Explode and deduplicate the related video ids:
select DISTINCT(relatedId_name) from t1 lateral view explode(relatedId) relatedId_t as relatedId_name; -- t2
c. Join to find the categories of the related videos:
select * from t2 join gulivideo_orc t3 on t2.relatedId_name = t3.videoId; -- t4
d. Explode the categories of the related videos:
select * from t4 lateral view explode(category) category_t as category_name; -- t5
e. Count the videos per category:
SELECT category_name, count(*) hot from t5 GROUP BY category_name; -- t6
f. Rank the categories:
SELECT * FROM t6 order by hot desc;
Final query:
SELECT *
FROM (
    SELECT category_name, COUNT(*) AS hot
    FROM (
        SELECT *
        FROM (
            SELECT *
            FROM (
                SELECT DISTINCT relatedId_name
                FROM (
                    SELECT videoId, views, category, relatedId
                    FROM gulivideo_orc
                    ORDER BY views DESC
                    LIMIT 50
                ) t1 LATERAL VIEW explode(relatedId) relatedId_t AS relatedId_name
            ) t2
            JOIN gulivideo_orc t3 ON t2.relatedId_name = t3.videoId
        ) t4 LATERAL VIEW explode(category) category_t AS category_name
    ) t5
    GROUP BY category_name
) t6
ORDER BY hot DESC;
Original version:
select category_name as category, count(t5.videoId) as hot
from (
    select videoId, category_name
    from (
        select distinct(t2.videoId), t3.category
        from (
            select explode(relatedId) as videoId
            from (
                select * from gulivideo_orc order by views desc limit 50
            ) t1
        ) t2
        inner join gulivideo_orc t3 on t2.videoId = t3.videoId
    ) t4 lateral view explode(category) t_catetory as category_name
) t5
group by category_name
order by hot desc;
10.4.5 Top 10 Most Popular Videos in Each Category, Using Music as an Example
Approach:
1) To compute the Music Top 10 we first need to isolate the Music category, which requires exploding the category array; so create a table that stores the data with category exploded into a categoryId column.
2) Populate that exploded-category table.
3) Compute the video popularity Top 10 for the target category (Music).
Final code:
Create the category table:
create table gulivideo_category(
    videoId string,
    uploader string,
    age int,
    categoryId string,
    length int,
    views int,
    rate float,
    ratings int,
    comments int,
    relatedId array<string>)
row format delimited fields terminated by "\t"
collection items terminated by "&"
stored as orc;
Insert data into the category table:
insert into table gulivideo_category
select videoId, uploader, age, categoryId, length, views, rate, ratings, comments, relatedId
from gulivideo_orc lateral view explode(category) catetory as categoryId;
Top 10 for the Music category (any other category works the same way):
select videoId, views from gulivideo_category where categoryId = "Music" order by views desc limit 10;
10.4.6 Top 10 Videos by Traffic in Each Category, Using Music as an Example
Approach:
1) Reuse the exploded-category table (category turned into the categoryId column).
2) Order by ratings.
Final query:
select videoId, views, ratings from gulivideo_category where categoryId = "Music" order by ratings desc limit 10;
10.4.7 Top 10 Uploaders by Video Count, and Their 20 Most-Viewed Videos
Approach:
1) Find the records of the 10 users who uploaded the most videos:
select * from gulivideo_user_orc order by videos desc limit 10;
2) Join with gulivideo_orc on the uploader field, then order the result by views.
Final query:
select t2.videoId, t2.views, t2.ratings, t1.videos, t1.friends
from (
    select * from gulivideo_user_orc order by videos desc limit 10
) t1
join gulivideo_orc t2 on t1.uploader = t2.uploader
order by views desc
limit 20;
a. The 10 users who uploaded the most videos:
SELECT uploader FROM gulivideo_user_orc ORDER BY videos DESC LIMIT 10; -- t1
b. The videos uploaded by these 10 users:
SELECT * FROM t1 join gulivideo_orc t2 on t1.uploader = t2.uploader; -- t3
c. The 20 most-viewed videos among them:
SELECT * FROM t3 ORDER BY views DESC LIMIT 20;
Combined:
SELECT *
FROM (
    SELECT *
    FROM (SELECT uploader, videos FROM gulivideo_user_orc ORDER BY videos DESC LIMIT 10) t1
    join gulivideo_orc t2 on t1.uploader = t2.uploader
) t3
ORDER BY views DESC LIMIT 20;
Or:
SELECT *
FROM (
    SELECT views, videoId, t1.uploader
    FROM (SELECT uploader, videos FROM gulivideo_user_orc ORDER BY videos DESC LIMIT 10) t1
    JOIN gulivideo_orc t2 ON t1.uploader = t2.uploader
) t3
ORDER BY views DESC LIMIT 20;
10.4.8 Top 10 Most-Viewed Videos in Each Category
Approach:
1) Start from the exploded-category table (categoryId column).
2) In a subquery, partition by categoryId, sort each partition by views descending, and assign an increasing row number; name this column rank.
3) From that subquery, keep only the rows where rank is at most 10.
Final query:
select t1.*
from (
    select videoId, categoryId, views,
           row_number() over(partition by categoryId order by views desc) rank
    from gulivideo_category
) t1
where rank <= 10;