凡是过往,皆为序章

0%

大数据_05(MapReduce基础和入门案例(单词统计))

本篇总结MapReduce的入门基础知识。


MapReduce介绍

MapReduce的核心思想是“分而治之”,适用于大量复杂的任务处理场景。

  • Map负责“分”,即把复杂的任务分解为若干“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。

  • Reduce负责“合”,即对Map阶段的结果进行全局汇总。

  • MapReduce运行在Yarn集群,是一个“主从” 结构。

    1. ResourceManager
    2. NodeManager

    这两个阶段合起来正是MapReduce思想的体现。

MapReduce设计构思

MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop集群上。

​ MapReduce设计并提供了统一的计算框架,为程序员隐藏了绝大多数系统层面的处理细节。为程序员提供一个抽象和高层的编程接口和框架。程序员仅需要关心其应用层的具体计算问题,仅需编写少量的处理应用本身计算问题的程序代码。如何具体完成这个并行计算任务所相关的诸多系统层细节被隐藏起来,交给计算框架去处理:

​ Map和Reduce为程序员提供了一个清晰的操作接口抽象描述。MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现.Map和Reduce,MapReduce处理的数据类型是<key,value>键值对。

  • Map: (k1; v1) → [(k2; v2)]
  • Reduce: (k2; [v2]) → [(k3; v3)]

一个完整的mapreduce程序在分布式运行时有三类实例进程:

  1. MRAppMaster 负责整个程序的过程调度及状态协调
  2. MapTask 负责map阶段的整个数据处理流程
  3. ReduceTask 负责reduce阶段的整个数据处理流程

MapReduce编程规范

MapReduce的开发一共有8个步骤:

Map阶段2个步骤

  • 设置InputFormat类,将数据切分为 key-value(K1V1)对,输入到第二步
  • 自定义Map逻辑,将第一步的结果转换成另一种的key-value(K2V2)对,输出结果

( 1 )用户自定义的Mapper要继承自己的父类

( 2 ) Mapper的输入数据是KV对的形式(KV的类型可自定义)

( 3 ) Mapper中的业务逻辑写在map()方法中

( 4 ) Mapper的输出数据是KV对的形式(KV的类型可自定义)

( 5 ) map()方法(MapTask进程)对每一个<K,V>调用一次

Shuffle阶段4个步骤

  • 对输出的key-value对进行分区
  • 对不同分区的数据按照相同的Key排序
  • (可选)对分组过的数据初步规约,降低数据的网络拷贝
  • 对数据进行分组,相同Key的Value放入一个集合中

Reduce阶段2个步骤

  • 对多个Map任务的结果进行排序以及合并,编写Reduce函数实现自己的逻辑,对输入的Key-Value进行处理,转为新的Key-Value(K3 和 V3)输出
  • 设置OutputFormat处理并保存Reduce输出的 Key-Value数据

( 1 )用户自定义的Reducer要继承自己的父类
( 2 ) Reducer的输入数据类型对应Mapper的输出数据类型,也是KV( 3 )Reducer的业务逻辑写在reduce()方法中
( 4 ) ReduceTask进程对每一组相同k的<k,v>组调用一次reduce()方法

Driver阶段

相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象。

入门案例:单词统计

数据格式准备

创建一个新的文件

1
2
cd /export/servers
vim wordcount.txt

向其中放入以下内容并保存

1
2
3
4
hello,world,hadoop
hive,sqoop,flume,hello
kitty,tom,jerry,world
hadoop

上传到 HDFS

1
2
hdfs dfs -mkdir /wordcount/
hdfs dfs -put wordcount.txt /wordcount/

具体程序代码实现

WordCountMapper.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
* 其中 Mapper类的四个泛型解释:
* KETIN:K1的类型
* VALUEIN:V1的类型
* KEYOUT:K2的类型
* VALUEOUT:V2的类型
*
* 使用已经给出定义好的类型,基本类型的封装,操作序列化起来更加方便
* <Long, String, Long, String> - > <LongWritable, Text, Text, LongWritable>
* */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
/* map方法就是将 K1 和 V1 转换为 K2 和 V2
* 参数:
* key : K1 行偏移量
* value: V1 每一行的文本数据
* context : 上下文对象
*
* 如何将 K1 和 V1 转换为 K2 和 V2
* K1 V1
* 0 hello,world,hadoop
* 18 hdfs,hello,hadoop
* ---------------------------
* K2 V2
* hello 1
* world 1
* hadoop 1
* hdfs 1
* hello 1
* hadoop 1
*
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
Text text = new Text();
LongWritable longWritable = new LongWritable();
// 将一行的文本数据进行拆分
String[] split = value.toString().split(",");
// 遍历数组,组装K2 和 V2
for (String word : split) {
text.set(word);
longWritable.set(1);
// 将 K2 和 V2 写入上下文
context.write(text, longWritable);
}
}
}

WordCountReducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/*
* 四个泛型解释:
* KEYIN:K2类型
* VALUEIN:V2类型
* KEYOUT:K3类型
* VALUEOUT:V3类型
* */
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
/*
* reduce方法作用:将新的 K2 和 V2 转换为 K3 和 V3,并写入上下文
*
* 参数:
* key:新 K2
* values:集合 新V2
*
* 如何将新的 K2 和 V2 转换为 K3 和 V3
* 新 K2 V2
* world <1,1,1>
* hello <1,1>
* hadoop <1>
* ---------------------
* K3 V3
* hello 2
* world 3
* hadoop 1
* */
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0;
LongWritable longWritable = new LongWritable();
// 遍历values集合,将集合中的数字相加得到 V3
for (LongWritable value : values) {
count += value.get();
}
// 将 K3 和 V3 写入上下文中
longWritable.set(count);
context.write(key, longWritable);
}
}

JobMain.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

public class JobMain extends Configured implements Tool {
// 该方法用于指定一个Job任务
@Override
public int run(String[] strings) throws Exception {
// 创建一个job任务对象
// 第一个参数是一个configuration,下面的main方法调用时已经传入,存在Configured类中,通过getConf()方法获取
Job job = Job.getInstance(super.getConf(), "wordCount");

//job.setJarByClass(JobMain.class); // 如果打包出错,则需要该行代码

// 配置job任务对象 (8个步骤)
//1、指定文件的读取方式和读取路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("hdfs://bigdata1:8020/wordcount"));
// 本地执行方法,执行JobMain主函数即可,提前准备好文件,该路径下的wordcount.txt
//TextInputFormat.addInputPath(job, new Path("file:///D:\\mapReduce\\input"));

//2、指定Map阶段的处理方式 和 数据类型
job.setMapperClass(WordCountMapper.class);
//设置Map阶段K2的类型
job.setMapOutputKeyClass(Text.class);
// 设置Map阶段 V2 的类型
job.setMapOutputValueClass(LongWritable.class);

// 第3,4,5,6采用默认的方式

// 7、指定Reduce阶段的处理方式和数据类型
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);

//8、设置输出类型
job.setOutputFormatClass(TextOutputFormat.class);
// 设置输出路径,并判断该目录是否存在,存在则删除
Path path = new Path("hdfs://bigdata1:8020/wordCount_out");
TextOutputFormat.setOutputPath(job,path);
//TextOutputFormat.setOutputPath(job,new Path("file:///D:\\mapReduce\\output"));

// 判断目标目录是否存在
FileSystem fileSystem = FileSystem.get(new URI("hdfs://bigdata1:8020/"), new Configuration());
if(fileSystem.exists(path)){
fileSystem.delete(path, true);
}

// 等待任务结束
boolean bl = job.waitForCompletion(true);
return bl? 0 : 1;
}

public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
// 启动 job 任务,实际就是调用上面的run方法
int run = ToolRunner.run(configuration, new JobMain(), args);
System.exit(run);
}
}

运行方式

写好程序,有两种运行方式,一种是本地运行,在JobMain函数中注释的部分即是。然后运行主函数即可,这种方法一般用于本地的测试。

另一种是集群运行模式

  • 将MapReduce程序提交给Yarn集群,分发到很多的结点上并执行

  • 处理的数据和输出结果应该位于HDFS文件系统

  • 提交代码的实现步骤:将程序打成jar包并上传到虚拟机服务器上,使用hdfs命令执行

    1
    hadoop jar original-mapreduce-1.0-SNAPSHOT.jar(jar包名) JobMain(主函数名,函数右键copy reference)

    打成jar包方式:Maven项目里的Lifecycle下的package功能,得到的包位于target目录下,会有两个jar包,体积一大一小,本质并无不同,都可使用上传。

本程序将结果输出到HDFS的/wordCount_out目录下,可以 http://bigdata1:50070/explorer.html#/下查看。

~感谢你请我吃糖果~
-------------本文结束,感谢您的阅读,欢迎评论留言!-------------