Counting star search indices with MapReduce: finding the most popular star
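The job reads a tab-separated text file with one record per line: star name, gender (male/female), and search index. The Mapper keys each record by gender, a custom Partitioner routes male and female records to separate reducers, and a Combiner/Reducer pair keeps only the record with the highest search index for each gender. The lines below are a purely hypothetical sample of that input format, shown only for illustration; the real file at hdfs://ljc:9000/buaa/index/index.txt is not included in the original post.

    zhangsan	male	150
    lisi	female	1234
    wangwu	male	8763
    zhaoliu	female	420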

package com.buaa;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @ProjectName CountStarSearchIndex
 * @PackageName com.buaa
 * @ClassName SearchStarIndex
 * @Description Compute the highest search index for male and female stars separately
 * @Author 刘吉超
 * @Date 2016-05-12 16:30:23
 */
public class SearchStarIndex extends Configured implements Tool {
    // field separator: tab
    private static String TAB_SEPARATOR = "\t";
    // male
    private static String MALE = "male";
    // female
    private static String FEMALE = "female";

    /*
     * Parse the star records
     */
    public static class IndexMapper extends Mapper<Object, Text, Text, Text> {
        /*
         * map(Object key, Text value, Context context) is called once per input line,
         * with the line held in value. The line is split on '\t' to extract the
         * star name, gender and search index.
         */
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // split the line into fields
            String[] tokens = value.toString().split(TAB_SEPARATOR);

            if (tokens != null && tokens.length >= 3) {
                // gender
                String gender = tokens[1].trim();
                // name and search index
                String nameHotIndex = tokens[0].trim() + TAB_SEPARATOR + tokens[2].trim();

                // emit key=gender, value=name+hotIndex
                context.write(new Text(gender), new Text(nameHotIndex));
            }
        }
    }

    /*
     * Partition the records by gender so the Mapper output is spread across the reducers
     */
    public static class IndexPartitioner extends Partitioner<Text, Text> {
        @Override
        public int getPartition(Text key, Text value, int numReduceTasks) {
            // partition by gender
            String sex = key.toString();

            // default to partition 0
            if (numReduceTasks == 0)
                return 0;

            if (MALE.equals(sex)) {
                // male goes to partition 0
                return 0;
            } else if (FEMALE.equals(sex)) {
                // female goes to partition 1
                return 1 % numReduceTasks;
            } else {
                // unknown gender goes to partition 2
                return 2 % numReduceTasks;
            }
        }
    }

    /*
     * Combiner: pre-aggregate the map output on the map side to reduce network traffic
     */
    public static class IndexCombiner extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            int maxHotIndex = Integer.MIN_VALUE;
            String name = "";

            for (Text val : values) {
                String[] valTokens = val.toString().split(TAB_SEPARATOR);

                int hotIndex = Integer.parseInt(valTokens[1]);

                if (hotIndex > maxHotIndex) {
                    name = valTokens[0];
                    maxHotIndex = hotIndex;
                }
            }

            context.write(key, new Text(name + TAB_SEPARATOR + maxHotIndex));
        }
    }

    /*
     * Find the highest search index among male and female stars
     */
    public static class IndexReducer extends Reducer<Text, Text, Text, Text> {
        /*
         * reduce(Text key, Iterable<Text> values, Context context) is called once per key
         * with all of its values; the maximum search index is computed over the values.
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            int maxHotIndex = Integer.MIN_VALUE;
            String name = " ";

            // iterate over the values for this key to find the maximum search index
            for (Text val : values) {
                String[] valTokens = val.toString().split(TAB_SEPARATOR);

                int hotIndex = Integer.parseInt(valTokens[1]);

                if (hotIndex > maxHotIndex) {
                    name = valTokens[0];
                    maxHotIndex = hotIndex;
                }
            }

            context.write(new Text(name), new Text(key + TAB_SEPARATOR + maxHotIndex));
        }
    }

    @SuppressWarnings("deprecation")
    @Override
    public int run(String[] args) throws Exception {
        // load the configuration
        Configuration conf = new Configuration();

        // delete the output directory if it already exists
        Path mypath = new Path(args[1]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath)) {
            hdfs.delete(mypath, true);
        }

        // create the job
        Job job = new Job(conf, "searchStarIndex");
        // main class
        job.setJarByClass(SearchStarIndex.class);

        // use 2 reducers
        job.setNumReduceTasks(2);
        // Partitioner class
        job.setPartitionerClass(IndexPartitioner.class);

        // Mapper
        job.setMapperClass(IndexMapper.class);
        // Reducer
        job.setReducerClass(IndexReducer.class);

        // map output key type
        job.setMapOutputKeyClass(Text.class);
        // map output value type
        job.setMapOutputValueClass(Text.class);

        // Combiner class
        job.setCombinerClass(IndexCombiner.class);

        // final output key type
        job.setOutputKeyClass(Text.class);
        // final output value type
        job.setOutputValueClass(Text.class);

        // input path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // submit the job
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        String[] args0 = {
                "hdfs://ljc:9000/buaa/index/index.txt",
                "hdfs://ljc:9000/buaa/index/out/"
        };
        int ec = ToolRunner.run(new Configuration(), new SearchStarIndex(), args0);
        System.exit(ec);
    }
}
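Assuming the class above is packaged into a jar (the jar name below is hypothetical), the job can be launched with the standard hadoop jar command; since main() hard-codes the input and output paths, no extra arguments are needed. With two reducers and the gender Partitioner, partition 0 (part-r-00000) holds the male result and partition 1 (part-r-00001) the female result, assuming the default filesystem is the ljc:9000 HDFS instance used in main():

    hadoop jar countstarsearchindex.jar com.buaa.SearchStarIndex
    hdfs dfs -cat /buaa/index/out/part-r-00000    # male star with the highest search index
    hdfs dfs -cat /buaa/index/out/part-r-00001    # female star with the highest search index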