Hadoop Big Data Case Study: Analyzing the Sogou 5-Million-Record Dataset
I recently used Hive, MapReduce, and HBase on Hadoop to do a fairly realistic analysis of the publicly available Sogou 5-million-record dataset. It is a good exercise for beginners: working with a larger volume of data surfaces problems you rarely hit otherwise, and working through them builds a useful foundation for real big-data projects later on.
Data source: https://download.csdn.net/download/leoe_/10429414
Data description: the Sogou 5-million-record dataset is pre-processed production data from the Sogou search engine. It is real and reasonably large, which makes it a good fit for a distributed-computing course project.
Record format: access time\tuser ID\t[query keyword]\trank of the clicked URL in the result list\tsequence number of the user's click\tURL clicked by the user.
The user ID is assigned automatically from the browser cookie used to visit the search engine, so different queries issued from the same browser session share one user ID.
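For illustration only, a record has the following shape (the field values here are hypothetical, not taken from the dataset; fields are tab-separated):
20111230000005	6815989b0c03e02ea9a7b27e11e09b6b	[仙剑奇侠传]	3	1	http://www.example.com/
That is: at 2011-12-30 00:00:05 this user searched for "[仙剑奇侠传]" and clicked the result ranked 3rd, which was the 1st link the user clicked.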
Tasks:
1. Load the raw data onto HDFS.
2. Split and recombine the time field of the raw data, adding year, month, day, and hour fields.
3. Load the processed data onto HDFS.
4. Implement each of the following with both MapReduce and Hive:
- Total number of records
- Number of records with a non-empty query
- Number of non-duplicated records
- Number of unique UIDs
- Query-frequency ranking (top 50 most frequent keywords)
- Number of users who issued more than 2 queries
- Proportion of users who issued more than 2 queries
- Proportion of clicks whose rank is within 10
- Proportion of queries that are a URL typed in directly
- UIDs that searched for "仙剑奇侠传" more than 3 times
5. Save the result of every sub-task in step 4 to HDFS.
6. Import the files produced in step 5 into HBase (a single table) through the Java API.
7. Query the results imported in step 6 with the HBase shell.
Processing the source data
1. Load the raw data onto HDFS.
hadoop fs -put sogou.500w.utf8 /sogou/data
2. Split and recombine the time field, adding year, month, day, and hour fields.
bash sogou-log-extend.sh sogou.500w.utf8 sogou.500w.utf8.ext
The content of sogou-log-extend.sh is:
#!/bin/bash
# usage: sogou-log-extend.sh <infile> <outfile>
#infile=/root/file/sogou.500w.utf8
infile=$1
#outfile=/root/file/sogou.500w.utf8.final
outfile=$2
# the access time looks like 20111230000005: chars 1-4 = year, 5-6 = month, 7-8 = day, 9-10 = hour
awk -F '\t' '{print $0"\t"substr($1,1,4)"year\t"substr($1,5,2)"month\t"substr($1,7,2)"day\t"substr($1,9,2)"hour"}' $infile > $outfile
3. Load the processed data onto HDFS.
hadoop fs -put sogou.500w.utf8.ext /sogou/data
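A quick sanity check that the extended file is on HDFS and each line now carries the four extra fields (paths as used above):
hadoop fs -ls /sogou/data
hadoop fs -cat /sogou/data/sogou.500w.utf8.ext | head -1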
Analyzing the data with Hive
First import the data into the Hive warehouse:
- List the databases:
show databases;
- Create a database:
create database sogou;
- Switch to it:
use sogou;
- Create the sougou table:
Create table sougou(time string,uuid string,name string,num1 int,num2 int,url string) Row format delimited fields terminated by '\t';
- Load the data into the table:
Load data local inpath 'sogou.500w.utf8' into table sougou;
- Inspect the table:
desc sougou;
- Total number of records
- HQL:
select count(*) from sougou;
- Number of records with a non-empty query
- HQL:
select count(*) from sougou where name is not null and name !='';
- Number of non-duplicated records
- HQL:
select count(*) from (select * from sougou group by time,num1,num2,uuid,name,url having count(*)=1) a;
- Number of unique UIDs
- HQL:
select count(distinct uuid) from sougou;
- Query-frequency ranking (top 50 most frequent keywords)
- HQL:
select name,count(*) as pd from sougou group by name order by pd desc limit 50;
- Number of users who issued more than 2 queries
- HQL:
select count(a.uuid) from (select uuid,count(*) as cnt from sougou group by uuid having cnt > 2) a;
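The task list also asks for the proportion of users who issued more than 2 queries, which is missing from the HQL list above. A sketch of one way to get it in a single statement (the denominator is the number of distinct UIDs, consistent with the counts above):
select sum(if(cnt > 2, 1, 0)) / count(*) from (select uuid, count(*) as cnt from sougou group by uuid) a;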
- Proportion of clicks whose rank is within 10
- HQL:
select count(*) from sougou where num1<11;
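The statement above only yields the numerator; to get the proportion directly, one possibility is (num1 is the rank column of the sougou table):
select sum(if(num1 <= 10, 1, 0)) / count(*) from sougou;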
- Proportion of queries that are a URL typed in directly (using the same '%www%' heuristic as the MapReduce version below)
- HQL:
select count(*) from sougou where name like '%www%';
- UIDs that searched for "仙剑奇侠传" more than 3 times
- HQL:
select uuid,count(*) as uu from sougou where name='仙剑奇侠传' group by uuid having uu>3;
Analyzing the data with MapReduce
(1) Total number of records
package com.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MRCountAll {
    // static counter: this only reflects the real total when the job runs in the
    // local runner (a single JVM); on a cluster each task has its own copy
    public static Integer i = 0;
    public static boolean flag = true;

    public static class CountAllMap extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            i++; // one input line = one record
        }
    }

    public static void runcount(String inputPath, String outPath) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");
        Job job = Job.getInstance(conf, "count");
        job.setJarByClass(MRCountAll.class);
        job.setMapperClass(CountAllMap.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outPath));
        job.waitForCompletion(true);
    }

    public static void main(String[] args) throws Exception {
        runcount("/sogou/data/sogou.500w.utf8", "/sogou/data/CountAll");
        System.out.println("Total records: " + i);
    }
}
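These jobs are written to be launched from main() with fs.defaultFS pointing at the cluster. If you would rather submit from the command line, packaging the classes into a jar and running it with hadoop jar should work along these lines (the jar name is only a placeholder):
hadoop jar sogou-analysis.jar com.hadoop.MRCountAll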
(2) Number of records with a non-empty query
package com.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CountNotNull {
    public static String Str = "";
    // static counter: only valid when the job runs in a single JVM (local runner)
    public static int i = 0;
    public static boolean flag = true;

    public static class wyMap extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            // count the line when the keyword field is present and non-empty
            if (values.length > 2 && !values[2].isEmpty()) {
                context.write(new Text(values[1]), new IntWritable(1));
                i++;
            }
        }
    }

    public static void run(String inputPath, String outputPath) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");
        Job job = Job.getInstance(conf, "countnotnull");
        job.setJarByClass(CountNotNull.class);
        job.setMapperClass(wyMap.class);
        // job.setReducerClass(wyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.waitForCompletion(true);
    }

    public static void main(String[] args) throws Exception {
        run("/sogou/data/sogou.500w.utf8", "/sogou/data/CountNotNull");
        System.out.println("Non-empty query records: " + i);
    }
}
(3) Number of non-duplicated records
package com.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CountNotRepeat {
    // static counter: only valid when the job runs in a single JVM (local runner)
    public static int i = 0;

    public static class NotRepeatMap extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            String time = values[0];
            String uid = values[1];
            String name = values[2];
            String url = values[5];
            // use the concatenation of the four fields as the deduplication key
            context.write(new Text(time + uid + name + url), new Text("1"));
        }
    }

    public static class NotRepeatReduc extends Reducer<Text, Text, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            i++; // each distinct key reaches reduce exactly once
            context.write(new Text(key.toString()), new IntWritable(i));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");
        Job job = Job.getInstance(conf, "countnotrepeat");
        job.setJarByClass(CountNotRepeat.class);
        job.setMapperClass(NotRepeatMap.class);
        job.setReducerClass(NotRepeatReduc.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountNotRepeat"));
        job.waitForCompletion(true);
        System.out.println("Non-duplicated records: " + i);
    }
}
(4) Number of unique UIDs
package com.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CountNotMoreUid {
    // static counter: only valid when the job runs in a single JVM (local runner)
    public static int i = 0;

    public static class UidMap extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            String uid = values[1];
            context.write(new Text(uid), new Text("1"));
        }
    }

    public static class UidReduc extends Reducer<Text, Text, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            i++; // each distinct uid reaches reduce exactly once
            context.write(new Text(key.toString()), new IntWritable(i));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");
        Job job = Job.getInstance(conf, "countuid");
        job.setJarByClass(CountNotMoreUid.class);
        job.setMapperClass(UidMap.class);
        job.setReducerClass(UidReduc.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountNotMoreUid"));
        job.waitForCompletion(true);
        System.out.println("Unique UIDs: " + i);
    }
}
(5) Query-frequency ranking (top 50 most frequent keywords)
package com.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.TreeMap;

public class CountTop50 {

    public static class TopMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        Text text = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            String keys = line[2]; // the query keyword
            text.set(keys);
            context.write(text, new LongWritable(1));
        }
    }

    public static class TopReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        Text text = new Text();
        // keeps the 50 largest counts seen so far, ordered by count;
        // note that keywords with identical counts overwrite each other in this simple version
        TreeMap<Integer, String> map = new TreeMap<Integer, String>();

        @Override
        protected void reduce(Text key, Iterable<LongWritable> value, Context context)
                throws IOException, InterruptedException {
            int sum = 0; // number of times this keyword appears
            for (LongWritable ltext : value) {
                sum += ltext.get();
            }
            map.put(sum, key.toString());
            // keep only the top 50 entries
            if (map.size() > 50) {
                map.remove(map.firstKey());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (Integer count : map.keySet()) {
                context.write(new Text(map.get(count)), new LongWritable(count));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");
        Job job = Job.getInstance(conf, "count");
        job.setJarByClass(CountTop50.class);
        job.setJobName("Five");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setMapperClass(TopMapper.class);
        job.setReducerClass(TopReducer.class);
        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountTop50"));
        job.waitForCompletion(true);
    }
}
(6) Number of users who issued more than 2 queries
package com.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CountQueriesGreater2 {
    // static counter: only valid when the job runs in a single JVM (local runner)
    public static int total = 0;

    public static class MyMaper extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] str = value.toString().split("\t");
            // emit (uid, 1) for every query record
            context.write(new Text(str[1]), new IntWritable(1));
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // key is a uid, values are its per-record counts
            int sum = 0;
            for (IntWritable i : values) {
                sum += i.get();
            }
            if (sum > 2) {
                total = total + 1;
            }
            // context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");
        // 1. create the job
        Job job = Job.getInstance(conf, "six");
        // 2. mapper class
        job.setMapperClass(MyMaper.class);
        // 3. combiner class (optional)
        // job.setCombinerClass(MyReducer.class);
        // 4. reducer class
        job.setReducerClass(MyReducer.class);
        // 5. output key type
        job.setOutputKeyClass(Text.class);
        // 6. output value type
        job.setOutputValueClass(IntWritable.class);
        // the class used to locate the job jar
        job.setJarByClass(CountQueriesGreater2.class);
        // 7. input path
        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        // 8. output path
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountQueriesGreater2"));
        // 9. run the job
        job.waitForCompletion(true);
        System.out.println("Users with more than 2 queries: " + total);
    }
}
(7) Proportion of users who issued more than 2 queries
package com.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CountQueriesGreaterPro {
    /*
     * InputFormat -> FileInputFormat (subclass) -> TextInputFormat (subclass).
     * TextInputFormat is used by default: 1. it validates the input path,
     * 2. it reads the data line by line.
     *
     * Each map is one task (a Java process running in a JVM).
     * After map, an optional combiner merges duplicate keys within one map's output.
     * Shuffle (the partitioner) distributes the map output and sorts it by key.
     * The reducer receives all values that share a key and merges them.
     *
     * OutputFormat -> FileOutputFormat (subclass) -> TextOutputFormat (subclass):
     * 1. it validates the output path, 2. it writes key + "\t" + value + "\n".
     */
    // Text is a writable String type, IntWritable a writable int type.
    // The first two type parameters are the input key/value types,
    // the last two are the output key/value types.
    public static int total1 = 0; // users with more than 2 queries
    public static int total2 = 0; // total number of records (the denominator used below)
    // (static counters: only valid when the job runs in a single JVM)

    public static class MyMaper extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        // key is the line offset, value is one line of text
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            total2++;
            String[] str = value.toString().split("\t");
            IntWritable one = new IntWritable(1);
            Text word = new Text(str[1]);
            // after this, each uid is paired with a value of 1
            context.write(word, one);
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // key is a uid, values are its counts
            int sum = 0;
            for (IntWritable i : values) {
                sum += i.get();
            }
            if (sum > 2) {
                total1++;
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("seven begin");
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");
        Job job = Job.getInstance(conf, "seven");
        job.setMapperClass(MyMaper.class);
        // job.setCombinerClass(MyReducer.class); // optional
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setJarByClass(CountQueriesGreaterPro.class);
        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountQueriesGreaterPro"));
        job.waitForCompletion(true);
        System.out.println("total1=" + total1 + "\ttotal2=" + total2);
        float percentage = (float) total1 / (float) total2;
        System.out.println("Proportion of users with more than 2 queries: " + percentage * 100 + "%");
        System.out.println("over");
    }
}
(8) Proportion of clicks whose rank is within 10
package com.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CountRank {
    // static counters: only valid when the job runs in a single JVM (local runner)
    public static int sum1 = 0; // clicks with rank within 10
    public static int sum2 = 0; // all clicks

    public static class MyMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            sum2++;
            String[] str = value.toString().split("\t");
            int rank = Integer.parseInt(str[3]); // the rank field
            if (rank < 11) {
                sum1 = sum1 + 1;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");
        Job job = Job.getInstance(conf, "eight");
        job.setMapperClass(MyMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setJarByClass(CountRank.class);
        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountRank"));
        job.waitForCompletion(true);
        System.out.println("sum1=" + sum1 + "\tsum2=" + sum2);
        float percentage = (float) sum1 / (float) sum2;
        System.out.println("Proportion of clicks with rank within 10: " + percentage * 100 + "%");
    }
}
(9) Proportion of queries that are a URL typed in directly
package com.hadoop;

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CountURL {
    // static counters: only valid when the job runs in a single JVM (local runner)
    public static int sum1 = 0; // queries that look like a URL (contain "www")
    public static int sum2 = 0; // all queries

    public static class MyMapper extends Mapper<Object, Text, Text, Text> {
        private static final Pattern WWW = Pattern.compile("www");

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] str = value.toString().split("\t");
            // treat a keyword containing "www" as a URL typed in directly
            if (WWW.matcher(str[2]).find()) {
                sum1++;
            }
            sum2++;
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");
        Job job = Job.getInstance(conf, "nine");
        job.setMapperClass(MyMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setJarByClass(CountURL.class);
        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountURL"));
        job.waitForCompletion(true);
        System.out.println("sum1=" + sum1 + "\tsum2=" + sum2);
        float percentage = (float) sum1 / (float) sum2;
        System.out.println("Proportion of queries containing 'www' (URL typed in directly): " + percentage * 100 + "%");
    }
}
(10) UIDs that searched for "仙剑奇侠传" more than 3 times
package com.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CountUidGreater3 {
    // collected results: only valid when the job runs in a single JVM (local runner)
    public static String Str = "";
    public static int i = 0;

    public static class Map extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            String pattern = "仙剑奇侠传";
            // emit (uid, 1) for every record whose keyword matches
            if (values[2].equals(pattern)) {
                context.write(new Text(values[1]), new IntWritable(1));
            }
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> value, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : value) {
                sum = sum + v.get();
            }
            if (sum > 3) {
                Str = Str + key.toString() + "\n";
                i++;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");
        Job job = Job.getInstance(conf, "count");
        job.setJarByClass(CountUidGreater3.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountUidGreater3"));
        job.waitForCompletion(true);
        System.out.println("i: " + i);
        System.out.println(Str);
    }
}
5. Save the result of every sub-task in step 4 to HDFS.
For the Hive version, prefix each HQL statement from step 4 with: Insert overwrite directory '<path>'
e.g. Insert overwrite directory '/sogou/data/1' Select count(*) from sougou;
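Each MapReduce program above already writes its own output directory under /sogou/data/. An exported Hive result can be checked directly; for the example above (Hive names the result file 000000_0, which is also what the import code in the next step expects):
hadoop fs -cat /sogou/data/1/000000_0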
6. Import the files produced in step 5 into HBase (a single table) through the Java API.
package com.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import java.io.IOException;

public class HBaseImport {
    // name of the HBase table the reducer writes to
    private static String tableName = "test";
    // shared configuration / connection settings
    static Configuration conf = null;
    static {
        conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://192.168.136.100:9000/hbase");
        conf.set("hbase.master", "hdfs://192.168.136.100:60000");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "master,slave1,slave2");
        conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
    }

    public static class BatchMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // pass each line through unchanged, keyed by its file offset
            Text v2s = new Text();
            v2s.set(value.toString());
            context.write(key, v2s);
        }
    }

    public static class BatchReducer extends TableReducer<LongWritable, Text, NullWritable> {
        private String family = "info"; // column family of the target table

        @Override
        protected void reduce(LongWritable key, Iterable<Text> v2s, Context context)
                throws IOException, InterruptedException {
            for (Text v2 : v2s) {
                String[] splited = v2.toString().split("\t");
                // use the first field of the line as the row key and
                // store the whole line in column info:raw
                String rowKey = splited[0];
                Put put = new Put(rowKey.getBytes());
                put.add(family.getBytes(), "raw".getBytes(), v2.toString().getBytes());
                context.write(NullWritable.get(), put);
            }
        }
    }

    public static void imputil(String str) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(conf, HBaseImport.class.getSimpleName());
        TableMapReduceUtil.addDependencyJars(job);
        job.setJarByClass(HBaseImport.class);
        FileInputFormat.setInputPaths(job, str);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(BatchMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(BatchReducer.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        job.waitForCompletion(true);
    }

    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
        // one result file per sub-task exported from Hive in step 5
        String[] str = {
                "hdfs://192.168.136.100:9000/sogou/data/1/000000_0",
                "hdfs://192.168.136.100:9000/sogou/data/2/000000_0",
                "hdfs://192.168.136.100:9000/sogou/data/3/000000_0",
                "hdfs://192.168.136.100:9000/sogou/data/4/000000_0",
                "hdfs://192.168.136.100:9000/sogou/data/5/000000_0",
                "hdfs://192.168.136.100:9000/sogou/data/6/000000_0",
                "hdfs://192.168.136.100:9000/sogou/data/9/000000_0",
                "hdfs://192.168.136.100:9000/sogou/data/10/000000_0"
        };
        for (String stri : str) {
            imputil(stri);
        }
    }
}
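The reducer writes into the table named test with column family info, so that table must exist before the import job runs. Creating it from the HBase shell (names taken from the code above):
create 'test','info'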
7. Query the results imported in step 6 with the HBase shell.
Command: scan 'test'
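To avoid printing the whole table at once, the scan can be limited to a few rows, for example:
scan 'test', {LIMIT => 10}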