
目录
1.前情回顾
2.Map端表合并分析
2.1 将产品表缓存起来
2.2 在进行map之前加载缓存路径
2.3 打开文件,创建流对象
2.4 逐行读取产品表,并存放在字典中
2.5 关闭流
3. 完整代码
3.1 编写mapper程序
3.2 编写Driver程序
1.前情回顾
Hadoop案例:Mapreduce解决多个关联表整合问题(Redue Join)_小M呀~之大数据系列-CSDN博客在实际工作中可能会遇到这样的需求,将多个关联的表格整合到一张表中。https://blog.csdn.net/baidu_41833099/article/details/121745351上一篇文章是在Reduce做表合并,这种方式容易导致数据倾斜问题,因为当数据量很大的时候,多个MapTask数据全部汇总到Rdducer端处理会增大Reduer的负荷量,降低降低计算效率。因此,本文将在Map端进行多表合并。
2.Map端表合并思路分析将数据量比较小的表,缓存起来,在map之前获得缓存表。在map段进行合并,合并后就是我们想要的结果,此时并不需要进行reducer。因此,这里在Driver程序中将Reducer设置为0即可。
(注意:这种方式只适用于关联表中有小表的情形)
2.1 将产品表缓存起来//加载缓存路径
job.addCacheFile(new URI("file:///C:/ZProject/bigdata/input/tablecache/pd.txt"));
2.2 在进行map之前加载缓存路径
//获取缓存路径 URI[] cacheFiles = context.getCacheFiles();2.3 打开文件,创建流对象
//获取文件对象,并开流 FileSystem fs = FileSystem.get(context.getConfiguration()); FSDataInputStream fis = fs.open(new Path(cacheFiles[0]));2.4 逐行读取产品表,并存放在字典中
//通过包装流转换为reader,方便按行读取
BufferedReader reader = new BufferedReader(new InputStreamReader(fis,"UTF-8"));
//逐行读取按行处理
String line;
while (StringUtils.isNotEmpty(line=reader.readLine())){
//切割一行
//0 1 小米
String[] split = line.split("t");
pdMap.put(split[0],split[1]);
2.5 关闭流
//关闭流 IOUtils.closeStream(reader);3. 完整代码 3.1 编写mapper程序
package com.yangmin.mapreduce.mapjoin; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; import java.util.Map; public class MapJoinMapper extends Mapper3.2 编写Driver程序{ private Map pdMap = new HashMap<>(); private Text text = new Text(); @Override //任务开始前将pd数据缓存进来 protected void setup(Context context) throws IOException { //获取缓存路径 URI[] cacheFiles = context.getCacheFiles(); //获取文件对象,并开流 FileSystem fs = FileSystem.get(context.getConfiguration()); FSDataInputStream fis = fs.open(new Path(cacheFiles[0])); //通过包装流转换为reader,方便按行读取 BufferedReader reader = new BufferedReader(new InputStreamReader(fis,"UTF-8")); //逐行读取按行处理 String line; while (StringUtils.isNotEmpty(line=reader.readLine())){ //切割一行 //0 1 小米 String[] split = line.split("t"); pdMap.put(split[0],split[1]); } IOUtils.closeStream(reader); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().split("t"); //获取商品名称 String pName = pdMap.get(split[1]); //重新拼接 String s = split[0] +"t"+ pName +"t"+ split[2]; text.set(s); //写出 context.write(text, NullWritable.get()); } }
package com.yangmin.mapreduce.mapjoin;
import jdk.nashorn.internal.scripts.JO;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class MapJoinDriver {
public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
//获取配置信息和Join
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//关联map
job.setMapperClass(MapJoinMapper.class);
//设置map的kv输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
//设置最终kv输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//加载缓存路径
job.addCacheFile(new URI("file:///C:/ZProject/bigdata/input/tablecache/pd.txt"));
// Map 端 Join 的逻辑不需要 Reduce 阶段,设置 reduceTask 数量为 0
job.setNumReduceTasks(0);
//设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path("C:\ZProject\bigdata\input\inputtable2"));
FileOutputFormat.setOutputPath(job,new Path("C:\ZProject\bigdata\output\output_mapjoin"));
//提交
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)