Hadoop the definite guide 读书笔记(五)
MapReduce的类型和格式
MapReduce类型
hadoop里的MapReduce有如下通用的格式:
map: (K1, V1) → list(K2, V2) reduce: (K2, list(V2)) → list(K3, V3)
通常情况下,输入的key和value的类型(K1和V1)和输出的类型(K2和V2)是不同的,当然,Reduce的输入和Map的输出类型必须是一样的,虽然Reduce的输出可能是另外一种类型(K3和V3)。Java接口定义:
public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {
void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)
throws IOException;
}
public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {
void reduce(K2 key, Iterator<V2> values,
OutputCollector<K3, V3> output, Reporter reporter) throws IOException;
}
如果使用combine函数的话,它同reduce函数的形式是一样的(同时也实现Reducer接口),不同之处在于输出的类型是一些中间的key和value:
map: (K1, V1) → list(K2, V2) combine: (K2, list(V2)) → list(K2, V2) reduce: (K2, list(V2)) → list(K3, V3)
partitation函数处理中间的key和value的类型,并且返回其partation index:
public interface Partitioner<K2, V2> extends JobConfigurable {
int getPartition(K2 key, V2 value, int numPartitions);
}
Input Splits 和 Records
Input会被分成input split,split由record 组成。map处理每一个record,并且返回key和value的对。
InputSplit由InputSplit接口来定义:
public interface InputSplit extends Writable {
long getLength() throws IOException;
String[] getLocations() throws IOException;
}
MapReduce程序作者并不需要直接处理InputSplit,因为它是由InputFormat创建的:
public interface InputFormat<K, V> {
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader<K, V> getRecordReader(InputSplit split,
JobConf job,
Reporter reporter) throws IOException;
}
InputFormat类结构图:

OutputFormat
OutputFormat和InputFormat的类型相对应



没有评论▼