mahout源码分析之Decision Forest

Mahout版本:0.7,hadoop版本:1.0.4,jdk:1.7.0_25 64bit。

首先贴上调用TestForest的代码(win7下面myeclipse调用TestForest,这里要设置Configuration,所以不能直接TestForest.main()来调用):

 

[java][/java] view plaincopy

  1. package mahout.fansy.partial.test;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.mahout.classifier.df.mapreduce.TestForest;
  4. public class TestTestForest {
  5.     /**
  6.      * 测试TestForest
  7.      * @param args
  8.      * @throws Exception
  9.      */
  10.     public static void main(String[] args) throws Exception {
  11.         String[] arg=new String[]{“-i”,”hdfs://ubuntu:9000/user/breiman/input/glass.data”,
  12.                 “-ds”,”hdfs://ubuntu:9000/user/breiman/glass.info”,
  13.                 “-m”,”hdfs://ubuntu:9000/user/breiman/glass.tree/forest.seq”,
  14.                 “-a”,”-mr”,
  15.                 “-o”,”hdfs://ubuntu:9000/user/breiman/out-testforest0″};
  16.         Configuration conf=new Configuration();
  17.         conf.set(“mapred.job.tracker”, “ubuntu:9001”);
  18.     //  conf.set(“fs.default.name”, “hdfs://”);
  19.         conf.set(“fs.default.name”, “ubuntu:9000”);
  20.         TestForest tf=new TestForest();
  21.         tf.setConf(conf);
  22.         Configuration confq=tf.getConf();
  23.         System.out.println(confq);
  24.         tf.run(arg);
  25.     }
  26. }

跑出来的结果如下:

 

 

[java][/java] view plaincopy

  1. 13/09/25 00:14:51 INFO common.HadoopUtil: Deleting hdfs://ubuntu:9000/user/breiman/out-testforest0/mappers
  2. 13/09/25 00:14:51 INFO mapreduce.TestForest: =======================================================
  3. Summary
  4. ——————————————————-
  5. Correctly Classified Instances          :        208       97.1963%
  6. Incorrectly Classified Instances        :          6        2.8037%
  7. Total Classified Instances              :        214
  8. =======================================================
  9. Confusion Matrix
  10. ——————————————————-
  11. a       b       c       d       e       f       <–Classified as
  12. 15      0       2       0       0       0        |  17      a     = 3
  13. 0       76      0       0       0       0        |  76      b     = 2
  14. 0       2       68      0       0       0        |  70      c     = 1
  15. 0       0       1       28      0       0        |  29      d     = 7
  16. 0       0       0       0       9       0        |  9       e     = 6
  17. 0       0       0       1       0       12       |  13      f     = 5

可以看到mahout源码在Job任务运行完成后,直接把mapper的输出删去了,然后存入了一个文件(这个在源码中可以看出)。然后就是正确率了,可以看到正确率达到了97%,还行吧,毕竟是对原始数据的分类,这么高也是正常的。这个就不像上次分析的贝叶斯了,贝叶斯算法还有自动把数据分为两个部分的功能(一个训练,一个测试),这个算法没有。

 

下面看代码吧:

进入TestForest的run方法中,刚开始都是一些基本参数的设置。主要有:输入、输出(这个是最基本的了)、dataset路径、model路径(BuildForest的路径)、是否显示分析结果(就是上面的Summary部分)、是否采用mapreduce模式运行。

然后就进入testForest()方法了。进去后首先检查下output是否符合要求(就是是否存在,存在则抛出异常)。接着是model路径的判断,不存在抛出异常。最后才判断输入数据是否存在(汗,不是应该先判读输入数据是否存在的么?不过好像这三个都是要判断的,所以那个先那个后没关系吧)。

接着(本来我是打然后的,突然发现前面已经有然后了,所以就回退,打了个接着,汗,我居然把这句打出来了,好吧,好像又打多了)就是mapreduce()函数了。

这里先不说分析的内容,暂时只说Job的事情,Job的调用只有两句:

 

[java][/java] view plaincopy

  1. Classifier classifier = new Classifier(modelPath, dataPath, datasetPath, outputPath, getConf());
  2.     classifier.run();

一句新建Classifier,一句run方法。新建对象基本可以忽略了,看run方法:

 

 

[java][/java] view plaincopy

  1. DistributedCache.addCacheFile(datasetPath.toUri(), conf);
  2.     log.info(“Adding the decision forest to the DistributedCache”);
  3.     DistributedCache.addCacheFile(forestPath.toUri(), conf);
  4.     Job job = new Job(conf, “decision forest classifier”);
  5.     log.info(“Configuring the job…”);
  6.     configureJob(job);
  7.     log.info(“Running the job…”);
  8.     if (!job.waitForCompletion(true)) {
  9.       throw new IllegalStateException(“Job failed!”);
  10.     }

先分别把dataset和model的路径加入到内存中,方便Job的Mapper调用,然后configureJob,然后直接就跑job了job.waitForCompletion(true);。这里看下configureJob的内容:

 

 

[java][/java] view plaincopy

  1. job.setJarByClass(Classifier.class);
  2.     FileInputFormat.setInputPaths(job, inputPath);
  3.     FileOutputFormat.setOutputPath(job, mappersOutputPath);
  4.     job.setOutputKeyClass(DoubleWritable.class);
  5.     job.setOutputValueClass(Text.class);
  6.     job.setMapperClass(CMapper.class);
  7.     job.setNumReduceTasks(0); // no reducers
  8.     job.setInputFormatClass(CTextInputFormat.class);
  9.     job.setOutputFormatClass(SequenceFileOutputFormat.class);

看到基本是一些常规的设置,然后Mapper就是CMapper了,Reducer没有。看CMapper是怎么操作的:

 

setup函数主要代码就三行:

 

[java][/java] view plaincopy

  1. dataset = Dataset.load(conf, new Path(files[0].getPath()));
  2.      converter = new DataConverter(dataset);
  3.      forest = DecisionForest.load(conf, new Path(files[1].getPath()));

分别设置dataset、converter、forest,其实就是从路径中把文件读出来而已。

 

map函数:

 

[java][/java] view plaincopy

  1. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  2.       if (first) {
  3.         FileSplit split = (FileSplit) context.getInputSplit();
  4.         Path path = split.getPath(); // current split path
  5.         lvalue.set(path.getName());
  6.         lkey.set(key.get());
  7.         context.write(lkey, lvalue);
  8.         first = false;
  9.       }
  10.       String line = value.toString();
  11.       if (!line.isEmpty()) {
  12.         Instance instance = converter.convert(line);
  13.         double prediction = forest.classify(dataset, rng, instance);
  14.         lkey.set(dataset.getLabel(instance));
  15.         lvalue.set(Double.toString(prediction));
  16.         context.write(lkey, lvalue);
  17.       }
  18.     }

首先if里面的判断不知道是干啥的,这个应该要去看下输出文件才行(输出文件被源码删除了,但是这个不难搞到,只要在删除前设置断点即可。这个应该要下次分析了)。

 

然后判断输入是否为空,否则由converter把输入的一行转换为Instance变量,然后由setup函数中读出来的forest去分析这个Instance,看它应该是属于哪一类的,然后把key就设置为instance原来的分类,value设置为forest的分类结果(这里不明白干嘛还要把double转换为String,直接输入DoubleWritable的类型不就行了?可能是方便analyzer的分析吧)。这里最重要的操作其实就是forest.classify函数了:

这里先简要说下,下次再详细分析吧。前面得到的forest不是有很多棵树的嘛(这个可以自己设定的),然后每棵树都可以对这个Instance进行分析得到一个分类结果,然后取这些分类结果重复次数最多的那个即可。好了,眼睛要罢工了。。。

标签