技术,互联网,eLearning …

Hadoop the definite guide 读书笔记(五)

MapReduce的类型和格式

MapReduce类型

hadoop里的MapReduce有如下通用的格式:

map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)

通常情况下,输入的key和value的类型(K1和V1)和输出的类型(K2和V2)是不同的,当然,Reduce的输入和Map的输出类型必须是一样的,虽然Reduce的输出可能是另外一种类型(K3和V3)。Java接口定义:

public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {
    void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)
    throws IOException;
}

public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {
    void reduce(K2 key, Iterator<V2> values,
    OutputCollector<K3, V3> output, Reporter reporter) throws IOException;
}

如果使用combine函数的话,它同reduce函数的形式是一样的(同时也实现Reducer接口),不同之处在于输出的类型是一些中间的key和value:

map: (K1, V1) → list(K2, V2)
combine: (K2, list(V2)) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)

partitation函数处理中间的key和value的类型,并且返回其partation index:

 public interface Partitioner<K2, V2> extends JobConfigurable {
   int getPartition(K2 key, V2 value, int numPartitions);
 }

Input Splits 和 Records

Input会被分成input split,split由record 组成。map处理每一个record,并且返回key和value的对。

InputSplit由InputSplit接口来定义:

public interface InputSplit extends Writable {
    long getLength() throws IOException;
    String[] getLocations() throws IOException;
}

MapReduce程序作者并不需要直接处理InputSplit,因为它是由InputFormat创建的:

public interface InputFormat<K, V> {
    InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
    RecordReader<K, V> getRecordReader(InputSplit split,
    JobConf job,
    Reporter reporter) throws IOException;
}

InputFormat类结构图:

InputFormat类结构图

OutputFormat

OutputFormat和InputFormat的类型相对应

OutputFormat

Hadoop the definite guide 读书笔记(四)

MapReduce是如何工作的

MapReduce的整个工作过程如下图所示:

Hadoop的工作过程

整个模型的最上层有四个实体:

  • 客户端,负责提交MapReduce Job
  • jobtracker, 负责调节job运行,jobtracker是一个java应用程序,main class名为JobTracker
  • tasktrackers,job被分成多个split,tasktracker负责运行split,tasktracker是一个java应用程序,main class名为TaskTracker
  • HDFS,用于共享job所需的文件

Job提交

JobClient调用submitJob方法提交一个job,一旦Job被提交,runJob方法会每隔一秒检查一下job的进度,如果进度有变化则显示在控制台。job完成后,如果job成功,在控制台显示job计数器,失败则在控制台显示错误原因。

JobClient的submitJob方法会完成以下工作:

  • 从jobtracker那里拿到一个新job ID(通过调用JobTracker的getNewJobId()方法)
  • 检查job的输出目录是否正常,比如,如果没有指定输出目录或者输出目录已经存在,则不提交job并抛出异常
  • 计算job的input split,如果input split不能被计算出来,则抛出异常
  • 拷贝job运行所需要的必要资源到jobtracker的文件系统里以job ID命名的目录下,这些资源包括jar文件,配置文件和input splits
  • 告诉jobtracker可以运行job了

Job初始化

jobtracker接到submitJob的调用后,会将其放入一个内部的队列中,job scheduler会从队列里将job取出并初始化。

为了创建一个可以运行的task的队列,job scheduler首先从共享的文件系统中将input split取出来,并为每一个split创建一个map task,Reduce task的数据由mapred.reduce.tasks决定

Task分配

tasktracker定期向jobtracker发送heartbeat信息,做为heartbeat的一部分,tasktracker会表明其是否能可以接受新任务,如果可以,jobtracker通过heartbeat的返回值为其分配一个新的任务。

在为tasktracker分配task之前,jobtracker必须先选择一个job,有多种调度算法,简单的是保存一个job的优先级队列。一旦选择了一个job,jobtracker就选择一个task。

Task 执行

现在tasktracker已经被分配了一个task,下一步就是运行这个task。首先,从共享文件系统里拷贝job的jar文件至本地,其次,为task创建一个本地工作目录,把jar包解压至该目录下,最后,创建一个TaskRunner来运行这个task。

TaskRunner启动一个新的java虚拟机来运行每一个task。当然,也可以在不同的task之间共享JVM

进度和状态更新

MapReduce job是长期运行的批处理作业,因为运行时间很长,用户需要了解进度信息。Job以及其所属的task都有status信息,在整个job的运行过程中,status信息都会发生变化。

task运行时会记录其进度信息,并定期通过heartbeat向tasktracker报告。

Job完成

当jobtracker接收到信息,最后一个job已经完成,它会将job的状态设为”success”,这样,当JobClient查询job的状态时,它知道job已经成功完成,并通知用户,同时从runJob()方法中退出。

Hadoop the definite guide 读书笔记(三)

Hadoop I/O

Hadoop的I/O处理包括一些通用的技术,比如数据完整性和数据压缩,以及Hadoop专用的技术,比如序列化框架和磁盘上的数据结构。

数据完整性

磁盘读写和网络I/O会有很小的机会出现数据错误,当数据量很大的时候,这个概率就变得很大了。通常检测坏数据的办法是使用checksum,checksum本身的数据也可能出问题,不过概率要小得多,因为checksum的数据量相比传输的数据量要小很多。

HDFS会对写入的数据做checksum,在读取的时候难证checksum的正确性。每一个checksum的数据块的大小由io.bytes.per.checksum配置决定,缺省值是512。

数据压缩

数据压缩可以减少磁盘占用量和增加网络数据传输速度。Hadoop支持的压缩格式如下图所示:

Hadoop支持的压缩格式

数据序列化

序列化是将结构化数据转化为网络传输的数据流的过程,反序列化是将数据流转化回结构化数据的过程。

序列化主要应用于两个场景:进程间通讯和持久化存储。

Hadoop通过RPC实现两个节点间的通讯,RPC通过序列化机制将消息转化为二进制流传到另外的节点,该节点又会将二进制流还原回消息。

一个理想的RPC序列化的格式应该有如下特征:

  • 格式紧凑
  • 快速
  • 可扩展
  • 可在多语言间进行互操作

Hadoop使用Writable做序列化,它满足上面的前两个条件,但是不满足后两个条件。Writable所包含的类如下图所示:

Writable类

基于文件的数据结构

Hadoop的SequenceFile将数据存储为二进制的key-value对。SequenceFile也适合存储小文件,因为HDFS和MapReduce是针对小文件优化的,所以将小文件夹缩为SequenceFile会使得效率更高。

SequenceFile的格式:

Sequence File

Hadoop the definite guide 读书笔记(二)

MapReduce

MapReduce是一种用于数据处理的编程模型,这个模型很简单,但在海量数据处理方面却具备强大的能力。

MapReduce将数据处理过程分成两个阶段:Map阶段和Reduce阶段,每个阶段都是key-value对的方式进行输入和输出。key和value的类型由程序指定,程序同时需要指定map函数和reduce函数。

几个概念:

  • job: 是客户端程序想要完成的一系列工作的集合。包括输入数据,MapReduce程序和配置信息。
  • task: Hadoop将job分解为tasks,有两种类型的task: map task和reduce task
  • jobtracker和tasktracker: 这二者都是用来控制job执行的。tasktracker运行task,并向jobtracker报告进度信息,jobtracker记录下每一个job的进度信息,如果一个task失败,jobtracker会将其重新调度到另外的tasktracker上。

Hadoop将MapReduce job的输入分成固定大小的片,称为splits。Hadoop在每一个split上创建一个map task,这个map task会对该split里的每一条记录执行map操作。

Hadoop会尽量使task运行在一个节点上,这被称为本地数据优化(data locality optimization),这样会避免因过多网络通讯而引起的开销。

Map task会将输出存于本地硬盘,而不是放于HDFS上,因为Map的输出结果只是中间结果,它需要被reduce task所处理并产生最终结果。一旦任务完成,map task所产生的中间结果就可以删除了。

Reduce task不具备处理本地数据的优势:每一个reduce task的输入通常会来自所有的mappser。

当有多个Reducer的时候,map task会将其输出的数据切分为多个partition,每个partion上可能会有多个key,但是同一个key所对应的value必须在同一个partition上。

下图展示了mapreduce的数据流:

mapreduce 数据流

Map task和Reduce task之间会有一个shuffle的过程,因为一个reduce task的数据可能来自多个mapper task 。

Hadoop the definite guide 读书笔记(一)

HDFS

HDFS是Hadoop Distributed Filesystem的简称。

HDFS的设计服务以下目标:

  • 存储海量文件
  • 流式文件读取
  • 布署于廉价硬件之上

HDFS不适用于以下场景:

  • 低延迟数据读取
  • 非常多的小文件
  • 多个writer写入和任意文件修改

HDFS概念:

块(Blocks):缺省大小64M,远远大于磁盘文件系统”块“的大小,这样做的目的是最小化磁盘寻址时间。 在分布式文件系统里抽象出块的概念有以下好处:文件可以比单一磁盘要大;简化存储子系统,使其只负责管理基某个块;可以通过块复制来提高容错能力。 HDFS提供fsck命令对块进行检查:% hadoop fsck -files -blocks 。

Namenodes 和 Datanodes:HDFS集群包含两种类型的节点,namenode(master)和datanode(worker)。namenode维护着文件系统树及其上的目录和文件的元数据信息。datanode负责存取block,并且报告namenode它所存储的block信息。

文件读取过程:

客户端调用FileStream对象(DistributedFileSystem的一个实例)的open()方法(图中的第一步);DistributedFileSystem通过RPC调用namenode来获得所要读取文件的块信息;对于每一个块,namenode返回有这个块副本的datanode以及块在datanode上的副本,同时,这些datanode会根据其距离客户端的远近进行排序。如果客户端本身也是 datanode,那么会直接读取本地数据。

DistributedFileSystem返回客户端FSDataInputStream,用来读取数据;客户端完成数据读取后,调用FSDataInputStream的close()方法关闭数据流。在数据读取过程中,如果客户端从一个 datanode上读取出错,就会选择下一下离它最近的datanode。同时记住这个失败的datanode,读取后面的块的时候就不会选择这个 datanode。

文件写入过程:

客户端调用DistributedFileSystem的 create()方法来创建文件,DistributedFileSystem通过RPC调用namenode在文件系统的namespace里创建文件,此时并没有块与其相关联。namenode检查文件文件是否存在和客户端是否有创建文件的权限,如果成功,namenode添加一个新文件的记录,否则文件创建失败,客户端抛出IOException。

在客户端写数据的过程中,数据被分成不同的包,写入被称为data queue的队列里。DataQueue由负责向namenode申请块的DataStreamer控制。需要写入block的datanode形成一个 pipeline。如果replication level是3的话,pipeline里有三个节点,DataStreamer把数据流写入第一个datanode,第一个datanode存储包数据,并且将包转发到第二个datanode,类似的,第二个datanode也会存储包数据并转发至第三个节点。

返回顶部