作者:江南白衣
Hadoop 是 Google labs 的MapReduce的一个实现,Nutch项目的全部数据处理都构建在其之上。MapReduce是一种简化的分布式编程模式,让程序可以自动在普通机器组成的集群中以并行方式分布执行。
就如同java程序员可以不考虑内存泄露一样,MapReduce程序员也不许要关心海量数据如何被分配到多台机器上,不需要考虑机器失效的处理,不需要考虑这些机器间如何协作共同完成工作,程序员不需要什么并发处理或者分布式系统的经验,就可以进行分布式的编程。
MapReduce来源于函数式编程的Map,Reduce概念,Map是映射,Reduce是规约。说了非常简单,就真的是非常简单的,先看Hadoop自带的sample-WordCount ,再看Nutch里的Indexer,Fetcher两个实战的例子,最后阅读Hadoop wiki 上的HadoopMapReduce,很快就可以上手:
MapReduce过程简记:
1.根据输入路径,先用FileSplit把输入的文件剁碎,根据InputFormat(读入资料的格式)内含的RecordReader把资料读入成一组(key,value)对,然后按mapper count平均分给不同的Mapper处理。(这段因为没看过源码,还有点模糊)
2.Mapper进行Map操作 :: (InitialKey, IntialValue) -> [(InterKey, InterValue)] 从Inupt key,value 产生中间数据集。
3.Reducer进行Reduce操作:: (Interkey, InterValuesIterator) -> [(InterKey, InterValue)],Reducer遍历所有节点取得需要的中间数据集,再对其进行去重、过滤等后期处理,得到结果。
4.最后由OutputFormat类(输出资料的格式)内含的RecordWriter,将最终结果输出。结果输出可以是文件,也可以是其他形式,比如Nutch的Indexer,output时并不是去写文件,而是调用Lucene的IndexWriter将作为中间数据集的Lucene Document存盘。
一段典型的Hadoop代码:
public static void main(String[] args) main()
{
JobConf conf = new JobConf(WordCount. class );
conf.setJobName( " wordcount " );
jobConf.setInputFormat(MyInputFormat. class );
jobConf.setOutputFormat(MyOutputFormat. class );
jobConf.setInputKeyClass(BytesWritable. class );
jobConf.setInputValueClass(BytesWritable. class );
jobConf.setOutputKeyClass(BytesWritable. class );
jobConf.setOutputValueClass(BytesWritable. class );
jobConf.setMapperClass(MyMapper. class );
jobConf.setReducerClass(MyReducer. class );
jobConf.setNumMapTasks(num_maps);
jobConf.setNumReduceTasks(num_reduces);
JobClient.runJob(jobConf);
}
这就是一段hadoop式程序的主代码,用户自行实现四个内部类,MyInputFormat,MyOutputFormat,MyMapper和MyReducer,然后一一在jobConf里配置,最后执行runJob()即可。对比一下Nutch的源码就知道,只要把原来一段顺序执行的代码,拆分到上面四个类里面去,再在jobConf里进行指定,就可以拥有分布式执行的能力,无须要处理分布式编程的代码。配置一下服务器列表,在每台服务器上执行nutch/bin/start-all.sh,你的代码就分布式地执行了。
最后看一下Hadoop的结构,除了MapReduce的分发外,还有两个很重要的部分:
Hadoop的快速接口反射式RPC系统:IPC。
Hadoop的分布式文件系统NDFS:见 CSharpProgrammer blog上的文章。
注1:笔记中部分句子直接从网上文章摘录,不一一声明。
注2:Hadoop 目前版本为0.5,还不够成熟,看过源码的Donald对其源码评价一般,值得继续等待。