Hadoop–两个简单的MapReduce程序

源代码下载:http://download.csdn.net/detail/huhui_bj/5909575

这周在学习Hadoop编程,以前看过《Hadoop权威指南》这本书,但是看完了HDFS这一章之后,后面的内容就难以再看懂了,说实话,之前一直对MapReduce程序敬而远之,毫不理解这种类型的程序的执行过程。这一周花了些时间看了Hadoop的实战,现在能够看懂简单的MapReduce程序,也能自己动手写几个简单的例子程序。下面是两个简单的MapReduce程序,用到了一些简单的Hadoop知识点,总结如下文。

例子一   求最大数

问题描述是这样的,从一系列数中,求出最大的那一个。这个需求应该说是很简单的,如果不用MapReduce来实现,普通的Java程序要实现这个需求,应该说是轻而易举的,几行代码就能搞定。这里用这个例子是想说说Hadoop中的Combiner的用法。

我们知道,Hadoop使用Mapper函数将数据处理成一个一个的<key, value>键值对,再在网络节点间对这些键值对进行整理(shuffle),然后使用Reducer函数处理这些键值对,并最终将结果输出。那么可以这样想,如果我们有1亿个数据(Hadoop就是为大数据而生),Mapper函数将会产生1亿个键值对在网络中进行传输,如果我们只是要求出这1亿个数当中的最大值,那么显然,Mapper只需要输出它所知道的最大值即可。这样一来可以减轻网络带宽的压力,二来,可以减轻Reducer的压力,提高程序的效率。

如果Reducer只是运行简单的诸如求最大值、最小值、计数,那么我们可以使用Combiner,但是,如果是求一组数的平均值,千万别用Combiner,道理很简单,你自己分析看。Combiner可以看作是Reducer的帮手,或者看成是Mapper端的Reducer,它能减少Mapper函数的输出从而减少网络数据传输并能减少Reducer上的负载。下面是Combiner的例子程序。

程序的输入是这样的:

 

[plain][/plain] view plaincopy

  1. 12
  2. 5
  3. 9
  4. 21
  5. 43
  6. 99
  7. 65
  8. 32
  9. 10

MapReduce程序需要找到这一组数字中的最大值99,Mapper函数是这样的:

 

 

[java][/java] view plaincopy

  1. public class MyMapper extends Mapper<Object, Text, Text, IntWritable>{
  2.     @Override
  3.     protected void map(Object key, Text value,Context context)throws IOException, InterruptedException {
  4.         // TODO Auto-generated method stub
  5.         context.write(new Text(), new IntWritable(Integer.parseInt(value.toString())));
  6.     }
  7. }

Mapper函数非常简单,它是负责读取HDFS中的数据的,负责将这些数据组成<key, value>对,然后传输给Reducer函数。Reducer函数如下:

 

 

[java][/java] view plaincopy

  1. public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
  2.     @Override
  3.     protected void reduce(Text key, Iterable<IntWritable> values,Context context)throws IOException, InterruptedException {
  4.         // TODO Auto-generated method stub
  5.         int temp = Integer.MIN_VALUE;
  6.         for(IntWritable value : values){
  7.             if(value.get() > temp){
  8.                 temp = value.get();
  9.             }
  10.         }
  11.         context.write(new Text(), new IntWritable(temp));
  12.     }
  13. }

Reducer函数也很简单,就是负责找到从Mapper端传来的数据中找到最大值。那么在Mapper函数与Reducer函数之间,有个Combiner,它的代码是这样的:

 

 

[java][/java] view plaincopy

  1. public class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
  2.     @Override
  3.     protected void reduce(Text key, Iterable<IntWritable> values,Context context)throws IOException, InterruptedException {
  4.         // TODO Auto-generated method stub
  5.         int temp = Integer.MIN_VALUE;
  6.         for(IntWritable value : values){
  7.             if(value.get() > temp){
  8.                 temp = value.get();
  9.             }
  10.         }
  11.         context.write(new Text(), new IntWritable(temp));
  12.     }
  13. }

我们可以看到,combiner也是继承了Reducer类,其写法与写reduce函数一样,reduce和combiner对外的功能是一样的,只是使用时的位置和上下文(Context)不一样而已。定义好了自己的Combiner函数之后,需要在Job类中加入一行代码,告诉Job你使用要在Mapper端使用Combiner:

 

 

[java][/java] view plaincopy

  1. job.setCombinerClass(MyCombiner.class);

那么这个求最大数的例子的Job类是这样的:

 

 

[java][/java] view plaincopy

  1. public class MyMaxNum {
  2.     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  3.         Configuration conf = new Configuration();
  4.         Job job = new Job(conf,”My Max Num”);
  5.         job.setJarByClass(MyMaxNum.class);
  6.         job.setMapperClass(MyMapper.class);
  7.         job.setReducerClass(MyReducer.class);
  8.         job.setOutputKeyClass(Text.class);
  9.         job.setOutputValueClass(IntWritable.class);
  10.         job.setCombinerClass(MyCombiner.class);
  11.         FileInputFormat.addInputPath(job, new Path(“/huhui/nums.txt”));
  12.         FileOutputFormat.setOutputPath(job, new Path(“/output”));
  13.         System.exit(job.waitForCompletion(true) ? 0:1);
  14.     }
  15. }

 

当然你还可以对输出进行压缩。只要在函数中添加两行代码,就能对Reducer函数的输出结果进行压缩。当然这里没有必要对结果进行压缩,只是作为一个知识点而已。

 

[java][/java] view plaincopy

  1. //对输出进行压缩
  2. conf.setBoolean(“mapred.output.compress”, true);
  3. conf.setClass(“mapred.output.compression.codec”, GzipCodec.class, CompressionCodec.class);

 

例子二   自定义Key的类型

这个例子主要讲述如果自定义<key, value>的key的类型,以及如果如何使用Hadoop中的比较器WritableComparator和输入格式KeyValueTextInputFormat。
需求是这样的,给定下面一组输入:
[plain][/plain] view plaincopy

  1. str1    2
  2. str2    5
  3. str3    9
  4. str1    1
  5. str2    3
  6. str3    12
  7. str1    8
  8. str2    7
  9. str3    18

希望得到的输出如下:

[plain][/plain] view plaincopy

  1. str1    1,2,8
  2. str2    3,5,7
  3. str3    9,12,19
请注意,输入格式KeyValueTextInputFormat只能针对key和value中间使用制表符\t隔开的数据,而逗号是不行的。
对于这个需求,我们需要自定义一个key的数据类型。在Hadoop中,自定义的key值类型都要实现WritableComparable接口,然后重写这个接口的三个方法。这里我们定义IntPaire类,它实现了WritableComparable接口:
[java][/java] view plaincopy

  1. public class IntPaire implements WritableComparable<IntPaire> {
  2.     private String firstKey;
  3.     private int secondKey;
  4.     @Override
  5.     public void readFields(DataInput in) throws IOException {
  6.         // TODO Auto-generated method stub
  7.         firstKey = in.readUTF();
  8.         secondKey = in.readInt();
  9.     }
  10.     @Override
  11.     public void write(DataOutput out) throws IOException {
  12.         // TODO Auto-generated method stub
  13.         out.writeUTF(firstKey);
  14.         out.writeInt(secondKey);
  15.     }
  16.     @Override
  17.     public int compareTo(IntPaire o) {
  18.         // TODO Auto-generated method stub
  19.         return o.getFirstKey().compareTo(this.firstKey);
  20.     }
  21.     public String getFirstKey() {
  22.         return firstKey;
  23.     }
  24.     public void setFirstKey(String firstKey) {
  25.         this.firstKey = firstKey;
  26.     }
  27.     public int getSecondKey() {
  28.         return secondKey;
  29.     }
  30.     public void setSecondKey(int secondKey) {
  31.         this.secondKey = secondKey;
  32.     }
  33. }

上面重写的readFields方法和write方法,都是这样写的,几乎成为模板。

由于要将相同的key的键/值对送到同一个Reducer哪里,所以这里要用到Partitioner。在Hadoop中,将哪个key到分配到哪个Reducer的过程,是由Partitioner规定的,这是一个类,它只有一个抽象方法,继承这个类时要覆盖这个方法:
[java][/java] view plaincopy

  1. getPartition(KEY key, VALUE value, int numPartitions)

其中,第一个参数key和第二个参数value是Mapper端的输出<key, value>,第三个参数numPartitions表示的是当前Hadoop集群一共有多少个Reducer。输出则是分配的Reducer编号,就是指的是Mapper端输出的键对应到哪一个Reducer中去。我们一般实现Partitioner是哈希散列的方式,它以key的hash值对Reducer的数目取模,得到对应的Reducer编号。这样就能保证相同的key值,必定会分配到同一个reducer上。如果有N个Reducer,那么编号就是0,1,2,3……(N-1)。

那么在本例子中,Partitioner是这样实现的:
[java][/java] view plaincopy

  1. public class PartitionByText extends Partitioner<IntPaire, IntWritable> {
  2.     @Override
  3.     public int getPartition(IntPaire key, IntWritable value, int numPartitions) {//reduce的个数
  4.         // TODO Auto-generated method stub
  5.         return (key.getFirstKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
  6.     }
  7. }

本例还用到了Hadoop的比较器WritableComparator,它实现的是RawComparator接口。

[java][/java] view plaincopy

  1. public class TextIntComparator extends WritableComparator {
  2.     public TextIntComparator(){
  3.         super(IntPaire.class,true);
  4.     }
  5.     @Override
  6.     public int compare(WritableComparable a, WritableComparable b) {
  7.         // TODO Auto-generated method stub
  8.         IntPaire o1 = (IntPaire) a;
  9.         IntPaire o2 = (IntPaire) b;
  10.         if(!o1.getFirstKey().equals(o2.getFirstKey())){
  11.             return o1.getFirstKey().compareTo(o2.getFirstKey());
  12.         }else{
  13.             return o1.getSecondKey() – o2.getSecondKey();
  14.         }
  15.     }
  16. }
由于我们在key中加入的额外的字段,所以在group的时候需要手工设置,手工设置很简单,因为job提供了相应的方法,在这里,我们的group比较器是这样实现的:
[java][/java] view plaincopy

  1. public class TextComparator extends WritableComparator {
  2.     public TextComparator(){
  3.         super(IntPaire.class,true);
  4.     }
  5.     @Override
  6.     public int compare(WritableComparable a, WritableComparable b) {
  7.         // TODO Auto-generated method stub
  8.         IntPaire o1 = (IntPaire) a;
  9.         IntPaire o2 = (IntPaire) b;
  10.         return o1.getFirstKey().compareTo(o2.getFirstKey());
  11.     }
  12. }

下面将写出Mapper函数,它是以KeyValueTextInputFormat的输入形式读取HDFS中的数据,设置输入格式将在job中。

[java][/java] view plaincopy

  1. public class SortMapper extends Mapper<Object, Text, IntPaire, IntWritable>{
  2.     public IntPaire intPaire = new IntPaire();
  3.     public IntWritable intWritable = new IntWritable(0);
  4.     @Override
  5.     protected void map(Object key, Text value,Context context)throws IOException, InterruptedException {
  6.         // TODO Auto-generated method stub
  7.         int intValue = Integer.parseInt(value.toString());
  8.         intPaire.setFirstKey(key.toString());
  9.         intPaire.setSecondKey(intValue);
  10.         intWritable.set(intValue);
  11.         context.write(intPaire, intWritable);//key:str1  value:5
  12.     }
  13. }

下面是Reducer函数,

[java][/java] view plaincopy

  1. public class SortReducer extends Reducer<IntPaire, IntWritable, Text, Text> {
  2.     @Override
  3.     protected void reduce(IntPaire key, Iterable<IntWritable> values,Context context)throws IOException, InterruptedException {
  4.         // TODO Auto-generated method stub
  5.         StringBuffer combineValue = new StringBuffer();
  6.         Iterator<IntWritable> itr = values.iterator();
  7.         while(itr.hasNext()){
  8.             int value = itr.next().get();
  9.             combineValue.append(value + “,”);
  10.         }
  11.         int length = combineValue.length();
  12.         String str = “”;
  13.         if(combineValue.length() > 0){
  14.             str = combineValue.substring(0, length-1);//去除最后一个逗号
  15.         }
  16.         context.write(new Text(key.getFirstKey()), new Text(str));
  17.     }
  18. }

Job类是这样的:

[java][/java] view plaincopy

  1. public class SortJob {
  2.     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  3.         Configuration conf = new Configuration();
  4.         Job job = new Job(conf, “Sortint”);
  5.         job.setJarByClass(SortJob.class);
  6.         job.setMapperClass(SortMapper.class);
  7.         job.setReducerClass(SortReducer.class);
  8.         //设置输入格式
  9.         job.setInputFormatClass(KeyValueTextInputFormat.class);
  10.         //设置map的输出类型
  11.         job.setMapOutputKeyClass(IntPaire.class);
  12.         job.setMapOutputValueClass(IntWritable.class);
  13.         //设置排序
  14.         job.setSortComparatorClass(TextIntComparator.class);
  15.         //设置group
  16.         job.setGroupingComparatorClass(TextComparator.class);//以key进行grouping
  17.         job.setPartitionerClass(PartitionByText.class);
  18.         job.setOutputKeyClass(Text.class);
  19.         job.setOutputValueClass(Text.class);
  20.         FileInputFormat.addInputPath(job, new Path(“/huhui/input/words.txt”));
  21.         FileOutputFormat.setOutputPath(job, new Path(“/output”));
  22.         System.exit(job.waitForCompletion(true)?0:1);
  23.     }
  24. }

这样一来,程序就写完了,按照需求,完成了相应的功能。

后记

刚开始接触MapReduce程序可能会感到无从下手,这可能是因为你还没有理解MapReduce的机制和原理。自己动手写写简单的MapReduce函数会有助于理解,然后逐步的深入学习。

标签