本篇总结MapReduce的入门基础知识。
MapReduce介绍
MapReduce的核心思想是“分而治之”,适用于大量复杂的任务处理场景。
Map负责“分”,即把复杂的任务分解为若干“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。
Reduce负责“合”,即对Map阶段的结果进行全局汇总。
MapReduce运行在Yarn集群,是一个“主从” 结构。
- ResourceManager
- NodeManager
这两个阶段合起来正是MapReduce思想的体现。
MapReduce设计构思
MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop集群上。
MapReduce设计并提供了统一的计算框架,为程序员隐藏了绝大多数系统层面的处理细节。为程序员提供一个抽象和高层的编程接口和框架。程序员仅需要关心其应用层的具体计算问题,仅需编写少量的处理应用本身计算问题的程序代码。如何具体完成这个并行计算任务所相关的诸多系统层细节被隐藏起来,交给计算框架去处理:
Map和Reduce为程序员提供了一个清晰的操作接口抽象描述。MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现.Map和Reduce,MapReduce处理的数据类型是<key,value>键值对。
- Map:
(k1; v1) → [(k2; v2)]
- Reduce:
(k2; [v2]) → [(k3; v3)]
一个完整的mapreduce程序在分布式运行时有三类实例进程:
MRAppMaster
负责整个程序的过程调度及状态协调
MapTask
负责map阶段的整个数据处理流程
ReduceTask
负责reduce阶段的整个数据处理流程
MapReduce编程规范
MapReduce的开发一共有8个步骤:
Map阶段2个步骤
- 设置InputFormat类,将数据切分为 key-value(K1和V1)对,输入到第二步
- 自定义Map逻辑,将第一步的结果转换成另一种的key-value(K2和V2)对,输出结果
( 1 )用户自定义的Mapper要继承自己的父类
( 2 ) Mapper的输入数据是KV对的形式(KV的类型可自定义)
( 3 ) Mapper中的业务逻辑写在map()方法中
( 4 ) Mapper的输出数据是KV对的形式(KV的类型可自定义)
( 5 ) map()方法(MapTask进程)对每一个<K,V>调用一次
Shuffle阶段4个步骤
- 对输出的key-value对进行分区
- 对不同分区的数据按照相同的Key排序
- (可选)对分组过的数据初步规约,降低数据的网络拷贝
- 对数据进行分组,相同Key的Value放入一个集合中
Reduce阶段2个步骤
- 对多个Map任务的结果进行排序以及合并,编写Reduce函数实现自己的逻辑,对输入的Key-Value进行处理,转为新的Key-Value(K3 和 V3)输出
- 设置OutputFormat处理并保存Reduce输出的 Key-Value数据
( 1 )用户自定义的Reducer要继承自己的父类
( 2 ) Reducer的输入数据类型对应Mapper的输出数据类型,也是KV( 3 )Reducer的业务逻辑写在reduce()方法中
( 4 ) ReduceTask进程对每一组相同k的<k,v>组调用一次reduce()方法
Driver阶段
相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象。
入门案例:单词统计
数据格式准备
创建一个新的文件
1 2
| cd /export/servers vim wordcount.txt
|
向其中放入以下内容并保存
1 2 3 4
| hello,world,hadoop hive,sqoop,flume,hello kitty,tom,jerry,world hadoop
|
上传到 HDFS
1 2
| hdfs dfs -mkdir /wordcount/ hdfs dfs -put wordcount.txt /wordcount/
|
具体程序代码实现
WordCountMapper.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { Text text = new Text(); LongWritable longWritable = new LongWritable(); String[] split = value.toString().split(","); for (String word : split) { text.set(word); longWritable.set(1); context.write(text, longWritable); } } }
|
WordCountReducer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long count = 0; LongWritable longWritable = new LongWritable(); for (LongWritable value : values) { count += value.get(); } longWritable.set(count); context.write(key, longWritable); } }
|
JobMain.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
public class JobMain extends Configured implements Tool { @Override public int run(String[] strings) throws Exception { Job job = Job.getInstance(super.getConf(), "wordCount");
job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job, new Path("hdfs://bigdata1:8020/wordcount"));
job.setMapperClass(WordCountMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class);
job.setOutputFormatClass(TextOutputFormat.class); Path path = new Path("hdfs://bigdata1:8020/wordCount_out"); TextOutputFormat.setOutputPath(job,path);
FileSystem fileSystem = FileSystem.get(new URI("hdfs://bigdata1:8020/"), new Configuration()); if(fileSystem.exists(path)){ fileSystem.delete(path, true); }
boolean bl = job.waitForCompletion(true); return bl? 0 : 1; }
public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); int run = ToolRunner.run(configuration, new JobMain(), args); System.exit(run); } }
|
运行方式
写好程序,有两种运行方式,一种是本地运行,在JobMain函数中注释的部分即是。然后运行主函数即可,这种方法一般用于本地的测试。
另一种是集群运行模式
本程序将结果输出到HDFS的/wordCount_out目录下,可以 http://bigdata1:50070/explorer.html#/
下查看。