
在idea运行一个很普通的wordcount
所需环境,反正也不知道是不是这样,就都配了
没配之前一直报错
Could not locate executable ...\hadoop-2.7.3\hadoop-2.7.3\bin\winutils.exe in the Hadoop binaries
配置完毕之后重启电脑就OK了.
下面是pom依赖
modelVersion: 4.0.0;groupId: org.example;artifactId: MapReduce;version: 1.0-SNAPSHOT;maven.compiler.source/target: 8;依赖:org.apache.hadoop:hadoop-client:2.7.3 与 org.projectlombok:lombok:1.16.18
完整代码
package com.czx.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
public class WordCount {

    /**
     * Mapper: splits each input line into words and emits (word, 1).
     * Input key/value:  (byte offset of the line, line text).
     * Output key/value: (word, 1).
     */
    public static class WordCountMapper
            extends Mapper<LongWritable, Text, Text, LongWritable> {

        // Reused across map() calls to avoid a per-record allocation,
        // which is the standard Hadoop Writable pattern.
        private final Text keyOut = new Text();
        private final LongWritable valueOut = new LongWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("mapper input==>" + key.get() + ", " + value.toString());
            // One input line, e.g. "one world"
            String line = value.toString();
            // Split on runs of whitespace so that repeated spaces or tabs
            // do not produce empty tokens.
            String[] arr = line.split("\\s+");
            for (String word : arr) {
                // Leading whitespace yields an empty first token; skip it.
                if (word.isEmpty()) {
                    continue;
                }
                keyOut.set(word);
                valueOut.set(1);
                // Emit the pair (word, 1).
                context.write(keyOut, valueOut);
                System.out.println("mapper output==>" + keyOut.toString() + ", " + valueOut.get());
            }
        }
    }

    /**
     * Reducer: sums the per-word counts produced by the mapper.
     * Input:  (word, [1, 1, 1, ...]).
     * Output: (word, total count).
     */
    public static class WordCountReducer
            extends Reducer<Text, LongWritable, Text, LongWritable> {

        private final LongWritable valueOut = new LongWritable();

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // e.g. one, [1,1,1,1] ---> one, 4
            long sum = 0L;
            StringBuilder sb = new StringBuilder("reducer input==>");
            sb.append(key).append(", [");
            for (LongWritable w : values) {
                sb.append(w.get()).append(",");
                sum += w.get();
            }
            // Drop the trailing comma only when at least one value was
            // appended; the framework never calls reduce() with an empty
            // iterable, but the guard keeps the logging code safe.
            if (sb.charAt(sb.length() - 1) == ',') {
                sb.deleteCharAt(sb.length() - 1);
            }
            sb.append("]");
            System.out.println(sb.toString());
            valueOut.set(sum);
            context.write(key, valueOut);
            System.out.println("reducer output==>" + key + ", " + sum);
        }
    }

    /**
     * Job driver.
     * Usage: WordCount &lt;inputDir&gt; &lt;outputDir&gt;, e.g. /tmp/mr/input /tmp/mr/output.
     * Deletes the output directory first if it already exists, then runs the
     * job and exits with 0 on success, 1 on failure.
     */
    public static void main(String[] args) throws Exception {
        // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: WordCount <inputDir> <outputDir>");
            System.exit(2);
        }
        // Loads core-default.xml and core-site.xml.
        Configuration conf = new Configuration();
        // Create the Job object that describes this MapReduce run.
        Job job = Job.getInstance(conf, "wordcount");
        // Lets Hadoop locate the jar containing this class (needed on a cluster).
        job.setJarByClass(WordCount.class);
        // Wire up mapper and reducer implementations.
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // Number of reducers; the default is already 1, set explicitly for clarity.
        job.setNumReduceTasks(1);
        // Mapper output types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Reducer (final) output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Text input/output formats are the defaults; set explicitly for clarity.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Input directory.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path(args[1]);
        // Output directory (must not exist when the job starts).
        FileOutputFormat.setOutputPath(job, outputPath);
        // Recursively delete a pre-existing output directory so reruns do not
        // fail with FileAlreadyExistsException.
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
            System.out.println("delete outputPath==> 【" + outputPath.toString() + "】 success!");
        }
        // Submit the job and block until it finishes (verbose=false).
        boolean status = job.waitForCompletion(false);
        System.exit(status ? 0 : 1);
    }
}
运行代码
设置参数
运行结果
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)