Hadoop Big Data Case Study: Analyzing the Sogou 5-Million-Record Dataset

I recently used Hive, MapReduce, and HBase from the Hadoop stack to run a fairly hands-on analysis of the Sogou five-million-record dataset available online. It is a good exercise for beginners: a dataset of this size surfaces problems you rarely meet with small samples, and working through them builds a useful foundation for real big-data projects later on.


Data source: https://download.csdn.net/download/leoe_/10429414
Data description: the Sogou five-million-record dataset is preprocessed production data from the Sogou search engine. It is real and reasonably large, which makes it a good fit for coursework on distributed computing.
Record format: access time \t user ID \t [query keyword] \t rank of the URL among the returned results \t user's click order \t clicked URL.
The user ID is assigned automatically from the cookie the browser sends when it visits the search engine, so all queries issued from the same browser session share one user ID.
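
Since the six fields are tab-separated, a record can be pulled apart with a single split. The snippet below only illustrates the layout; the sample line is fabricated to match the documented format, not taken from the dataset.

// Minimal illustration of the record layout; the sample line is made up, not real data.
public class RecordLayoutDemo {
    public static void main(String[] args) {
        String line = "20111230000005\t<uid>\t[query keyword]\t1\t1\thttp://example.com/";
        String[] f = line.split("\t");
        String time  = f[0];  // access time
        String uid   = f[1];  // user ID (cookie-based)
        String name  = f[2];  // query keyword, wrapped in brackets
        String rank  = f[3];  // rank of the URL in the result list
        String order = f[4];  // user's click order
        String url   = f[5];  // clicked URL
        System.out.println(uid + " searched " + name + " and clicked " + url);
    }
}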


Tasks:

1. Load the raw data into HDFS.
2. Split and recombine the time field in the raw data, adding year, month, day, and hour fields.
3. Load the processed data into HDFS.
4. Perform each of the following queries with both MapReduce and Hive:

  • Total number of records
  • Number of records with a non-empty query
  • Number of distinct records
  • Number of distinct UIDs
  • Query-frequency ranking (top 50 keywords)
  • Number of users with more than 2 queries
  • Proportion of users with more than 2 queries
  • Proportion of clicks with a rank within 10
  • Proportion of queries entered directly as a URL
  • UIDs that searched for "仙剑奇侠传" more than 3 times

5. Save the result of each query in step 4 to HDFS.
6. Import the files generated in step 5 into HBase (one table) via the Java API.
7. Query the results imported in step 6 from the HBase shell.


Processing the raw data

1. Load the raw data into HDFS.

hadoop fs -put sogou.500w.utf8 /sogou/data

2. Split and recombine the time field, adding year, month, day, and hour fields.

bash sogou-log-extend.sh sogou.500w.utf8 sogou.500w.utf8.ext

The content of sogou-log-extend.sh is:

#!/bin/bash

#infile=/root/file/sogou.500w.utf8
infile=$1
#outfile=/root/file/sogou.500w.utf8.final
outfile=$2
# The time field is a 14-digit timestamp (yyyyMMddHHmmss), so year, month, day and
# hour sit at fixed positions; note that awk's substr() is 1-based.
awk -F '\t' '{print $0"\t"substr($1,1,4)"year\t"substr($1,5,2)"month\t"substr($1,7,2)"day\t"substr($1,9,2)"hour"}' $infile > $outfile

3. Load the processed data into HDFS.

hadoop fs -put sogou.500w.utf8.ext /sogou/data
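
The uploads above use the hadoop fs command line. The same can be done programmatically; the sketch below uses the HDFS Java FileSystem API and assumes the NameNode address used later in this post.

package com.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: upload the raw and extended files to HDFS through the Java API
// instead of `hadoop fs -put`.
public class UploadToHdfs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");
        FileSystem fs = FileSystem.get(conf);
        try {
            fs.copyFromLocalFile(new Path("sogou.500w.utf8"), new Path("/sogou/data/"));
            fs.copyFromLocalFile(new Path("sogou.500w.utf8.ext"), new Path("/sogou/data/"));
        } finally {
            fs.close();
        }
    }
}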

Analyzing the data with Hive

First, load the data into the Hive warehouse:

  1. List databases: show databases;
  2. Create a database: create database sogou;
  3. Switch to it: use sogou;
  4. Create the table (named sougou, distinct from the sogou database): create table sougou(time string, uuid string, name string, num1 int, num2 int, url string) row format delimited fields terminated by '\t';
  5. Load the data into the table: load data local inpath 'sogou.500w.utf8' into table sougou;
  6. Inspect the table: desc sougou;

  • Total number of records
    • HQL: select count(*) from sougou;
  • Number of records with a non-empty query
    • HQL: select count(*) from sougou where name is not null and name != '';
  • Number of distinct records
    • HQL: select count(*) from (select * from sougou group by time, num1, num2, uuid, name, url having count(*) = 1) a;
  • Number of distinct UIDs
    • HQL: select count(distinct uuid) from sougou;
  • Query-frequency ranking (top 50 keywords)
    • HQL: select name, count(*) as pd from sougou group by name order by pd desc limit 50;
  • Number of users with more than 2 queries
    • HQL: select count(a.uuid) from (select uuid, count(*) as cnt from sougou group by uuid having cnt > 2) a;
  • Proportion of users with more than 2 queries
    • Divide the previous result by the distinct-UID count (the MapReduce version below divides by the total record count instead).
  • Proportion of clicks with a rank within 10
    • HQL: select count(*) from sougou where num1 < 11;  (divide by the total record count for the proportion)
  • Proportion of queries entered directly as a URL
    • HQL: select count(*) from sougou where name like '%www%';  (this mirrors the "www" check used in the MapReduce version below; divide by the total record count for the proportion)
  • UIDs that searched for "仙剑奇侠传" more than 3 times
    • HQL: select uuid, count(*) as uu from sougou where name = '仙剑奇侠传' group by uuid having uu > 3;

A minimal sketch of running these statements from Java through Hive JDBC follows.
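
The statements above were run from the Hive CLI. For completeness, below is a minimal sketch of issuing one of them from Java through Hive JDBC; the HiveServer2 URL, port, and empty credentials are assumptions (the original setup only used the CLI), and HiveServer2 must be running for this to work.

package com.hadoop;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Sketch: run one of the HQL statements above through Hive JDBC.
// The HiveServer2 address and empty credentials are assumptions, not from the original setup.
public class HiveQueryDemo {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        Connection conn = DriverManager.getConnection(
                "jdbc:hive2://192.168.136.100:10000/sogou", "", "");
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery("select count(*) from sougou");
        if (rs.next()) {
            System.out.println("Total records: " + rs.getLong(1));
        }
        rs.close();
        stmt.close();
        conn.close();
    }
}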

Analyzing the data with MapReduce

(1) Total number of records

package com.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MRCountAll {

    // Counts lines via a static field; this only works when the job runs in
    // local mode, because mappers on a cluster run in separate JVMs.
    public static Integer i = 0;

    public static class CountAllMap extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            i++;  // one input line = one record
        }
    }

    public static void runcount(String inputPath, String outputPath) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");

        Job job = Job.getInstance(conf, "count");
        job.setJarByClass(MRCountAll.class);
        job.setMapperClass(CountAllMap.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.waitForCompletion(true);
    }

    public static void main(String[] args) throws Exception {
        runcount("/sogou/data/sogou.500w.utf8", "/sogou/data/CountAll");
        System.out.println("Total records: " + i);
    }
}
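
The static field i above is only aggregated correctly when the job runs in local mode; on a cluster, each map task runs in its own JVM and the driver never sees the increments. For reference, the variant below (illustrative, not part of the original post) uses Hadoop's built-in counter mechanism, which the framework aggregates across tasks. The same pattern could also replace the static counters in examples (2) and (6)-(10).

package com.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Sketch: count records with a Hadoop counter so the result is also correct on a cluster.
public class MRCountAllWithCounter {

    enum Stats { TOTAL_RECORDS }  // illustrative counter name

    public static class CountMap extends Mapper<Object, Text, NullWritable, NullWritable> {
        @Override
        protected void map(Object key, Text value, Context context) {
            // Counters are aggregated by the framework across all map tasks.
            context.getCounter(Stats.TOTAL_RECORDS).increment(1);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");
        Job job = Job.getInstance(conf, "count-with-counter");
        job.setJarByClass(MRCountAllWithCounter.class);
        job.setMapperClass(CountMap.class);
        job.setNumReduceTasks(0);  // map-only job
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountAllCounter"));
        job.waitForCompletion(true);
        long total = job.getCounters().findCounter(Stats.TOTAL_RECORDS).getValue();
        System.out.println("Total records: " + total);
    }
}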

(2) Number of records with a non-empty query

package com.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CountNotNull {

    // Static counter: only reliable in local mode (see the counter-based sketch after example (1)).
    public static int i = 0;

    public static class wyMap extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            // Count only records whose query keyword is present and non-empty.
            if (values.length > 2 && values[2] != null && !values[2].isEmpty()) {
                context.write(new Text(values[1]), new IntWritable(1));
                i++;
            }
        }
    }

    public static void run(String inputPath, String outputPath) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");

        Job job = Job.getInstance(conf, "countnotnull");
        job.setJarByClass(CountNotNull.class);
        job.setMapperClass(wyMap.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.waitForCompletion(true);
    }

    public static void main(String[] args) throws Exception {
        run("/sogou/data/sogou.500w.utf8", "/sogou/data/CountNotNull");
        System.out.println("Non-empty queries: " + i);
    }
}

(3) Number of distinct records

package com.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CountNotRepeat {

    // Static counter: only reliable in local mode.
    public static int i = 0;

    public static class NotRepeatMap extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            String time = values[0];
            String uid  = values[1];
            String name = values[2];
            String url  = values[5];
            // Records that agree on time, uid, keyword and URL are treated as duplicates.
            context.write(new Text(time + uid + name + url), new Text("1"));
        }
    }

    public static class NotRepeatReduce extends Reducer<Text, Text, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            i++;  // each distinct key reaches reduce exactly once
            context.write(key, new IntWritable(i));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");

        Job job = Job.getInstance(conf, "countnotrepeat");
        job.setJarByClass(CountNotRepeat.class);
        job.setMapperClass(NotRepeatMap.class);
        job.setReducerClass(NotRepeatReduce.class);
        // Map and reduce outputs use different value types, so set both.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountNotRepeat"));
        job.waitForCompletion(true);

        System.out.println("Distinct records: " + i);
    }
}

(4) Number of distinct UIDs

package com.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CountNotMoreUid {

    // Static counter: only reliable in local mode.
    public static int i = 0;

    public static class UidMap extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            String uid = values[1];
            context.write(new Text(uid), new Text("1"));
        }
    }

    public static class UidReduce extends Reducer<Text, Text, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            i++;  // each distinct UID reaches reduce exactly once
            context.write(key, new IntWritable(i));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");

        Job job = Job.getInstance(conf, "countuid");
        job.setJarByClass(CountNotMoreUid.class);
        job.setMapperClass(UidMap.class);
        job.setReducerClass(UidReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountNotMoreUid"));
        job.waitForCompletion(true);

        System.out.println("Distinct UIDs: " + i);
    }
}

(5) Query-frequency ranking (top 50 keywords)

package com.hadoop;

import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CountTop50 {

    public static class TopMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        private final Text text = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            text.set(line[2]);                        // query keyword
            context.write(text, new LongWritable(1));
        }
    }

    public static class TopReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        // Sorted by frequency; only the 50 largest counts are kept.
        // Note: keywords with identical frequencies overwrite each other here,
        // so the output is an approximation of the true top 50.
        private final TreeMap<Integer, String> map = new TreeMap<Integer, String>();

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;                              // occurrences of this keyword
            for (LongWritable v : values) {
                sum += v.get();
            }
            map.put(sum, key.toString());
            if (map.size() > 50) {
                map.remove(map.firstKey());           // drop the current minimum
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (Integer count : map.keySet()) {
                context.write(new Text(map.get(count)), new LongWritable(count));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");

        Job job = Job.getInstance(conf, "top50");
        job.setJarByClass(CountTop50.class);
        job.setMapperClass(TopMapper.class);
        job.setReducerClass(TopReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountTop50"));
        job.waitForCompletion(true);
    }
}
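
Because the TreeMap above is keyed by the count, keywords that tie on frequency overwrite one another and some of the true top 50 can be lost. The reducer below is a hedged alternative (not from the original post) that keeps a list of keywords per count; it can be dropped in for TopReducer.

package com.hadoop;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Drop-in replacement for TopReducer that keeps all keywords sharing a frequency,
// so ties no longer overwrite each other. Trimming happens per frequency bucket,
// which may keep slightly more than 50 keywords when ties occur.
public class TopReducerWithTies extends Reducer<Text, LongWritable, Text, LongWritable> {

    private final TreeMap<Integer, List<String>> byCount = new TreeMap<Integer, List<String>>();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) {
        int sum = 0;
        for (LongWritable v : values) {
            sum += v.get();
        }
        List<String> words = byCount.get(sum);
        if (words == null) {
            words = new ArrayList<String>();
            byCount.put(sum, words);
        }
        words.add(key.toString());
        if (byCount.size() > 50) {
            byCount.remove(byCount.firstKey());   // drop the smallest frequency bucket
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit from highest frequency to lowest.
        for (Map.Entry<Integer, List<String>> e : byCount.descendingMap().entrySet()) {
            for (String word : e.getValue()) {
                context.write(new Text(word), new LongWritable(e.getKey()));
            }
        }
    }
}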

(6) Number of users with more than 2 queries

package com.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CountQueriesGreater2 {

    // Static counter: only reliable in local mode.
    public static int total = 0;

    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] str = value.toString().split("\t");
            // Emit (uid, 1) for every record.
            context.write(new Text(str[1]), new IntWritable(1));
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // key is a uid, values are its per-record counts.
            int sum = 0;
            for (IntWritable i : values) {
                sum += i.get();
            }
            if (sum > 2) {
                total = total + 1;
            }
            // context.write(key, new IntWritable(sum));  // per-uid counts not needed here
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");

        Job job = Job.getInstance(conf, "six");
        job.setJarByClass(CountQueriesGreater2.class);
        job.setMapperClass(MyMapper.class);
        // job.setCombinerClass(MyReducer.class);  // optional combiner
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountQueriesGreater2"));
        job.waitForCompletion(true);

        System.out.println("Users with more than 2 queries: " + total);
    }
}

(7) Proportion of users with more than 2 queries

package com.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CountQueriesGreaterPro {

    /*
     * How the job flows:
     * - TextInputFormat (the default) validates the input path and reads the file line by line.
     * - Each map task is a separate JVM process; an optional combiner can pre-aggregate
     *   duplicate keys inside one map's output.
     * - The shuffle partitions the map output and sorts it by key; the reducer receives
     *   all values for the same key and merges them.
     * - TextOutputFormat writes each result as key + "\t" + value + "\n".
     *
     * total1 = users with more than 2 queries, total2 = total records. The printed ratio
     * therefore uses all records as the denominator; dividing by the distinct-UID count
     * instead would give the per-user proportion. Both fields are static, so they are
     * only reliable in local mode.
     */
    public static int total1 = 0;
    public static int total2 = 0;

    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            total2++;                                            // count every record
            String[] str = value.toString().split("\t");
            context.write(new Text(str[1]), new IntWritable(1)); // (uid, 1)
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;                                         // queries issued by this uid
            for (IntWritable i : values) {
                sum += i.get();
            }
            if (sum > 2) {
                total1++;
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("seven begin");
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");

        Job job = Job.getInstance(conf, "seven");
        job.setJarByClass(CountQueriesGreaterPro.class);
        job.setMapperClass(MyMapper.class);
        // job.setCombinerClass(MyReducer.class);  // optional combiner
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountQueriesGreaterPro"));
        job.waitForCompletion(true);

        System.out.println("total1=" + total1 + "\ttotal2=" + total2);
        float percentage = (float) total1 / (float) total2;
        System.out.println("Proportion of users with more than 2 queries: " + percentage * 100 + "%");
        System.out.println("over");
    }
}

(8) Proportion of clicks with a rank within 10

package com.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CountRank {

    // sum1 = clicks with rank within 10, sum2 = all records; only reliable in local mode.
    public static int sum1 = 0;
    public static int sum2 = 0;

    public static class MyMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            sum2++;
            String[] str = value.toString().split("\t");
            int rank = Integer.parseInt(str[3]);      // rank of the clicked URL
            if (rank < 11) {
                sum1 = sum1 + 1;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");

        Job job = Job.getInstance(conf, "eight");
        job.setJarByClass(CountRank.class);
        job.setMapperClass(MyMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountRank"));
        job.waitForCompletion(true);

        System.out.println("sum1=" + sum1 + "\tsum2=" + sum2);
        float percentage = (float) sum1 / (float) sum2;
        System.out.println("Proportion of clicks with rank within 10: " + percentage * 100 + "%");
    }
}

(9) Proportion of queries entered directly as a URL

package com.hadoop;

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CountURL {

    // sum1 = queries that look like a URL (contain "www"), sum2 = all records;
    // static counters are only reliable in local mode.
    public static int sum1 = 0;
    public static int sum2 = 0;

    public static class MyMapper extends Mapper<Object, Text, Text, Text> {
        private static final Pattern URL_PATTERN = Pattern.compile("www");

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] str = value.toString().split("\t");
            // Matcher.find() already tells us whether the keyword contains "www".
            if (URL_PATTERN.matcher(str[2]).find()) {
                sum1++;
            }
            sum2++;
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");

        Job job = Job.getInstance(conf, "nine");
        job.setJarByClass(CountURL.class);
        job.setMapperClass(MyMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountURL"));
        job.waitForCompletion(true);

        System.out.println("sum1=" + sum1 + "\tsum2=" + sum2);
        float percentage = (float) sum1 / (float) sum2;
        System.out.println("Proportion of queries containing 'www' (entered as a URL): " + percentage * 100 + "%");
    }
}

(10) UIDs that searched for "仙剑奇侠传" more than 3 times

package com.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CountUidGreater3 {

    // Collects matching uids in static fields; only reliable in local mode.
    public static String Str = "";
    public static int i = 0;

    public static class Map extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            String pattern = "仙剑奇侠传";
            if (values[2].equals(pattern)) {          // keyword matches exactly
                context.write(new Text(values[1]), new IntWritable(1));
            }
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum = sum + v.get();
            }
            if (sum > 3) {                            // searched more than 3 times
                Str = Str + key.toString() + "\n";
                i++;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");

        Job job = Job.getInstance(conf, "count");
        job.setJarByClass(CountUidGreater3.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path("/sogou/data/sogou.500w.utf8"));
        FileOutputFormat.setOutputPath(job, new Path("/sogou/data/CountUidGreater3"));
        job.waitForCompletion(true);

        System.out.println("Matching uid count: " + i);
        System.out.println(Str);
    }
}

5. Save the result of each query in step 4 to HDFS.
Prefix each HQL statement from step 4 with: insert overwrite directory '<path>'

e.g. insert overwrite directory '/sogou/data/1' select count(*) from sougou;
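
Each insert overwrite directory statement produces one or more files (typically 000000_0) under the chosen directory. The sketch below prints such a result file straight from HDFS, assuming the same NameNode address used throughout this post.

package com.hadoop;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: print a Hive result file (e.g. /sogou/data/1/000000_0) straight from HDFS.
public class PrintHiveResult {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.136.100:9000");
        FileSystem fs = FileSystem.get(conf);
        Path result = new Path("/sogou/data/1/000000_0");
        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(result), "UTF-8"));
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
        }
        reader.close();
        fs.close();
    }
}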

6. Import the files generated in step 5 into HBase (one table) via the Java API.

package com.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class HBaseImport {

    // Target HBase table for the reduce output.
    private static String tableName = "test";

    // Connection settings shared by every job run.
    static Configuration conf = null;
    static {
        conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://192.168.136.100:9000/hbase");
        conf.set("hbase.master", "hdfs://192.168.136.100:60000");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "master,slave1,slave2");
        conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
    }

    public static class BatchMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Pass each result line through unchanged, keyed by its byte offset.
            Text v2s = new Text();
            v2s.set(value.toString());
            context.write(key, v2s);
        }
    }

    public static class BatchReducer extends TableReducer<LongWritable, Text, NullWritable> {
        private String family = "info";

        @Override
        protected void reduce(LongWritable key, Iterable<Text> v2s, Context context)
                throws IOException, InterruptedException {
            for (Text v2 : v2s) {
                String[] splited = v2.toString().split("\t");
                // The first column of each result line becomes the row key;
                // the whole line is stored in info:raw.
                String rowKey = splited[0];
                Put put = new Put(rowKey.getBytes());
                put.add(family.getBytes(), "raw".getBytes(), v2.toString().getBytes());
                context.write(NullWritable.get(), put);
            }
        }
    }

    public static void imputil(String str) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(conf, HBaseImport.class.getSimpleName());
        TableMapReduceUtil.addDependencyJars(job);
        job.setJarByClass(HBaseImport.class);

        FileInputFormat.setInputPaths(job, str);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(BatchMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(BatchReducer.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        job.waitForCompletion(true);
    }

    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
        // One Hive result file per query; each is imported as a separate job.
        String[] str = {
                "hdfs://192.168.136.100:9000/sogou/data/1/000000_0",
                "hdfs://192.168.136.100:9000/sogou/data/2/000000_0",
                "hdfs://192.168.136.100:9000/sogou/data/3/000000_0",
                "hdfs://192.168.136.100:9000/sogou/data/4/000000_0",
                "hdfs://192.168.136.100:9000/sogou/data/5/000000_0",
                "hdfs://192.168.136.100:9000/sogou/data/6/000000_0",
                "hdfs://192.168.136.100:9000/sogou/data/9/000000_0",
                "hdfs://192.168.136.100:9000/sogou/data/10/000000_0"
        };
        for (String stri : str) {
            imputil(stri);
        }
    }
}

7. Query the imported results from the HBase shell.
Command: scan 'test'
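
The same scan can also be issued from Java. The sketch below assumes the same older HBase client API used by the import job (HTable) and the ZooKeeper settings from step 6.

package com.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;

// Sketch: the same scan performed through the Java client instead of the shell.
public class ScanTestTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master,slave1,slave2");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        HTable table = new HTable(conf, "test");
        try {
            ResultScanner scanner = table.getScanner(new Scan());
            for (Result result : scanner) {
                // Print the info:raw column written by HBaseImport.
                byte[] raw = result.getValue("info".getBytes(), "raw".getBytes());
                System.out.println(new String(result.getRow()) + " => " + new String(raw));
            }
            scanner.close();
        } finally {
            table.close();
        }
    }
}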
