This post summarizes the basics of MapReduce partitioning, sorting, and the combiner.
Partitioning
In MapReduce, by specifying a partition, records that belong to the same partition are sent to the same Reduce task for processing.
For example, to compute statistics, a batch of similar records can be sent to the same Reduce task and counted there; this is how grouping and statistics over records of the same type are implemented.
In essence, records of the same type, records that have something in common, are routed to the same place to be processed together.
By default a Reduce stage has only one partition (one reduce task).
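For reference, when more than one reduce task is configured and no custom partitioner is supplied, Hadoop falls back to its built-in HashPartitioner, which derives the partition from the map output key's hash. A minimal sketch of that logic (the class name here is illustrative):

```java
import org.apache.hadoop.mapreduce.Partitioner;

// Same logic as Hadoop's built-in HashPartitioner: mask the hash to keep it
// non-negative, then take it modulo the number of reduce tasks.
public class HashLikePartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```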
Mapper

```java
package partition;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class PartitionMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Custom counter (group "MY_COUNTER", name "partition_counter"), incremented once per input record
        Counter counter = context.getCounter("MY_COUNTER", "partition_counter");
        counter.increment(1L);
        // Emit the whole input line as the key; no value is needed
        context.write(value, NullWritable.get());
    }
}
```
Reducer

```java
public class PartitionerReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
```
Partitioner

```java
package partition;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends Partitioner<Text, NullWritable> {
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int numReduceTasks) {
        // The sixth tab-separated field decides the partition:
        // values greater than 15 go to partition 1, everything else goes to partition 0
        String[] split = text.toString().split("\t");
        String numStr = split[5];
        if (Integer.parseInt(numStr) > 15) {
            return 1;
        } else {
            return 0;
        }
    }
}
```
JobMain

```java
package partition;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), "PartitionMapReduce");

        // Input: read text lines from HDFS (the local path is an alternative for local testing)
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://bigdata1:8020/input"));
        // TextInputFormat.addInputPath(job, new Path("file:///D:\\input"));

        // Map stage
        job.setMapperClass(PartitionMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Shuffle: route records to reducers with the custom partitioner
        job.setPartitionerClass(MyPartitioner.class);

        // Reduce stage
        job.setReducerClass(PartitionerReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Two reduce tasks, one per partition returned by MyPartitioner
        job.setNumReduceTasks(2);

        // Output (again, the local path is the local-testing alternative)
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://bigdata1:8020/out/partition_out"));
        // TextOutputFormat.setOutputPath(job, new Path("file:///D:\\out\\partition_out"));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}
```
Counters
The mapper above demonstrates counter usage (the same calls work inside a reducer); the counter values show up in the job's counter report when the job completes.
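For reference, counters can also be declared through an enum instead of a group/name string. A minimal sketch of that style inside the reducer (the enum and its member name here are illustrative):

```java
public class PartitionerReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    // Enum-based counter: Hadoop reports it under the enum's fully qualified name
    public static enum MyCounters {
        REDUCE_INPUT_KEYS
    }

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Increment once per distinct key reaching the reducer
        context.getCounter(MyCounters.REDUCE_INPUT_KEYS).increment(1L);
        context.write(key, NullWritable.get());
    }
}
```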
Sorting
Serialization is the process of turning a structured object into a stream of bytes.
Deserialization is the reverse: turning a byte stream back into a structured object. When objects have to be passed between processes or persisted, they are serialized into byte streams; conversely, when a byte stream received from the network or read from disk has to become an object again, it is deserialized.
Java's own serialization (Serializable) is a heavyweight framework: a serialized object carries a lot of extra information (checksums, headers, the inheritance hierarchy, and so on), which makes it inefficient to ship across the network. Hadoop therefore provides its own lean, efficient serialization mechanism, Writable. Unlike Java serialization it does not drag along layers of parent-child class structure; only the fields that are actually needed are transmitted, which greatly reduces network overhead.
Writable is Hadoop's serialization format: Hadoop defines a Writable interface, and a class only has to implement it to become serializable.
Writable also has a sub-interface, WritableComparable, which supports both serialization and key comparison. By having a custom key implement WritableComparable we can plug in our own sort order.
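For orientation, the two interfaces look roughly like this (simplified sketches; the real ones live in org.apache.hadoop.io):

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Simplified view of org.apache.hadoop.io.Writable
interface Writable {
    void write(DataOutput out) throws IOException;     // serialize the fields to the stream
    void readFields(DataInput in) throws IOException;  // read the fields back from the stream
}

// Simplified view of org.apache.hadoop.io.WritableComparable:
// a Writable that is also Comparable, so it can serve as a sortable key
interface WritableComparable<T> extends Writable, Comparable<T> {
}
```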
The data looks like this:
```
a	1
a	9
b	3
a	7
b	8
b	10
a	5
```
Requirements:
Sort by the first column in dictionary order.
When the first column is equal, sort the second column in ascending order.
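Applying these two rules to the sample data above, the expected output (worked out by hand) is:

```
a	1
a	5
a	7
a	9
b	3
b	8
b	10
```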
Solution:
Combine the key and value of each map output pair <key,value> into a new key (newKey), keeping the value unchanged, so the pair becomes <(key,value),value>. When sorting on newKey, pairs whose keys are equal are then ordered by their values.
Custom type and comparator

```java
package com.cpz.mapreduce.sort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class SortBean implements WritableComparable<SortBean> {

    private String word;
    private int num;

    @Override
    public String toString() {
        return word + "\t" + num;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    @Override
    public int compareTo(SortBean sortBean) {
        // Primary order: the word, in dictionary order
        int result = this.word.compareTo(sortBean.word);
        if (result == 0) {
            // Secondary order: the number, ascending, when the words are equal
            return this.num - sortBean.num;
        }
        return result;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        // Serialization: write the fields in a fixed order
        dataOutput.writeUTF(word);
        dataOutput.writeInt(num);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // Deserialization: read the fields back in the same order
        this.word = dataInput.readUTF();
        this.num = dataInput.readInt();
    }
}
```
Mapper

```java
public class SortMapper extends Mapper<LongWritable, Text, SortBean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split "word \t num" and pack both fields into the composite key
        String[] split = value.toString().split("\t");
        SortBean sortBean = new SortBean();
        sortBean.setWord(split[0]);
        sortBean.setNum(Integer.parseInt(split[1]));
        context.write(sortBean, NullWritable.get());
    }
}
```
Reducer

```java
public class SortReducer extends Reducer<SortBean, NullWritable, SortBean, NullWritable> {
    @Override
    protected void reduce(SortBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
```
JobMain

```java
public class JobMain extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), "mapreduce_sort");

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("d:\\mapreduce\\sort_in"));

        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(SortBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(SortBean.class);
        job.setOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("d:\\mapreduce\\sort_out"));

        boolean bl = job.waitForCompletion(true);
        return bl ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}
```
Combiner
Concept:
Every map task can produce a large amount of local output. The combiner's job is to merge the map-side output once before it leaves the node, which reduces the volume of data transferred between map and reduce nodes and improves network I/O; it is one of MapReduce's optimization techniques.
A combiner is a component of a MapReduce program that sits alongside the Mapper and Reducer.
The combiner's parent class is Reducer.
The difference between a combiner and a reducer is where they run:
the combiner runs on the node where each map task runs;
the reducer receives the output of all mappers across the job.
The point of the combiner is to pre-aggregate the output of each map task locally so that less data has to be shuffled over the network; for example, three <hello,1> records emitted by one map task become a single <hello,3> before being sent to the reducer.
Implementation steps:
Write a custom combiner that extends Reducer and overrides the reduce method (almost identical to the reducers written earlier).
Register it on the job with job.setCombinerClass(CustomCombiner.class).
A combiner may only be used when it does not change the final business result, and its output key/value types must match the reducer's input key/value types.
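A quick illustration of that constraint: sums and counts can be pre-aggregated safely, but averages cannot. Averaging per-map-task averages gives avg(avg(1,2), avg(3)) = avg(1.5, 3) = 2.25, whereas the true average avg(1,2,3) = 2, so a combiner that averaged would change the final result.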
Mapper

```java
public class CombinerMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text text = new Text();
        LongWritable longWritable = new LongWritable();
        // Split the comma-separated line and emit <word, 1> for every word
        String[] split = value.toString().split(",");
        for (String s : split) {
            text.set(s);
            longWritable.set(1);
            context.write(text, longWritable);
        }
    }
}
```
MyCombiner

```java
public class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // Local aggregation: sum the counts for this key within one map task's output
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}
```
Reducer

```java
public class CombinerReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // Global aggregation: sum the (already locally combined) counts for this key
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}
```
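Since MyCombiner and CombinerReducer perform exactly the same summation here, the reducer class could also be registered directly as the combiner, i.e. job.setCombinerClass(CombinerReducer.class); reusing the reducer this way is common in word-count style jobs.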
JobMain

```java
public class JobMain extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), "mapreduce_combiner");

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("d:\\mapreduce\\combiner_in"));

        job.setMapperClass(CombinerMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Register the combiner for map-side local aggregation
        job.setCombinerClass(MyCombiner.class);

        job.setReducerClass(CombinerReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("d:\\mapreduce\\combiner_out"));

        boolean bl = job.waitForCompletion(true);
        return bl ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}
```