1. MapReduce简介

1.1 什么是MapReduce

MapReduce是Google提出的一种分布式计算编程模型,用于处理大规模数据集。它将复杂的并行计算过程抽象成两个简单的操作:Map(映射)和Reduce(归约)。

1.2 MapReduce的核心思想

  • 分而治之:将大问题分解为小问题
  • 并行处理:多台机器同时处理数据
  • 容错性:自动处理节点故障
  • 可扩展性:支持动态添加计算节点

2. MapReduce编程模型

2.1 基本流程

输入数据 → Split → Map → Shuffle → Reduce → 输出结果

2.2 Map阶段

# Map函数示例
def map_function(key, value):
    # 处理输入的键值对
    words = value.split()
    for word in words:
        emit(word, 1)  # 输出中间键值对

Map阶段特点: - 接收输入键值对 - 产生中间键值对 - 并行执行 - 无状态操作

2.3 Reduce阶段

# Reduce函数示例
def reduce_function(key, values):
    # 聚合相同key的所有values
    count = sum(values)
    emit(key, count)  # 输出最终结果

Reduce阶段特点: - 接收中间键值对 - 按key分组处理 - 产生最终结果 - 可能有多个Reducer

2.4 Shuffle阶段

Map输出 → 分区 → 排序 → 分组 → Reduce输入

Shuffle过程: 1. 分区(Partition):将Map输出按key分配到不同Reducer 2. 排序(Sort):对中间结果按key排序 3. 分组(Group):将相同key的值聚合在一起 4. 传输(Transfer):将数据传输到Reducer节点

3. MapReduce架构

3.1 主从架构

JobTracker (Master)
├── TaskTracker 1 (Slave)
├── TaskTracker 2 (Slave)
├── TaskTracker 3 (Slave)
└── TaskTracker N (Slave)

3.2 核心组件

JobTracker(作业跟踪器)

  • 接收用户提交的作业
  • 将作业分解为Map和Reduce任务
  • 调度任务到TaskTracker
  • 监控任务执行状态
  • 处理任务失败和重试

TaskTracker(任务跟踪器)

  • 执行具体的Map和Reduce任务
  • 定期向JobTracker汇报状态
  • 管理本地数据和中间结果
  • 处理任务级别的故障恢复

3.3 数据流

HDFS输入 → InputFormat → RecordReader → Map → 
Partitioner → Sort → Combine → Reduce → 
OutputFormat → HDFS输出

4. 关键概念详解

4.1 InputFormat

public abstract class InputFormat<K, V> {
    // 获取输入分片
    public abstract List<InputSplit> getSplits(JobContext context);
    
    // 创建记录读取器
    public abstract RecordReader<K,V> createRecordReader(
        InputSplit split, TaskAttemptContext context);
}

常用InputFormat: - TextInputFormat:处理文本文件 - SequenceFileInputFormat:处理序列文件 - DBInputFormat:从数据库读取数据

4.2 OutputFormat

public abstract class OutputFormat<K, V> {
    // 创建记录写入器
    public abstract RecordWriter<K, V> getRecordWriter(
        TaskAttemptContext context);
    
    // 检查输出规范
    public abstract void checkOutputSpecs(JobContext context);
}

4.3 Partitioner

public abstract class Partitioner<KEY, VALUE> {
    // 确定key应该发送到哪个Reducer
    public abstract int getPartition(KEY key, VALUE value, int numPartitions);
}

4.4 Combiner

// Combiner是本地的mini-reducer
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

5. MapReduce执行流程

5.1 作业提交

// 1. 配置作业
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");

// 2. 设置作业参数
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);

// 3. 提交作业
job.waitForCompletion(true);

5.2 任务调度

1. JobTracker接收作业
2. 计算输入分片
3. 创建Map和Reduce任务
4. 将任务分配给TaskTracker
5. 监控任务执行

5.3 任务执行

Map任务执行:
1. 读取输入分片
2. 调用Map函数
3. 写入中间结果
4. 通知JobTracker完成

Reduce任务执行:
1. 从Map任务获取中间结果
2. 排序和分组
3. 调用Reduce函数
4. 写入最终结果

6. 容错机制

6.1 任务级容错

  • 任务重试:失败任务自动重新执行
  • 推测执行:对慢任务启动备份任务
  • 黑名单机制:标记故障节点

6.2 数据容错

  • 数据副本:HDFS提供数据冗余
  • 中间结果备份:Map输出可以备份
  • 检查点机制:保存关键状态信息

6.3 节点容错

# 故障检测和恢复
def handle_node_failure(failed_node):
    # 1. 检测节点故障
    if not heartbeat_received(failed_node):
        # 2. 标记节点为失效
        mark_node_failed(failed_node)
        
        # 3. 重新调度任务
        reschedule_tasks(failed_node.tasks)
        
        # 4. 更新任务状态
        update_task_status()

7. 性能优化

7.1 数据本地性

优先级顺序:
1. 数据本地性(Data Local)
2. 机架本地性(Rack Local)
3. 非本地性(Non Local)

7.2 合理设置参数

<!-- Map任务数量 -->
<property>
    <name>mapreduce.job.maps</name>
    <value>100</value>
</property>

<!-- Reduce任务数量 -->
<property>
    <name>mapreduce.job.reduces</name>
    <value>10</value>
</property>

<!-- JVM重用 -->
<property>
    <name>mapreduce.job.jvm.numtasks</name>
    <value>20</value>
</property>

7.3 使用Combiner

// 减少网络传输的数据量
job.setCombinerClass(IntSumReducer.class);

8. 典型应用场景

8.1 词频统计

# Map阶段
def map(line):
    for word in line.split():
        emit(word, 1)

# Reduce阶段
def reduce(word, counts):
    emit(word, sum(counts))

8.2 日志分析

# 分析Web访问日志
def map(log_line):
    ip, timestamp, url, status = parse_log(log_line)
    emit(ip, 1)  # 统计IP访问次数

def reduce(ip, counts):
    emit(ip, sum(counts))

8.3 数据清洗

# 清洗和转换数据
def map(raw_data):
    cleaned_data = clean_and_validate(raw_data)
    if cleaned_data:
        emit(generate_key(cleaned_data), cleaned_data)

9. MapReduce vs 其他计算模型

9.1 与传统并行计算对比

特性 MapReduce 传统并行计算
编程复杂度 简单 复杂
容错性 自动 手动
可扩展性 优秀 有限
数据本地性 支持 需手动优化

9.2 与Spark对比

特性 MapReduce Spark
内存使用 磁盘为主 内存为主
迭代计算 效率低 效率高
实时性 批处理 准实时
学习曲线 平缓 较陡

10. 最佳实践

10.1 设计原则

  • 保持Map和Reduce函数的简单性
  • 合理设计数据分区策略
  • 充分利用Combiner减少网络传输
  • 考虑数据倾斜问题

10.2 性能调优

  • 合理设置Map和Reduce任务数量
  • 优化输入输出格式
  • 使用压缩减少I/O开销
  • 监控和分析作业执行情况

10.3 常见陷阱

  • 避免在Map/Reduce函数中进行网络调用
  • 注意内存使用,防止OOM
  • 合理处理数据倾斜
  • 避免产生过多小文件

小结

本章介绍了MapReduce的基础概念、编程模型、系统架构和核心原理。MapReduce通过简单的编程接口隐藏了分布式计算的复杂性,为大数据处理提供了可靠的解决方案。

下一章将详细介绍Hadoop MapReduce的环境搭建和配置。