
package com.hongyaa.mr.mobile.member;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * MapReduce job that counts how many input records fall into each member level.
 *
 * <p>Input lines are tab-separated; the field at zero-based column 3 is taken as the member
 * level. The job writes one {@code level<TAB>count} line per distinct level.
 */
public class MemberLevelCount {

    /**
     * Configures the job against fixed input/output paths, runs it, and exits with its status.
     *
     * <p>Any existing output directory is deleted recursively before the job is submitted.
     *
     * @param args unused
     * @throws IOException if file-system access or job submission fails
     * @throws ClassNotFoundException if a job class cannot be resolved
     * @throws InterruptedException if interrupted while waiting for the job to finish
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // (1) Create the configuration and point the default file system at the NameNode
        //     on localhost:9000.
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");

        // (2) Create the job.
        Job job = Job.getInstance(conf);

        // (3) Identify the jar containing this class so it can be shipped to the cluster
        //     (required when running on a real cluster).
        job.setJarByClass(MemberLevelCount.class);

        // (4) Mapper and reducer. Summing counts is associative and commutative, so the
        //     reducer also runs as a combiner to shrink the map output before the shuffle.
        job.setMapperClass(MemberLevelCountMapper.class);
        job.setCombinerClass(MemberLevelCountReducer.class);
        job.setReducerClass(MemberLevelCountReducer.class);

        // (5) Final output key/value types. The map output uses the same types, so no
        //     separate map-output setting is needed.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // (6) Input and output paths.
        Path inPath = new Path("/mobile/input");
        Path outPath = new Path("/mobile/member_level");

        // Remove a previous run's output so the job can be rerun; FileOutputFormat fails
        // when the output directory already exists.
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileInputFormat.setInputPaths(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);

        // (7) Submit, block until completion (true = print progress), and exit with 0 on
        //     success or 1 on failure.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }

    /**
     * Emits {@code (memberLevel, 1)} for every tab-separated input line.
     *
     * <p>Lines with fewer than {@code MEMBER_LEVEL_COLUMN + 1} fields are skipped and counted
     * under the {@code MemberLevelCount / MALFORMED_LINES} counter instead of failing the task.
     */
    public static class MemberLevelCountMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        /** Zero-based column index of the member level in each input line. */
        private static final int MEMBER_LEVEL_COLUMN = 3;

        private static final IntWritable ONE = new IntWritable(1);

        // Reused for every record: context.write serializes the pair immediately, so the
        // same Writable instance can be refilled on the next call.
        private final Text outKey = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split on TAB. Limit -1 keeps trailing empty fields so column positions stay
            // stable even when the last columns are empty.
            String[] fields = value.toString().split("\t", -1);
            if (fields.length <= MEMBER_LEVEL_COLUMN) {
                // Too few columns (e.g. a blank line): record it rather than crash the task.
                context.getCounter("MemberLevelCount", "MALFORMED_LINES").increment(1);
                return;
            }
            // Use the member level as the key and 1 as the count.
            outKey.set(fields[MEMBER_LEVEL_COLUMN]);
            context.write(outKey, ONE);
        }
    }

    /**
     * Sums the counts for each member level and writes {@code (memberLevel, total)}.
     * Because it only sums, it is also safe to use as the combiner.
     */
    public static class MemberLevelCountReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        // Reused output value; see the note on the mapper's outKey.
        private final IntWritable total = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            total.set(sum);
            context.write(key, total);
        }
    }
}
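// Scraped web-page footer, kept as a comment so the file compiles:
// 欢迎分享,转载请注明来源:内存溢出 (Feel free to share; when reposting, credit the source: 内存溢出)
// 微信扫一扫 (scan with WeChat) / 支付宝扫一扫 (scan with Alipay)
// 评论列表(0条) (comments: 0)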