
Environment preparation: a Hadoop and ZooKeeper cluster has already been set up on three virtual machines, with HDFS and MapReduce configured (see the earlier articles for the environment setup). This case study implements friend recommendation through the Java API (the friend-recommendation flow diagram is shown below).
1. Start the environment
// Start ZooKeeper on all three nodes
zkServer.sh start
// Start HDFS
start-all.sh
After startup, log in to the master node and create a folder mrxx in HDFS as the data storage location.
2. Java API implementation
First create a Java project; then generate random friend-relationship data with a utility class to serve as test data; next implement the actual processing by subclassing Mapper and Reducer and overriding their methods; finally submit the task through the Job class.
① Dependencies, configuration, and the utility class
Dependencies (pom.xml):
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <hadoop.version>3.1.2</hadoop.version>
    <commons-io.version>2.4</commons-io.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>${commons-io.version}</version>
    </dependency>
    <dependency>
        <groupId>com.janeluo</groupId>
        <artifactId>ikanalyzer</artifactId>
        <version>2012_u6</version>
    </dependency>
</dependencies>
Configuration files:
Utility class FriendRandomUtil:
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;

public class FriendRandomUtil {
    public static void main(String[] args) throws IOException {
        // Read the student list
        List<String> studentList = FileUtils.readLines(new File(FriendRandomUtil.class.getResource("/students.txt").getPath()));
        // Build the friend-list mapping: each student starts with an empty friend set
        Map<String, Set<String>> friendMap = studentList.stream().collect(Collectors.toMap(e -> e, e -> new HashSet<>()));
        for (String student : friendMap.keySet()) {
            // Pick 10-39 random friends with reservoir sampling
            List<String> sampleList = FriendRandomUtil.reservoirSampling(studentList, new Random().nextInt(30) + 10);
            // Remove the student himself from the sample
            sampleList.remove(student);
            // Add the sampled friends to this student's set
            friendMap.get(student).addAll(sampleList);
            // Friendship is mutual: also add this student to each chosen friend's set
            for (String friend : sampleList) {
                friendMap.get(friend).add(student);
            }
        }
        // Print each student followed by his tab-separated friend list
        for (String student : friendMap.keySet()) {
            System.out.print(student + "\t");
            friendMap.get(student).forEach(e -> System.out.print(e + "\t"));
            System.out.println();
        }
    }

    public static List<String> reservoirSampling(List<String> studentList, int num) {
        // Fill the reservoir with the first num elements (copied, so the source list stays untouched)
        List<String> sampleList = new ArrayList<>(studentList.subList(0, num));
        // Each later element replaces a random reservoir slot with probability num/(i+1)
        for (int i = num; i < studentList.size(); i++) {
            // Draw a random index in [0, i]
            int r = new Random().nextInt(i + 1);
            if (r < num) {
                // If r falls inside the reservoir, replace that slot
                sampleList.set(r, studentList.get(i));
            }
        }
        return sampleList;
    }
}
Run the utility class to generate the test data, then upload the generated file to the mrxx folder in HDFS so the job can read it as its data source.
② Override the map and reduce methods
The Mapper:
public class FriendMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split one line into the owner and his friend list
        String[] friends = value.toString().split("\\s+");
        // Direct friends: owner paired with each of his friends, marked 0
        for (int i = 1; i < friends.length; i++) {
            context.write(new Text(sortFriendName(friends[0], friends[i])), new IntWritable(0));
        }
        // Indirect friends: every pair in the list shares the owner as a common friend, marked 1
        for (int i = 1; i < friends.length; i++) {
            for (int j = i + 1; j < friends.length; j++) {
                context.write(new Text(sortFriendName(friends[i], friends[j])), new IntWritable(1));
            }
        }
    }

    // Order the two names so that A:B and B:A map to the same key
    private String sortFriendName(String f1, String f2) {
        return f1.compareTo(f2) < 0 ? f1 + ":" + f2 : f2 + ":" + f1;
    }
}
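To make the key emission concrete, here is a small standalone sketch (a hypothetical class, not the Mapper itself) that replays FriendMapper's logic on one input line, so both kinds of keys are visible without running Hadoop:

```java
import java.util.ArrayList;
import java.util.List;

public class MapperPairDemo {

    // Same ordering trick as the Mapper: "A:B" and "B:A" become one key.
    static String sortFriendName(String f1, String f2) {
        return f1.compareTo(f2) < 0 ? f1 + ":" + f2 : f2 + ":" + f1;
    }

    // Returns every key/value pair the Mapper would emit for this line.
    static List<String> emitPairs(String line) {
        String[] friends = line.split("\\s+");
        List<String> out = new ArrayList<>();
        // Direct friends: owner + friend, value 0 ("already friends, never recommend")
        for (int i = 1; i < friends.length; i++) {
            out.add(sortFriendName(friends[0], friends[i]) + " -> 0");
        }
        // Indirect friends: each pair in the list shares the owner, value 1
        for (int i = 1; i < friends.length; i++) {
            for (int j = i + 1; j < friends.length; j++) {
                out.add(sortFriendName(friends[i], friends[j]) + " -> 1");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // prints: cat:tom -> 0, dog:tom -> 0, cat:dog -> 1
        emitPairs("tom\tcat\tdog").forEach(System.out::println);
    }
}
```

For the line "tom cat dog", the two direct pairs get value 0 while cat:dog gets value 1, because tom is their shared friend.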
The Reducer:
public class FriendReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Counter for the recommendation strength
        int count = 0;
        for (IntWritable value : values) {
            int recommendation = value.get();
            // A 0 means the pair are already direct friends: skip this key entirely
            if (recommendation == 0) {
                return;
            }
            count += recommendation;
        }
        // Write the pair and its recommendation strength to HDFS
        context.write(key, new IntWritable(count));
    }
}
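The reducer's decision rule can be sketched outside Hadoop as well (a hypothetical helper, not the real Reducer): a key whose value list contains a 0 is a direct friendship and is dropped; otherwise the 1s are summed into the recommendation strength.

```java
import java.util.Arrays;
import java.util.List;

public class ReducerRuleDemo {

    // Returns -1 for "direct friends, skip", otherwise the recommendation count.
    static int recommend(List<Integer> values) {
        int count = 0;
        for (int v : values) {
            if (v == 0) return -1;   // already friends, never recommend
            count += v;              // one more shared friend
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(recommend(Arrays.asList(1, 1, 0, 1))); // prints -1
        System.out.println(recommend(Arrays.asList(1, 1, 1)));    // prints 3
    }
}
```

This is why the early `return` matters: without it, a pair of existing friends with many mutual friends would still be "recommended".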
③ Write the job class FriendsJob
This class submits our job: it sets up the job configuration, registers the Mapper and Reducer we wrote, and launches the job.
public class FriendsJob {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Declare the configuration
        Configuration configuration = new Configuration();
        // Run in local mode
        configuration.set("mapreduce.framework.name", "local");
        // Create the job
        Job job = Job.getInstance(configuration);
        // Configure the job
        job.setJarByClass(FriendsJob.class);
        job.setJobName("FriendsJob" + System.currentTimeMillis());
        job.setNumReduceTasks(2);
        // Input and output paths on HDFS
        FileInputFormat.setInputPaths(job, new Path("/mrxx/friends.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/mrxx/result/" + job.getJobName()));
        // Map and Reduce classes and the map output types
        job.setMapperClass(FriendMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(FriendReducer.class);
        // Submit and wait for completion
        job.waitForCompletion(true);
    }
}
Test results:
The friend recommendation is now complete; the number after each pair is its recommendation strength, i.e. how many friends the two users share.
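As a cross-check on what that number means: for a recommended pair it equals the size of the intersection of the two users' friend sets. The names and lists below are made-up illustration data, not the generated test file.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class CommonFriendCheck {

    // Size of the intersection of two friend sets = recommendation strength.
    static int commonFriends(Set<String> a, Set<String> b) {
        Set<String> common = new HashSet<>(a);
        common.retainAll(b);
        return common.size();
    }

    public static void main(String[] args) {
        Set<String> tom = new HashSet<>(Arrays.asList("cat", "dog"));
        Set<String> jim = new HashSet<>(Arrays.asList("cat", "dog", "fox"));
        // tom and jim share cat and dog, so the job would print tom:jim with strength 2
        System.out.println("tom:jim -> " + commonFriends(tom, jim)); // prints tom:jim -> 2
    }
}
```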
3. Storing the recommendation results in MySQL
On top of the existing code, add a Friend class and the MySQL dependency, connect to the MySQL instance on the virtual machine with Navicat, and create a t_friend table for storage.
① The Friend class, dependency, and table structure
The Friend class:
public class Friend implements Writable, DBWritable {
    private String id;
    private String person;
    private String friend;
    private Integer count;
    private Date createtime;

    // Remember to add the getters/setters, constructors, and toString

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.id);
        dataOutput.writeUTF(this.person);
        dataOutput.writeUTF(this.friend);
        dataOutput.writeInt(this.count);
        dataOutput.writeLong(this.createtime.getTime());
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readUTF();
        this.person = dataInput.readUTF();
        this.friend = dataInput.readUTF();
        this.count = dataInput.readInt();
        this.createtime = new Date(dataInput.readLong());
    }

    @Override
    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setString(1, this.id);
        preparedStatement.setString(2, this.person);
        preparedStatement.setString(3, this.friend);
        preparedStatement.setInt(4, this.count);
        preparedStatement.setTimestamp(5, new Timestamp(this.createtime.getTime()));
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.id = resultSet.getString(1);
        this.person = resultSet.getString(2);
        this.friend = resultSet.getString(3);
        this.count = resultSet.getInt(4);
        this.createtime = resultSet.getTimestamp(5);
    }
}
t_friend table structure:
pom.xml:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.32</version>
</dependency>
② Modify FriendReducer and FriendsJob
FriendReducer:
public class FriendReducer extends Reducer<Text, IntWritable, Friend, NullWritable> {
    private String jobName;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        jobName = context.getJobName();
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Counter for the recommendation strength
        int count = 0;
        for (IntWritable value : values) {
            // A 0 means the pair are already direct friends: skip this key
            if (value.get() == 0) {
                return;
            }
            count += value.get();
        }
        // Build one Friend row in each direction so either user can be looked up
        String[] pf = key.toString().split(":");
        Friend f1 = new Friend(jobName + UUID.randomUUID().toString().substring(0, 9), pf[0], pf[1], count, new Date());
        Friend f2 = new Friend(jobName + UUID.randomUUID().toString().substring(0, 9), pf[1], pf[0], count, new Date());
        // Write both rows out
        context.write(f1, NullWritable.get());
        context.write(f2, NullWritable.get());
    }
}
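The reducer writes each undirected pair as two directed rows so that either user can be queried by the `person` column. A hypothetical standalone sketch of that expansion (the real code builds Friend objects rather than strings):

```java
public class RowExpansionDemo {

    // Split the combined "A:B" key into two directed rows with the same count.
    static String[] expand(String key, int count) {
        String[] pf = key.split(":");
        return new String[] {
            pf[0] + " -> " + pf[1] + " (" + count + ")",
            pf[1] + " -> " + pf[0] + " (" + count + ")"
        };
    }

    public static void main(String[] args) {
        // prints: cat -> dog (3)  then  dog -> cat (3)
        for (String row : expand("cat:dog", 3)) System.out.println(row);
    }
}
```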
FriendJob:
public class FriendJob {
    private static String driverClass = "com.mysql.jdbc.Driver";
    private static String url = "jdbc:mysql://192.168.168.101:3306/mrxx?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true&useSSL=false";
    private static String username = "root";
    private static String password = "123456";
    private static String tableName = "t_friend";
    private static String[] fields = {"id", "person", "friend", "count", "createtime"};

    public static void main(String[] args) throws Exception {
        // Load the configuration files
        Configuration configuration = new Configuration(true);
        configuration.set("mapreduce.framework.name", "local");
        // Load the database settings
        DBConfiguration.configureDB(configuration, driverClass, url, username, password);
        // Create the job
        Job job = Job.getInstance(configuration);
        // Configure the job
        job.setJarByClass(FriendJob.class);
        job.setNumReduceTasks(1);
        job.setJobName("FriendsJob" + System.currentTimeMillis());
        // Read from HDFS, write to the database
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(DBOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("/mrxx/friends.txt"));
        DBOutputFormat.setOutput(job, tableName, fields);
        // Map and Reduce classes and the map output types
        job.setMapperClass(FriendMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(FriendReducer.class);
        // Submit and wait for completion
        job.waitForCompletion(true);
    }
}
That completes the changes; just launch and submit the job. The prerequisite is that MySQL on the Linux VM is already running (you can first check with Navicat that it accepts connections); once the program runs, the results will be stored in the MySQL database.
③ Test results
And that's a wrap!
Feel free to share this post; when reposting, please credit the source: 内存溢出.