
目录
1.需求
2.需求分析
3.代码
(1)在之前的序列化案例实 *** 的基础上,增加一个分区类
(2) 在driver类中增加自定义数据分区设置和ReduceTask设置
1.需求
将统计结果按照手机归属地不同省份输出到不同文件中(分区)
(1)输入数据:txt文件
(2)期望输出数据:
手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。
2.需求分析在之前的序列化案例实 *** 上进行修改。
3.代码 (1)在之前的序列化案例实 *** 的基础上,增加一个分区类public class ProvincePartitioner extends Partitioner(2) 在driver类中增加自定义数据分区设置和ReduceTask设置{ @Override public int getPartition(Text text, FlowBean flowBean, int i) { //test是手机号 String phone = text.toString(); //取前三位,substring(0,3)包含左边,不包含右边 String prePhone = phone.substring(0, 3); int partition; if("136".equals(prePhone)){ partition = 0; }else if("137".equals(prePhone)){ partition = 1; }else if("138".equals(prePhone)){ partition = 2; }else if("139".equals(prePhone)){ partition = 4; }else{ partition = 5; } return partition; } }
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FlowDriver.class);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(6);
FileInputFormat.setInputPaths(job,new Path("D:\code\Hadoop\input\inputflow"));
FileOutputFormat.setOutputPath(job,new Path("D:\code\Hadoop\output55555"));
boolean result = job.waitForCompletion(true);
System.exit(result? 0:1);
}
}欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)