MapReduce明星搜索指数统计,找出人气王
package com.buaa;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Partitioner;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;/** * @ProjectName CountStarSearchIndex* @PackageName com.buaa* @ClassName SearchStarIndex* @Description 统计分别统计出男女明星最大搜索指数* @Author 刘吉超* @Date 2016-05-12 16:30:23*/public class SearchStarIndex extends Configured implements Tool { // 分隔符\t private static String TAB_SEPARATOR = "\t"; // 男 private static String MALE = "male"; // 女 private static String FEMALE = "female"; /* * 解析明星数据 */ public static class IndexMapper extends Mapper<Object, Text, Text, Text> { /* * 每次调用map(LongWritable key, Text value, Context context)解析一行数据。 * 每行数据存储在value参数值中。然后根据'\t'分隔符,解析出明星姓名,性别和搜索指数 */ public void map(Object key, Text value, Context context) throws IOException, InterruptedException { // 将数据解析为数组 String[] tokens = value.toString().split(TAB_SEPARATOR); if(tokens != null && tokens.length >= 3){ // 性别 String gender = tokens[1].trim(); // 名称、关注指数 String nameHotIndex = tokens[0].trim() + TAB_SEPARATOR + tokens[2].trim(); // 输出key=gender value=name+hotIndex context.write(new Text(gender), new Text(nameHotIndex)); } } } /* * 根据性别对数据进行分区,将 Mapper的输出结果均匀分布在 reduce上 */ public static class IndexPartitioner extends Partitioner<Text, Text> { @Override public int getPartition(Text key, Text value, int numReduceTasks) { // 按性别分区 String sex = key.toString(); // 默认指定分区 0 if(numReduceTasks == 0) return 0; // 性别为男,选择分区0 if(MALE.equals(sex)){ return 0; }else if(FEMALE.equals(sex)){ // 性别为女,选择分区1 return 1 % numReduceTasks; }else // 性别未知,选择分区2 return 2 % numReduceTasks; } } /* * 定义Combiner,对 map端的输出结果,先进行一次合并,减少数据的网络输出 */ public static class IndexCombiner extends Reducer<Text, Text, Text, Text> { @Override public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException { int maxHotIndex = Integer.MIN_VALUE; String name= ""; for (Text val : values) { String[] valTokens = val.toString().split(TAB_SEPARATOR); int hotIndex = Integer.parseInt(valTokens[1]); if(hotIndex > maxHotIndex){ name = valTokens[0]; maxHotIndex = hotIndex; } } context.write(key, new Text(name + TAB_SEPARATOR + maxHotIndex)); } } /* * 统计男、女明星最高搜索指数 */ public static class IndexReducer extends Reducer<Text, Text, Text, Text> { /* * 调用reduce(key, Iterable< Text> values, context)方法来处理每个key和values的集合。 * 我们在values集合中,计算出明星的最大搜索指数 */ @Override public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException { int maxHotIndex = Integer.MIN_VALUE; String name = " "; // 根据key,迭代 values集合,求出最高搜索指数 for (Text val : values) { String[] valTokens = val.toString().split(TAB_SEPARATOR); int hotIndex = Integer.parseInt(valTokens[1]); if (hotIndex > maxHotIndex) { name = valTokens[0]; maxHotIndex = hotIndex; } } context.write(new Text(name), new Text(key + TAB_SEPARATOR + maxHotIndex)); } } @SuppressWarnings("deprecation") @Override public int run(String[] args) throws Exception { // 读取配置文件 Configuration conf = new Configuration(); // 如果目标文件夹存在,则删除 Path mypath = new Path(args[1]); FileSystem hdfs = mypath.getFileSystem(conf); if (hdfs.isDirectory(mypath)) { hdfs.delete(mypath, true); } // 新建一个任务 Job job = new Job(conf, "searchStarIndex"); // 主类 job.setJarByClass(SearchStarIndex.class); // reduce的个数设置为2 job.setNumReduceTasks(2); // 设置Partitioner类 job.setPartitionerClass(IndexPartitioner.class); // Mapper job.setMapperClass(IndexMapper.class); // Reducer job.setReducerClass(IndexReducer.class); // map 输出key类型 job.setMapOutputKeyClass(Text.class); // map 输出value类型 job.setMapOutputValueClass(Text.class); // 设置Combiner类 job.setCombinerClass(IndexCombiner.class); // 输出结果 key类型 job.setOutputKeyClass(Text.class); // 输出结果 value类型 job.setOutputValueClass(Text.class); // 输入路径 FileInputFormat.addInputPath(job, new Path(args[0])); // 输出路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 提交任务 return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { String[] args0 = { "hdfs://ljc:9000/buaa/index/index.txt", "hdfs://ljc:9000/buaa/index/out/" }; int ec = ToolRunner.run(new Configuration(), new SearchStarIndex(), args0); System.exit(ec); }}
赞 (0)