1. MapReduce简介
1.1 什么是MapReduce
MapReduce是Google提出的一种分布式计算编程模型,用于处理大规模数据集。它将复杂的并行计算过程抽象成两个简单的操作:Map(映射)和Reduce(归约)。
1.2 MapReduce的核心思想
- 分而治之:将大问题分解为小问题
- 并行处理:多台机器同时处理数据
- 容错性:自动处理节点故障
- 可扩展性:支持动态添加计算节点
2. MapReduce编程模型
2.1 基本流程
输入数据 → Split → Map → Shuffle → Reduce → 输出结果
2.2 Map阶段
# Map函数示例
def map_function(key, value):
# 处理输入的键值对
words = value.split()
for word in words:
emit(word, 1) # 输出中间键值对
Map阶段特点: - 接收输入键值对 - 产生中间键值对 - 并行执行 - 无状态操作
2.3 Reduce阶段
# Reduce函数示例
def reduce_function(key, values):
# 聚合相同key的所有values
count = sum(values)
emit(key, count) # 输出最终结果
Reduce阶段特点: - 接收中间键值对 - 按key分组处理 - 产生最终结果 - 可能有多个Reducer
2.4 Shuffle阶段
Map输出 → 分区 → 排序 → 分组 → Reduce输入
Shuffle过程: 1. 分区(Partition):将Map输出按key分配到不同Reducer 2. 排序(Sort):对中间结果按key排序 3. 分组(Group):将相同key的值聚合在一起 4. 传输(Transfer):将数据传输到Reducer节点
3. MapReduce架构
3.1 主从架构
JobTracker (Master)
├── TaskTracker 1 (Slave)
├── TaskTracker 2 (Slave)
├── TaskTracker 3 (Slave)
└── TaskTracker N (Slave)
3.2 核心组件
JobTracker(作业跟踪器)
- 接收用户提交的作业
- 将作业分解为Map和Reduce任务
- 调度任务到TaskTracker
- 监控任务执行状态
- 处理任务失败和重试
TaskTracker(任务跟踪器)
- 执行具体的Map和Reduce任务
- 定期向JobTracker汇报状态
- 管理本地数据和中间结果
- 处理任务级别的故障恢复
3.3 数据流
HDFS输入 → InputFormat → RecordReader → Map →
Partitioner → Sort → Combine → Reduce →
OutputFormat → HDFS输出
4. 关键概念详解
4.1 InputFormat
public abstract class InputFormat<K, V> {
// 获取输入分片
public abstract List<InputSplit> getSplits(JobContext context);
// 创建记录读取器
public abstract RecordReader<K,V> createRecordReader(
InputSplit split, TaskAttemptContext context);
}
常用InputFormat: - TextInputFormat:处理文本文件 - SequenceFileInputFormat:处理序列文件 - DBInputFormat:从数据库读取数据
4.2 OutputFormat
public abstract class OutputFormat<K, V> {
// 创建记录写入器
public abstract RecordWriter<K, V> getRecordWriter(
TaskAttemptContext context);
// 检查输出规范
public abstract void checkOutputSpecs(JobContext context);
}
4.3 Partitioner
public abstract class Partitioner<KEY, VALUE> {
// 确定key应该发送到哪个Reducer
public abstract int getPartition(KEY key, VALUE value, int numPartitions);
}
4.4 Combiner
// Combiner是本地的mini-reducer
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context) {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
context.write(key, new IntWritable(sum));
}
}
5. MapReduce执行流程
5.1 作业提交
// 1. 配置作业
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
// 2. 设置作业参数
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
// 3. 提交作业
job.waitForCompletion(true);
5.2 任务调度
1. JobTracker接收作业
2. 计算输入分片
3. 创建Map和Reduce任务
4. 将任务分配给TaskTracker
5. 监控任务执行
5.3 任务执行
Map任务执行:
1. 读取输入分片
2. 调用Map函数
3. 写入中间结果
4. 通知JobTracker完成
Reduce任务执行:
1. 从Map任务获取中间结果
2. 排序和分组
3. 调用Reduce函数
4. 写入最终结果
6. 容错机制
6.1 任务级容错
- 任务重试:失败任务自动重新执行
- 推测执行:对慢任务启动备份任务
- 黑名单机制:标记故障节点
6.2 数据容错
- 数据副本:HDFS提供数据冗余
- 中间结果备份:Map输出可以备份
- 检查点机制:保存关键状态信息
6.3 节点容错
# 故障检测和恢复
def handle_node_failure(failed_node):
# 1. 检测节点故障
if not heartbeat_received(failed_node):
# 2. 标记节点为失效
mark_node_failed(failed_node)
# 3. 重新调度任务
reschedule_tasks(failed_node.tasks)
# 4. 更新任务状态
update_task_status()
7. 性能优化
7.1 数据本地性
优先级顺序:
1. 数据本地性(Data Local)
2. 机架本地性(Rack Local)
3. 非本地性(Non Local)
7.2 合理设置参数
<!-- Map任务数量 -->
<property>
<name>mapreduce.job.maps</name>
<value>100</value>
</property>
<!-- Reduce任务数量 -->
<property>
<name>mapreduce.job.reduces</name>
<value>10</value>
</property>
<!-- JVM重用 -->
<property>
<name>mapreduce.job.jvm.numtasks</name>
<value>20</value>
</property>
7.3 使用Combiner
// 减少网络传输的数据量
job.setCombinerClass(IntSumReducer.class);
8. 典型应用场景
8.1 词频统计
# Map阶段
def map(line):
for word in line.split():
emit(word, 1)
# Reduce阶段
def reduce(word, counts):
emit(word, sum(counts))
8.2 日志分析
# 分析Web访问日志
def map(log_line):
ip, timestamp, url, status = parse_log(log_line)
emit(ip, 1) # 统计IP访问次数
def reduce(ip, counts):
emit(ip, sum(counts))
8.3 数据清洗
# 清洗和转换数据
def map(raw_data):
cleaned_data = clean_and_validate(raw_data)
if cleaned_data:
emit(generate_key(cleaned_data), cleaned_data)
9. MapReduce vs 其他计算模型
9.1 与传统并行计算对比
特性 | MapReduce | 传统并行计算 |
---|---|---|
编程复杂度 | 简单 | 复杂 |
容错性 | 自动 | 手动 |
可扩展性 | 优秀 | 有限 |
数据本地性 | 支持 | 需手动优化 |
9.2 与Spark对比
特性 | MapReduce | Spark |
---|---|---|
内存使用 | 磁盘为主 | 内存为主 |
迭代计算 | 效率低 | 效率高 |
实时性 | 批处理 | 准实时 |
学习曲线 | 平缓 | 较陡 |
10. 最佳实践
10.1 设计原则
- 保持Map和Reduce函数的简单性
- 合理设计数据分区策略
- 充分利用Combiner减少网络传输
- 考虑数据倾斜问题
10.2 性能调优
- 合理设置Map和Reduce任务数量
- 优化输入输出格式
- 使用压缩减少I/O开销
- 监控和分析作业执行情况
10.3 常见陷阱
- 避免在Map/Reduce函数中进行网络调用
- 注意内存使用,防止OOM
- 合理处理数据倾斜
- 避免产生过多小文件
小结
本章介绍了MapReduce的基础概念、编程模型、系统架构和核心原理。MapReduce通过简单的编程接口隐藏了分布式计算的复杂性,为大数据处理提供了可靠的解决方案。
下一章将详细介绍Hadoop MapReduce的环境搭建和配置。