4.1 MapReduce Overview

4.1.1 Introduction to MapReduce

MapReduce is a programming model proposed by Google for parallel computation over large data sets. Hadoop MapReduce, one of the core components of Apache Hadoop, decomposes a complex parallel computation into two phases: Map and Reduce.
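
The contract behind those two phases can be stated in a few lines of plain Python. The sketch below is illustrative only (run_mapreduce is our own name, not a Hadoop API): a mapper turns one input record into a list of (key, value) pairs, and a reducer folds all the values collected for one key into a single result.

# Illustrative sketch of the MapReduce contract (not a Hadoop API):
#   map:    record            -> list of (key, value)
#   reduce: (key, all values) -> result
from collections import defaultdict

def run_mapreduce(records, mapper, reducer):
    groups = defaultdict(list)
    for record in records:                # Map: emit intermediate pairs
        for key, value in mapper(record):
            groups[key].append(value)     # Shuffle: group values by key
    return {key: reducer(key, values)     # Reduce: fold each key group
            for key, values in groups.items()}

counts = run_mapreduce(
    ["hello world", "hello hadoop"],
    mapper=lambda line: [(w, 1) for w in line.split()],
    reducer=lambda key, values: sum(values))
print(counts)  # {'hello': 2, 'world': 1, 'hadoop': 1}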

Core ideas

  1. Divide and conquer: break one large problem into many small ones
  2. Parallel processing: many nodes work on the data at the same time
  3. Fault tolerance: node failures are handled automatically
  4. Data locality: move the computation to the data, not the data to the computation

The following simulator illustrates these ideas with a word-count job:

class MapReduceFramework:
    """
    MapReduce framework simulator
    """
    
    def __init__(self):
        self.mappers = []
        self.reducers = []
        self.intermediate_data = {}
        self.final_results = {}
    
    def add_mapper(self, mapper_func):
        """
        Register a mapper function
        
        Args:
            mapper_func: the mapper function
        """
        self.mappers.append(mapper_func)
    
    def add_reducer(self, reducer_func):
        """
        Register a reducer function
        
        Args:
            reducer_func: the reducer function
        """
        self.reducers.append(reducer_func)
    
    def map_phase(self, input_data):
        """
        Run the Map phase
        
        Args:
            input_data: input data chunks
        """
        print("=== Map phase started ===")
        
        for i, data_chunk in enumerate(input_data):
            print(f"Mapper {i+1} processing chunk: {data_chunk[:50]}...")
            
            # Run every registered mapper over the chunk
            for mapper in self.mappers:
                mapper_output = mapper(data_chunk)
                
                # Group the intermediate values by key
                for key, value in mapper_output:
                    if key not in self.intermediate_data:
                        self.intermediate_data[key] = []
                    self.intermediate_data[key].append(value)
        
        print(f"Map phase finished, produced {len(self.intermediate_data)} distinct keys")
        return self.intermediate_data
    
    def shuffle_and_sort_phase(self):
        """
        Run the Shuffle and Sort phase
        """
        print("\n=== Shuffle and Sort phase started ===")
        
        # Sort the keys
        sorted_keys = sorted(self.intermediate_data.keys())
        
        # Sort the values for each key
        for key in sorted_keys:
            self.intermediate_data[key].sort()
        
        print(f"Shuffle and Sort finished, processed {len(sorted_keys)} keys")
        return sorted_keys
    
    def reduce_phase(self, sorted_keys):
        """
        Run the Reduce phase
        
        Args:
            sorted_keys: sorted list of keys
        """
        print("\n=== Reduce phase started ===")
        
        for i, key in enumerate(sorted_keys):
            values = self.intermediate_data[key]
            print(f"Reducer {i+1} processing key: {key}, value count: {len(values)}")
            
            # Run every registered reducer over the key group
            for reducer in self.reducers:
                result = reducer(key, values)
                self.final_results[key] = result
        
        print(f"Reduce phase finished, produced {len(self.final_results)} final results")
        return self.final_results
    
    def run_job(self, input_data):
        """
        Run a complete MapReduce job
        
        Args:
            input_data: input data chunks
            
        Returns:
            dict: final results
        """
        print("MapReduce job started...\n")
        
        # Map phase
        self.map_phase(input_data)
        
        # Shuffle and Sort phase
        sorted_keys = self.shuffle_and_sort_phase()
        
        # Reduce phase
        final_results = self.reduce_phase(sorted_keys)
        
        print("\nMapReduce job finished!")
        return final_results
    
    def get_statistics(self):
        """
        Collect execution statistics
        
        Returns:
            dict: statistics
        """
        return {
            'mappers_count': len(self.mappers),
            'reducers_count': len(self.reducers),
            'intermediate_keys': len(self.intermediate_data),
            'final_results': len(self.final_results),
            'total_intermediate_values': sum(len(values) for values in self.intermediate_data.values())
        }

# Usage example: word count
def word_count_mapper(text):
    """
    Mapper for word count
    
    Args:
        text: input text
        
    Returns:
        List[Tuple]: list of (word, 1) pairs
    """
    words = text.lower().split()
    return [(word, 1) for word in words if word.isalpha()]

def word_count_reducer(key, values):
    """
    Reducer for word count
    
    Args:
        key: the word
        values: list of counts
        
    Returns:
        int: total count
    """
    return sum(values)

if __name__ == "__main__":
    # Create the framework instance
    mr_framework = MapReduceFramework()
    
    # Register the mapper and reducer
    mr_framework.add_mapper(word_count_mapper)
    mr_framework.add_reducer(word_count_reducer)
    
    # Prepare test data
    input_texts = [
        "Hello world hello hadoop",
        "MapReduce is powerful",
        "Hadoop MapReduce framework",
        "Big data processing with MapReduce"
    ]
    
    # Run the job
    results = mr_framework.run_job(input_texts)
    
    print("\n=== Word count results ===")
    for word, count in sorted(results.items(), key=lambda x: x[1], reverse=True):
        print(f"{word}: {count}")
    
    # Show the statistics
    stats = mr_framework.get_statistics()
    print("\n=== Execution statistics ===")
    for key, value in stats.items():
        print(f"{key}: {value}")

4.1.2 Advantages of MapReduce
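
The analyzer below records the five advantages discussed in this subsection (scalability, fault tolerance, simplicity, data locality, and cost effectiveness) as structured data and renders them into a Markdown report: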

import time  # used by generate_comparison_report below

class MapReduceAdvantages:
    """
    Analyzer of MapReduce's advantages
    """
    
    def __init__(self):
        self.advantages = {
            'scalability': self._analyze_scalability,
            'fault_tolerance': self._analyze_fault_tolerance,
            'simplicity': self._analyze_simplicity,
            'data_locality': self._analyze_data_locality,
            'cost_effectiveness': self._analyze_cost_effectiveness
        }
    
    def analyze_all_advantages(self) -> dict:
        """
        Run every advantage analysis
        
        Returns:
            dict: analysis results keyed by advantage name
        """
        analysis_results = {}
        
        for advantage_name, analysis_func in self.advantages.items():
            analysis_results[advantage_name] = analysis_func()
        
        return analysis_results
    
    def _analyze_scalability(self) -> dict:
        """
        Analyze the scalability advantage
        
        Returns:
            dict: scalability analysis
        """
        return {
            'title': 'Horizontal scalability',
            'description': 'MapReduce scales out near-linearly: processing capacity grows by adding nodes',
            'benefits': [
                'Scales from a few machines to thousands',
                'Throughput grows roughly in proportion to node count',
                'No application code changes required',
                'Automatic load balancing'
            ],
            'example_scenario': {
                'problem': 'Processing 1 TB of data takes 10 hours',
                'solution': 'Add 10x the nodes and processing time drops to about 1 hour',
                'scaling_factor': 'approximately linear scaling'
            },
            'performance_model': {
                'single_node_time': '10 hours',
                'ten_nodes_time': '~1 hour',
                'hundred_nodes_time': '~6 minutes',
                'efficiency': '85-95% (accounting for network and coordination overhead)'
            }
        }
    
    def _analyze_fault_tolerance(self) -> dict:
        """
        Analyze the fault-tolerance advantage
        
        Returns:
            dict: fault-tolerance analysis
        """
        return {
            'title': 'Automatic fault tolerance',
            'description': 'MapReduce handles node failures automatically',
            'mechanisms': [
                'Task rescheduling: failed tasks are re-executed on other nodes',
                'Data replication: HDFS keeps redundant copies of the data',
                'Heartbeat monitoring: node health is tracked in real time',
                'Speculative execution: backup copies of slow tasks are launched'
            ],
            'failure_scenarios': {
                'node_failure': {
                    'detection_time': '30-60 seconds',
                    'recovery_action': 'tasks reassigned to healthy nodes',
                    'data_loss': 'none (recovered from HDFS replicas)'
                },
                'task_failure': {
                    'detection_time': 'immediate',
                    'recovery_action': 'task restarted or rescheduled',
                    'max_retry_attempts': '3'
                },
                'network_partition': {
                    'detection_time': 'heartbeat timeout',
                    'recovery_action': 'affected tasks reassigned',
                    'consistency': 'coordinated by the JobTracker'
                }
            },
            'reliability_metrics': {
                'mtbf': 'months to years (depends on cluster size)',
                'availability': '99.9%+',
                'data_durability': '99.999999999% (eleven nines)'
            }
        }
    
    def _analyze_simplicity(self) -> dict:
        """
        Analyze the simplicity advantage
        
        Returns:
            dict: simplicity analysis
        """
        return {
            'title': 'Simple programming model',
            'description': 'MapReduce offers a simple yet powerful programming abstraction',
            'simplicity_aspects': [
                'Only the Map and Reduce functions need to be implemented',
                'The framework handles all parallelization details',
                'No distributed-systems expertise is required',
                'Rich API and tooling support'
            ],
            'learning_curve': {
                'basic_concepts': '1-2 days',
                'simple_applications': '1 week',
                'advanced_optimization': '1-2 months',
                'expert_level': '6+ months'
            },
            'code_comparison': {
                'traditional_parallel': {
                    'lines_of_code': '500-1000',
                    'complexity': 'high (threads, synchronization, communication)',
                    'error_prone': 'yes (deadlocks, race conditions, etc.)'
                },
                'mapreduce': {
                    'lines_of_code': '50-100',
                    'complexity': 'low (only the business logic)',
                    'error_prone': 'no (the framework handles concurrency)'
                }
            },
            'developer_productivity': {
                'development_time': 'reduced by 70-80%',
                'debugging_time': 'reduced by 60-70%',
                'maintenance_effort': 'reduced by 50-60%'
            }
        }
    
    def _analyze_data_locality(self) -> dict:
        """
        Analyze the data-locality advantage
        
        Returns:
            dict: data-locality analysis
        """
        return {
            'title': 'Data locality optimization',
            'description': 'MapReduce prefers to run computation on the node that stores the data, cutting network traffic',
            'locality_levels': [
                'Node-local: the task runs on the node that holds the data',
                'Rack-local: the task runs on a node in the same rack',
                'Remote: the task runs across racks (the last resort)'
            ],
            'performance_impact': {
                'node_local': {
                    'network_overhead': '0%',
                    'performance': '100% (baseline)',
                    'preferred_ratio': '80-90%'
                },
                'rack_local': {
                    'network_overhead': '10-20%',
                    'performance': '85-90%',
                    'acceptable_ratio': '10-15%'
                },
                'remote': {
                    'network_overhead': '50-100%',
                    'performance': '50-70%',
                    'target_ratio': '<5%'
                }
            },
            'optimization_strategies': [
                'Locality-aware task scheduling',
                'HDFS block placement policy',
                'Rack-awareness configuration',
                'Data prefetching and caching'
            ],
            'bandwidth_savings': {
                'typical_cluster': '60-80% of network bandwidth saved',
                'large_cluster': '70-90% of network bandwidth saved',
                'cost_reduction': 'significantly lower networking hardware cost'
            }
        }
    
    def _analyze_cost_effectiveness(self) -> dict:
        """
        Analyze the cost-effectiveness advantage
        
        Returns:
            dict: cost-effectiveness analysis
        """
        return {
            'title': 'Cost effectiveness',
            'description': 'MapReduce delivers big-data processing at a low price point',
            'cost_advantages': [
                'Runs on commodity hardware, lowering hardware cost',
                'Open-source software, no license fees',
                'Automated operations reduce staffing cost',
                'Scales on demand, avoiding over-provisioning'
            ],
            'cost_comparison': {
                'traditional_supercomputer': {
                    'hardware_cost': '$1M - $10M',
                    'software_license': '$100K - $1M',
                    'maintenance': '$200K/year',
                    'scalability': 'limited'
                },
                'mapreduce_cluster': {
                    'hardware_cost': '$100K - $1M (comparable performance)',
                    'software_license': '$0 (open source)',
                    'maintenance': '$50K/year',
                    'scalability': 'effectively unlimited'
                }
            },
            'tco_analysis': {
                'initial_investment': '50-70% lower',
                'operational_cost': '40-60% lower',
                'total_5_year_cost': '60-80% lower',
                'roi_period': '6-12 months'
            },
            'business_benefits': [
                'Faster data insight',
                'Better decision support',
                'New business opportunities',
                'Stronger competitive position'
            ]
        }
    
    def generate_comparison_report(self) -> str:
        """
        Generate the advantages report
        
        Returns:
            str: report text
        """
        analysis = self.analyze_all_advantages()
        
        report_lines = [
            "# MapReduce Advantages Report",
            f"Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}",
            "",
            "## Executive summary",
            "",
            "As a distributed computing framework, MapReduce offers significant advantages in the following areas:",
            ""
        ]
        
        # Append each advantage analysis
        for advantage_name, advantage_data in analysis.items():
            report_lines.extend([
                f"## {advantage_data['title']}",
                "",
                f"**Description**: {advantage_data['description']}",
                ""
            ])
            
            # Append the detail sections that are present
            if 'benefits' in advantage_data:
                report_lines.append("**Key benefits**:")
                for benefit in advantage_data['benefits']:
                    report_lines.append(f"- {benefit}")
                report_lines.append("")
            
            if 'mechanisms' in advantage_data:
                report_lines.append("**Mechanisms**:")
                for mechanism in advantage_data['mechanisms']:
                    report_lines.append(f"- {mechanism}")
                report_lines.append("")
            
            if 'cost_advantages' in advantage_data:
                report_lines.append("**Cost advantages**:")
                for cost_adv in advantage_data['cost_advantages']:
                    report_lines.append(f"- {cost_adv}")
                report_lines.append("")
            
            report_lines.append("---")
            report_lines.append("")
        
        # Append the summary
        report_lines.extend([
            "## Summary",
            "",
            "Through its design, MapReduce gives large-scale data processing:",
            "",
            "1. **Technical advantages**: scalable, fault tolerant, easy to use",
            "2. **Economic advantages**: low cost, high value for money",
            "3. **Business advantages**: fast insight, competitive edge",
            "",
            "These advantages make MapReduce one of the frameworks of choice for big data processing."
        ])
        
        return "\n".join(report_lines)

# Usage example
if __name__ == "__main__":
    # Create the analyzer
    analyzer = MapReduceAdvantages()
    
    # Analyze all advantages
    advantages = analyzer.analyze_all_advantages()
    
    print("=== MapReduce advantages ===")
    
    for adv_name, adv_data in advantages.items():
        print(f"\n{adv_data['title']}:")
        print(f"  {adv_data['description']}")
        
        if 'benefits' in adv_data:
            print("  Key benefits:")
            for benefit in adv_data['benefits'][:2]:  # show only the first two
                print(f"    - {benefit}")
    
    # Generate the full report
    report = analyzer.generate_comparison_report()
    
    # Save the report
    with open('mapreduce_advantages_report.md', 'w', encoding='utf-8') as f:
        f.write(report)
    
    print("\nFull report saved to mapreduce_advantages_report.md")

4.2 MapReduce Architecture

4.2.1 Architecture Components

MapReduce (in its classic MRv1 form) uses a master/slave architecture built from the following components; the threaded simulation after this list models each of them. (In Hadoop 2.x and later, the JobTracker/TaskTracker daemons were superseded by YARN, but the classic architecture remains the clearest way to explain the model.)

  1. JobTracker

    • Schedules and manages jobs
    • Monitors the status of the TaskTrackers
    • Handles job submission and task assignment
  2. TaskTracker

    • Executes the actual Map and Reduce tasks
    • Reports task status back to the JobTracker
    • Manages local task execution
  3. Map Task

    • Processes one input split
    • Runs the user-defined Map function
    • Emits intermediate key-value pairs
  4. Reduce Task

    • Consumes the Map tasks' output
    • Runs the user-defined Reduce function
    • Produces the final results

import threading
import time
import queue
from enum import Enum
from typing import List, Dict, Any, Callable, Tuple

class TaskStatus(Enum):
    """Task status"""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

class JobStatus(Enum):
    """Job status"""
    SUBMITTED = "submitted"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

class Task:
    """
    Base class for tasks
    """
    
    def __init__(self, task_id: str, task_type: str, input_data: Any):
        self.task_id = task_id
        self.task_type = task_type
        self.input_data = input_data
        self.status = TaskStatus.PENDING
        self.start_time = None
        self.end_time = None
        self.output_data = None
        self.error_message = None
        self.assigned_tracker = None
    
    def start(self):
        """Mark the task as started"""
        self.status = TaskStatus.RUNNING
        self.start_time = time.time()
    
    def complete(self, output_data: Any):
        """Mark the task as completed"""
        self.status = TaskStatus.COMPLETED
        self.end_time = time.time()
        self.output_data = output_data
    
    def fail(self, error_message: str):
        """Mark the task as failed"""
        self.status = TaskStatus.FAILED
        self.end_time = time.time()
        self.error_message = error_message
    
    def get_duration(self) -> float:
        """Return the task's execution time in seconds"""
        if self.start_time and self.end_time:
            return self.end_time - self.start_time
        return 0.0

class MapTask(Task):
    """
    A Map task
    """
    
    def __init__(self, task_id: str, input_data: Any, mapper_func: Callable):
        super().__init__(task_id, "map", input_data)
        self.mapper_func = mapper_func
    
    def execute(self) -> List[Tuple[Any, Any]]:
        """
        Execute the Map task
        
        Returns:
            List[Tuple]: list of key-value pairs
        """
        try:
            self.start()
            result = self.mapper_func(self.input_data)
            self.complete(result)
            return result
        except Exception as e:
            self.fail(str(e))
            raise

class ReduceTask(Task):
    """
    A Reduce task
    """
    
    def __init__(self, task_id: str, key: Any, values: List[Any], reducer_func: Callable):
        super().__init__(task_id, "reduce", (key, values))
        self.key = key
        self.values = values
        self.reducer_func = reducer_func
    
    def execute(self) -> Tuple[Any, Any]:
        """
        Execute the Reduce task
        
        Returns:
            Tuple: (key, result) pair
        """
        try:
            self.start()
            result = self.reducer_func(self.key, self.values)
            self.complete((self.key, result))
            return (self.key, result)
        except Exception as e:
            self.fail(str(e))
            raise

class TaskTracker:
    """
    Task tracker
    """
    
    def __init__(self, tracker_id: str, max_tasks: int = 4):
        self.tracker_id = tracker_id
        self.max_tasks = max_tasks
        self.running_tasks = {}
        self.completed_tasks = []
        self.failed_tasks = []
        self.task_queue = queue.Queue()
        self.is_running = False
        self.worker_threads = []
        self.heartbeat_interval = 5  # heartbeat interval in seconds
        self.last_heartbeat = time.time()
        self.job_tracker = None
    
    def start(self):
        """Start the TaskTracker"""
        self.is_running = True
        
        # Start the worker threads
        for i in range(self.max_tasks):
            thread = threading.Thread(target=self._worker_thread, args=(i,))
            thread.daemon = True
            thread.start()
            self.worker_threads.append(thread)
        
        # Start the heartbeat thread
        heartbeat_thread = threading.Thread(target=self._heartbeat_thread)
        heartbeat_thread.daemon = True
        heartbeat_thread.start()
        
        print(f"TaskTracker {self.tracker_id} started")
    
    def stop(self):
        """Stop the TaskTracker"""
        self.is_running = False
        print(f"TaskTracker {self.tracker_id} stopped")
    
    def assign_task(self, task: Task):
        """
        Assign a task to this tracker
        
        Args:
            task: the task to execute
        """
        task.assigned_tracker = self.tracker_id
        self.task_queue.put(task)
        print(f"TaskTracker {self.tracker_id} received task: {task.task_id}")
    
    def _worker_thread(self, worker_id: int):
        """
        Worker thread loop
        
        Args:
            worker_id: worker thread ID
        """
        while self.is_running:
            try:
                # Fetch a task (1 second timeout)
                task = self.task_queue.get(timeout=1)
                
                print(f"Worker {worker_id} executing task: {task.task_id}")
                
                # Record the task as running
                self.running_tasks[task.task_id] = task
                
                # Execute the task (MapTask and ReduceTask both define execute())
                try:
                    task.execute()
                    
                    # Task finished
                    self.completed_tasks.append(task)
                    print(f"Worker {worker_id} finished task: {task.task_id}")
                    
                except Exception as e:
                    # Task failed (execute() has already marked it as failed)
                    self.failed_tasks.append(task)
                    print(f"Worker {worker_id} task failed: {task.task_id}, error: {e}")
                
                finally:
                    # Remove it from the running-task table
                    if task.task_id in self.running_tasks:
                        del self.running_tasks[task.task_id]
                    
                    # Notify the JobTracker of the task's status
                    if self.job_tracker:
                        self.job_tracker.task_status_update(task)
                
            except queue.Empty:
                # Queue is empty; keep polling
                continue
            except Exception as e:
                print(f"Worker {worker_id} error: {e}")
    
    def _heartbeat_thread(self):
        """
        Heartbeat thread loop
        """
        while self.is_running:
            try:
                time.sleep(self.heartbeat_interval)
                
                # Send a heartbeat
                if self.job_tracker:
                    self.job_tracker.receive_heartbeat(self.tracker_id, self.get_status())
                
                self.last_heartbeat = time.time()
                
            except Exception as e:
                print(f"TaskTracker {self.tracker_id} heartbeat error: {e}")
    
    def get_status(self) -> Dict[str, Any]:
        """
        Return the TaskTracker's status
        
        Returns:
            Dict: status information
        """
        return {
            'tracker_id': self.tracker_id,
            'running_tasks': len(self.running_tasks),
            'completed_tasks': len(self.completed_tasks),
            'failed_tasks': len(self.failed_tasks),
            'queue_size': self.task_queue.qsize(),
            'max_tasks': self.max_tasks,
            'last_heartbeat': self.last_heartbeat
        }

class Job:
    """
    A MapReduce job
    """
    
    def __init__(self, job_id: str, input_data: List[Any], 
                 mapper_func: Callable, reducer_func: Callable):
        self.job_id = job_id
        self.input_data = input_data
        self.mapper_func = mapper_func
        self.reducer_func = reducer_func
        self.status = JobStatus.SUBMITTED
        self.map_tasks = []
        self.reduce_tasks = []
        self.intermediate_data = {}
        self.final_results = {}
        self.start_time = None
        self.end_time = None
        self.progress = 0.0
    
    def create_map_tasks(self) -> List[MapTask]:
        """
        Create the Map tasks, one per input chunk
        
        Returns:
            List[MapTask]: list of Map tasks
        """
        map_tasks = []
        
        for i, data_chunk in enumerate(self.input_data):
            task_id = f"{self.job_id}_map_{i}"
            map_task = MapTask(task_id, data_chunk, self.mapper_func)
            map_tasks.append(map_task)
        
        self.map_tasks = map_tasks
        return map_tasks
    
    def create_reduce_tasks(self, intermediate_data: Dict[Any, List[Any]]) -> List[ReduceTask]:
        """
        Create the Reduce tasks, one per intermediate key
        
        Args:
            intermediate_data: intermediate key groups
            
        Returns:
            List[ReduceTask]: list of Reduce tasks
        """
        reduce_tasks = []
        
        for i, (key, values) in enumerate(intermediate_data.items()):
            task_id = f"{self.job_id}_reduce_{i}"
            reduce_task = ReduceTask(task_id, key, values, self.reducer_func)
            reduce_tasks.append(reduce_task)
        
        self.reduce_tasks = reduce_tasks
        return reduce_tasks
    
    def start(self):
        """Mark the job as running"""
        self.status = JobStatus.RUNNING
        self.start_time = time.time()
    
    def complete(self, final_results: Dict[Any, Any]):
        """Mark the job as completed"""
        self.status = JobStatus.COMPLETED
        self.end_time = time.time()
        self.final_results = final_results
        self.progress = 100.0
    
    def fail(self, error_message: str):
        """Mark the job as failed"""
        self.status = JobStatus.FAILED
        self.end_time = time.time()
        print(f"Job {self.job_id} failed: {error_message}")
    
    def get_duration(self) -> float:
        """Return the job's execution time in seconds"""
        if self.start_time and self.end_time:
            return self.end_time - self.start_time
        return 0.0
    
    def update_progress(self):
        """Recompute the job's progress from its completed tasks"""
        total_tasks = len(self.map_tasks) + len(self.reduce_tasks)
        if total_tasks == 0:
            self.progress = 0.0
            return
        
        completed_map = sum(1 for task in self.map_tasks if task.status == TaskStatus.COMPLETED)
        completed_reduce = sum(1 for task in self.reduce_tasks if task.status == TaskStatus.COMPLETED)
        
        completed_total = completed_map + completed_reduce
        self.progress = (completed_total / total_tasks) * 100.0

class JobTracker:
    """
    Job tracker
    """
    
    def __init__(self, tracker_id: str = "jobtracker_1"):
        self.tracker_id = tracker_id
        self.task_trackers = {}
        self.jobs = {}
        self.pending_tasks = queue.Queue()
        self.is_running = False
        self.scheduler_thread = None
        self.heartbeat_timeout = 30  # heartbeat timeout in seconds
    
    def start(self):
        """Start the JobTracker"""
        self.is_running = True
        
        # Start the scheduler thread
        self.scheduler_thread = threading.Thread(target=self._scheduler_thread)
        self.scheduler_thread.daemon = True
        self.scheduler_thread.start()
        
        print(f"JobTracker {self.tracker_id} started")
    
    def stop(self):
        """Stop the JobTracker"""
        self.is_running = False
        print(f"JobTracker {self.tracker_id} stopped")
    
    def register_task_tracker(self, task_tracker: TaskTracker):
        """
        Register a TaskTracker
        
        Args:
            task_tracker: the TaskTracker instance
        """
        self.task_trackers[task_tracker.tracker_id] = {
            'instance': task_tracker,
            'last_heartbeat': time.time(),
            'status': 'active'
        }
        task_tracker.job_tracker = self
        print(f"TaskTracker {task_tracker.tracker_id} registered")
    
    def submit_job(self, job: Job):
        """
        Submit a job
        
        Args:
            job: the job to submit
        """
        self.jobs[job.job_id] = job
        job.start()
        
        # Create the Map tasks
        map_tasks = job.create_map_tasks()
        
        # Queue the Map tasks for scheduling
        for task in map_tasks:
            self.pending_tasks.put(task)
        
        print(f"Job {job.job_id} submitted with {len(map_tasks)} map tasks")
    
    def _scheduler_thread(self):
        """
        Scheduler loop
        """
        while self.is_running:
            try:
                # Check TaskTracker health
                self._check_task_tracker_health()
                
                # Schedule pending tasks
                self._schedule_tasks()
                
                # Check job status
                self._check_job_status()
                
                time.sleep(1)  # scheduling interval
                
            except Exception as e:
                print(f"Scheduler error: {e}")
    
    def _check_task_tracker_health(self):
        """
        Check each TaskTracker's health via its heartbeats
        """
        current_time = time.time()
        
        for tracker_id, tracker_info in list(self.task_trackers.items()):
            last_heartbeat = tracker_info['last_heartbeat']
            
            if current_time - last_heartbeat > self.heartbeat_timeout:
                print(f"TaskTracker {tracker_id} heartbeat timed out, marking as failed")
                tracker_info['status'] = 'failed'
                
                # Reschedule the tasks that were running on the failed node
                self._reschedule_failed_tasks(tracker_id)
    
    def _schedule_tasks(self):
        """
        Assign pending tasks to available TaskTrackers
        """
        if self.pending_tasks.empty():
            return
        
        # Find the TaskTrackers with spare capacity
        available_trackers = []
        for tracker_id, tracker_info in self.task_trackers.items():
            if tracker_info['status'] == 'active':
                tracker = tracker_info['instance']
                if len(tracker.running_tasks) < tracker.max_tasks:
                    available_trackers.append(tracker)
        
        # Assign tasks
        while not self.pending_tasks.empty() and available_trackers:
            task = self.pending_tasks.get()
            
            # Pick the least-loaded TaskTracker
            best_tracker = min(available_trackers, 
                             key=lambda t: len(t.running_tasks))
            
            best_tracker.assign_task(task)
            
            # If the TaskTracker is now full, drop it from the available list
            if len(best_tracker.running_tasks) >= best_tracker.max_tasks:
                available_trackers.remove(best_tracker)
    
    def _check_job_status(self):
        """
        Check the status of every running job
        """
        for job_id, job in list(self.jobs.items()):
            if job.status != JobStatus.RUNNING:
                continue
            
            # Update the job's progress
            job.update_progress()
            
            # Has the Map phase finished?
            map_completed = all(task.status == TaskStatus.COMPLETED 
                              for task in job.map_tasks)
            
            if map_completed and not job.reduce_tasks:
                # Map phase done; start the Reduce phase
                self._start_reduce_phase(job)
            
            # Has the Reduce phase finished?
            if job.reduce_tasks:
                reduce_completed = all(task.status == TaskStatus.COMPLETED 
                                     for task in job.reduce_tasks)
                
                if reduce_completed:
                    # The job is complete
                    self._complete_job(job)
    
    def _start_reduce_phase(self, job: Job):
        """
        Start the Reduce phase of a job
        
        Args:
            job: the job instance
        """
        print(f"Job {job.job_id} entering the Reduce phase")
        
        # Collect the Map tasks' output (the shuffle step)
        intermediate_data = {}
        for map_task in job.map_tasks:
            if map_task.output_data:
                for key, value in map_task.output_data:
                    if key not in intermediate_data:
                        intermediate_data[key] = []
                    intermediate_data[key].append(value)
        
        # Create the Reduce tasks
        reduce_tasks = job.create_reduce_tasks(intermediate_data)
        
        # Queue the Reduce tasks for scheduling
        for task in reduce_tasks:
            self.pending_tasks.put(task)
        
        print(f"Job {job.job_id} created {len(reduce_tasks)} reduce tasks")
    
    def _complete_job(self, job: Job):
        """
        Finalize a job
        
        Args:
            job: the job instance
        """
        # Collect the Reduce tasks' output
        final_results = {}
        for reduce_task in job.reduce_tasks:
            if reduce_task.output_data:
                key, value = reduce_task.output_data
                final_results[key] = value
        
        job.complete(final_results)
        
        print(f"Job {job.job_id} completed in {job.get_duration():.2f} seconds")
    
    def _reschedule_failed_tasks(self, failed_tracker_id: str):
        """
        Reschedule the tasks that were running on a failed node
        
        Args:
            failed_tracker_id: ID of the failed TaskTracker
        """
        # Find the failed node's running tasks and put them back in the queue
        for job in self.jobs.values():
            for task in job.map_tasks + job.reduce_tasks:
                if (task.assigned_tracker == failed_tracker_id and 
                    task.status == TaskStatus.RUNNING):
                    
                    print(f"Rescheduling task: {task.task_id}")
                    task.status = TaskStatus.PENDING
                    task.assigned_tracker = None
                    self.pending_tasks.put(task)
    
    def receive_heartbeat(self, tracker_id: str, status: Dict[str, Any]):
        """
        Receive a heartbeat from a TaskTracker
        
        Args:
            tracker_id: TaskTracker ID
            status: status information
        """
        if tracker_id in self.task_trackers:
            self.task_trackers[tracker_id]['last_heartbeat'] = time.time()
            self.task_trackers[tracker_id]['status'] = 'active'
    
    def task_status_update(self, task: Task):
        """
        Handle a task status update
        
        Args:
            task: the task instance
        """
        # Task status updates could be recorded or acted upon here
        pass
    
    def get_cluster_status(self) -> Dict[str, Any]:
        """
        Return a snapshot of the cluster's status
        
        Returns:
            Dict: cluster status information
        """
        active_trackers = sum(1 for info in self.task_trackers.values() 
                            if info['status'] == 'active')
        
        total_running_tasks = sum(len(info['instance'].running_tasks) 
                                for info in self.task_trackers.values() 
                                if info['status'] == 'active')
        
        return {
            'total_task_trackers': len(self.task_trackers),
            'active_task_trackers': active_trackers,
            'total_jobs': len(self.jobs),
            'running_jobs': sum(1 for job in self.jobs.values() 
                              if job.status == JobStatus.RUNNING),
            'total_running_tasks': total_running_tasks,
            'pending_tasks': self.pending_tasks.qsize()
        }

# Usage example
if __name__ == "__main__":
    # Create the JobTracker
    job_tracker = JobTracker()
    job_tracker.start()
    
    # Create the TaskTrackers
    task_trackers = []
    for i in range(3):
        tracker = TaskTracker(f"tasktracker_{i+1}", max_tasks=2)
        tracker.start()
        job_tracker.register_task_tracker(tracker)
        task_trackers.append(tracker)
    
    # Define the Map and Reduce functions
    def word_count_mapper(text):
        words = text.lower().split()
        return [(word, 1) for word in words if word.isalpha()]
    
    def word_count_reducer(key, values):
        return sum(values)
    
    # Create the job
    input_data = [
        "Hello world hello hadoop",
        "MapReduce is powerful",
        "Hadoop MapReduce framework",
        "Big data processing"
    ]
    
    job = Job("wordcount_job_1", input_data, word_count_mapper, word_count_reducer)
    
    # Submit the job
    job_tracker.submit_job(job)
    
    # Wait for the job to finish
    while job.status == JobStatus.RUNNING:
        print(f"Job progress: {job.progress:.1f}%")
        
        # Show the cluster status
        cluster_status = job_tracker.get_cluster_status()
        print(f"Cluster status: {cluster_status}")
        
        time.sleep(2)
    
    # Show the results
    if job.status == JobStatus.COMPLETED:
        print("\n=== Word count results ===")
        for word, count in sorted(job.final_results.items(), 
                                 key=lambda x: x[1], reverse=True):
            print(f"{word}: {count}")
        
        print(f"\nJob execution time: {job.get_duration():.2f} seconds")
    
    # Shut everything down
    for tracker in task_trackers:
        tracker.stop()
    job_tracker.stop()

4.2.2 Data Flow

The MapReduce data flow passes through the following stages:

  1. Input
  2. Map
  3. Shuffle (see the partitioning sketch after this list)
  4. Sort
  5. Reduce
  6. Output
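
One detail worth making explicit before the simulator: during Shuffle, each intermediate key must be routed to exactly one reducer so that all of its values meet in one place. Hadoop's default rule is hash partitioning: a key goes to partition hash(key) mod numReduceTasks. A minimal sketch of that idea (the function and variable names are ours, not Hadoop's):

# Minimal sketch of hash partitioning, the default Shuffle routing rule.
# Every (key, value) pair emitted by the mappers is sent to exactly one
# reducer, so all values for a given key end up on the same reducer.

def partition(key, num_reduce_tasks):
    # Hadoop's HashPartitioner computes (key.hashCode() & Integer.MAX_VALUE)
    # % numReduceTasks; Python's built-in hash() plays the same role here.
    return hash(key) % num_reduce_tasks

pairs = [("hello", 1), ("world", 1), ("hello", 1), ("hadoop", 1)]
num_reducers = 2
buckets = {p: [] for p in range(num_reducers)}
for key, value in pairs:
    buckets[partition(key, num_reducers)].append((key, value))

for p, bucket in buckets.items():
    print(f"reducer {p}: {bucket}")  # both 'hello' pairs land in one bucket

The simulator below walks through all six stages end to end:
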
import time
from typing import Any, Callable, Dict, List, Tuple

class MapReduceDataFlow:
    """
    Simulator of the MapReduce data flow
    """
    
    def __init__(self):
        self.input_splits = []
        self.map_outputs = []
        self.shuffled_data = {}
        self.sorted_data = {}
        self.reduce_outputs = []
        self.final_output = []
        self.statistics = {
            'input_records': 0,
            'map_output_records': 0,
            'shuffle_bytes': 0,
            'reduce_input_records': 0,
            'output_records': 0
        }
    
    def input_phase(self, raw_data: List[str], split_size: int = 2) -> List[List[str]]:
        """
        Input stage: split the raw data into input splits
        
        Args:
            raw_data: raw input records
            split_size: records per split
            
        Returns:
            List[List[str]]: list of input splits
        """
        print("=== Input stage ===")
        
        # Cut the data into splits
        self.input_splits = []
        for i in range(0, len(raw_data), split_size):
            split = raw_data[i:i + split_size]
            self.input_splits.append(split)
        
        self.statistics['input_records'] = len(raw_data)
        
        print(f"Raw input records: {len(raw_data)}")
        print(f"Input splits created: {len(self.input_splits)}")
        
        for i, split in enumerate(self.input_splits):
            print(f"  Split {i+1}: {len(split)} records")
        
        return self.input_splits
    
    def map_phase(self, mapper_func: Callable) -> List[List[Tuple[Any, Any]]]:
        """
        Map stage: run the Map function over every input split
        
        Args:
            mapper_func: the Map function
            
        Returns:
            List[List[Tuple]]: per-mapper output lists
        """
        print("\n=== Map stage ===")
        
        self.map_outputs = []
        total_map_records = 0
        
        for i, input_split in enumerate(self.input_splits):
            print(f"\nMapper {i+1} processing split {i+1}:")
            
            mapper_output = []
            for record in input_split:
                # Run the Map function
                key_value_pairs = mapper_func(record)
                mapper_output.extend(key_value_pairs)
                
                print(f"  Input: {record[:30]}...")
                print(f"  Output: {len(key_value_pairs)} key-value pairs")
            
            self.map_outputs.append(mapper_output)
            total_map_records += len(mapper_output)
            
            print(f"Mapper {i+1} total output: {len(mapper_output)} key-value pairs")
        
        self.statistics['map_output_records'] = total_map_records
        print(f"\nMap stage finished, total output: {total_map_records} key-value pairs")
        
        return self.map_outputs
    
    def shuffle_phase(self) -> Dict[Any, List[Any]]:
        """
        Shuffle stage: regroup the Map output by key
        
        Returns:
            Dict: values grouped by key
        """
        print("\n=== Shuffle stage ===")
        
        self.shuffled_data = {}
        total_shuffle_bytes = 0
        
        # Merge all of the Map outputs
        all_map_outputs = []
        for map_output in self.map_outputs:
            all_map_outputs.extend(map_output)
        
        # Group by key
        for key, value in all_map_outputs:
            if key not in self.shuffled_data:
                self.shuffled_data[key] = []
            self.shuffled_data[key].append(value)
            
            # Rough estimate of the bytes transferred
            total_shuffle_bytes += len(str(key)) + len(str(value))
        
        self.statistics['shuffle_bytes'] = total_shuffle_bytes
        
        print(f"Shuffle finished, produced {len(self.shuffled_data)} key groups")
        print(f"Estimated network transfer: {total_shuffle_bytes} bytes")
        
        # Show the value count for each key
        for key, values in list(self.shuffled_data.items())[:5]:  # show only the first 5
            print(f"  Key '{key}': {len(values)} values")
        
        if len(self.shuffled_data) > 5:
            print(f"  ... and {len(self.shuffled_data) - 5} more keys")
        
        return self.shuffled_data
    
    def sort_phase(self) -> Dict[Any, List[Any]]:
        """
        Sort stage: sort the keys and each key's values
        
        Returns:
            Dict: sorted data
        """
        print("\n=== Sort stage ===")
        
        # Sort the keys
        sorted_keys = sorted(self.shuffled_data.keys())
        
        self.sorted_data = {}
        for key in sorted_keys:
            # Sort each key's values as well
            sorted_values = sorted(self.shuffled_data[key])
            self.sorted_data[key] = sorted_values
        
        print(f"Sort finished, {len(self.sorted_data)} keys sorted")
        print(f"Key order sample: {sorted_keys[:5]}...")
        
        return self.sorted_data
    
    def reduce_phase(self, reducer_func: Callable) -> List[Tuple[Any, Any]]:
        """
        Reduce stage: run the Reduce function for every key
        
        Args:
            reducer_func: the Reduce function
            
        Returns:
            List[Tuple]: reduce output pairs
        """
        print("\n=== Reduce stage ===")
        
        self.reduce_outputs = []
        total_reduce_input = 0
        
        for i, (key, values) in enumerate(self.sorted_data.items()):
            print(f"\nReducer {i+1} processing key: {key}")
            print(f"  Input value count: {len(values)}")
            
            # Run the Reduce function
            result = reducer_func(key, values)
            self.reduce_outputs.append((key, result))
            
            total_reduce_input += len(values)
            
            print(f"  Output: {result}")
        
        self.statistics['reduce_input_records'] = total_reduce_input
        self.statistics['output_records'] = len(self.reduce_outputs)
        
        print(f"\nReduce stage finished, produced {len(self.reduce_outputs)} results")
        
        return self.reduce_outputs
    
    def output_phase(self, output_path: str = None) -> List[Tuple[Any, Any]]:
        """
        Output stage: write the results out
        
        Args:
            output_path: output file path
            
        Returns:
            List[Tuple]: final output pairs
        """
        print("\n=== Output stage ===")
        
        self.final_output = self.reduce_outputs.copy()
        
        if output_path:
            # Simulate writing to a file
            print(f"Writing results to: {output_path}")
            with open(output_path, 'w', encoding='utf-8') as f:
                for key, value in self.final_output:
                    f.write(f"{key}\t{value}\n")
        
        print(f"Output finished, {len(self.final_output)} records written")
        
        return self.final_output
    
    def run_complete_flow(self, raw_data: List[str], mapper_func: Callable, 
                         reducer_func: Callable, output_path: str = None) -> List[Tuple[Any, Any]]:
        """
        Run the full MapReduce data flow
        
        Args:
            raw_data: raw input records
            mapper_func: the Map function
            reducer_func: the Reduce function
            output_path: output file path
            
        Returns:
            List[Tuple]: final results
        """
        print("Starting the MapReduce data flow...\n")
        start_time = time.time()
        
        # Run the stages in order
        self.input_phase(raw_data)
        self.map_phase(mapper_func)
        self.shuffle_phase()
        self.sort_phase()
        self.reduce_phase(reducer_func)
        final_result = self.output_phase(output_path)
        
        end_time = time.time()
        execution_time = end_time - start_time
        
        # Show the statistics
        print("\n=== Execution statistics ===")
        print(f"Total execution time: {execution_time:.3f} seconds")
        for metric, value in self.statistics.items():
            print(f"{metric}: {value}")
        
        # Compute the data reduction ratio
        if self.statistics['map_output_records'] > 0:
            compression_ratio = self.statistics['output_records'] / self.statistics['map_output_records']
            print(f"Data reduction ratio: {compression_ratio:.3f}")
        
        return final_result
    
    def visualize_data_flow(self):
        """
        Print an ASCII diagram of the data flow
        """
        print("\n=== MapReduce data flow diagram ===")
        print("""
        Raw data
            ↓
        ┌───────────────┐
        │ Input splits  │
        └───────────────┘
            ↓
        ┌───────────────┐
        │   Map phase   │ ← parallel processing
        └───────────────┘
            ↓
        ┌───────────────┐
        │ Shuffle phase │ ← network transfer
        └───────────────┘
            ↓
        ┌───────────────┐
        │  Sort phase   │ ← sorting and grouping
        └───────────────┘
            ↓
        ┌───────────────┐
        │ Reduce phase  │ ← parallel processing
        └───────────────┘
            ↓
        ┌───────────────┐
        │ Final output  │
        └───────────────┘
        """)
        
        # Show how the data volume changes across stages
        print("\n=== Data volume by stage ===")
        stages = [
            ("Input records", self.statistics['input_records']),
            ("Map output", self.statistics['map_output_records']),
            ("Shuffle transfer", f"{self.statistics['shuffle_bytes']} bytes"),
            ("Reduce input", self.statistics['reduce_input_records']),
            ("Final output", self.statistics['output_records'])
        ]
        
        for stage_name, stage_value in stages:
            print(f"{stage_name:16}: {stage_value}")

# Usage example: the complete word-count flow
def demo_word_count_flow():
    """
    Demonstrate the full word-count data flow
    """
    # Create the data flow simulator
    data_flow = MapReduceDataFlow()
    
    # Prepare test data
    raw_data = [
        "Hello world hello hadoop",
        "MapReduce is powerful and scalable",
        "Hadoop ecosystem includes HDFS MapReduce",
        "Big data processing with MapReduce framework",
        "Distributed computing made simple",
        "Scalable data processing solution"
    ]
    
    # Define the Map and Reduce functions
    def word_count_mapper(text):
        """Word-count mapper"""
        words = text.lower().split()
        return [(word, 1) for word in words if word.isalpha()]
    
    def word_count_reducer(key, values):
        """Word-count reducer"""
        return sum(values)
    
    # Run the full flow
    results = data_flow.run_complete_flow(
        raw_data, 
        word_count_mapper, 
        word_count_reducer,
        "wordcount_output.txt"
    )
    
    # Show the results
    print("\n=== Word count results ===")
    sorted_results = sorted(results, key=lambda x: x[1], reverse=True)
    for word, count in sorted_results[:10]:  # show the top 10
        print(f"{word:15}: {count}")
    
    # Visualize the data flow
    data_flow.visualize_data_flow()
    
    return results

if __name__ == "__main__":
    # Run the demo
    demo_word_count_flow()

4.3 MapReduce Programming in Practice

4.3.1 Java Programming Interface

Hadoop MapReduce provides a rich Java API. The main programming interfaces are summarized below:

import time
from typing import List

class MapReduceJavaAPI:
    """
    MapReduce Java API guide
    Summarizes the main programming interfaces and concepts
    """
    
    def __init__(self):
        self.api_components = {
            'mapper': self._mapper_api,
            'reducer': self._reducer_api,
            'driver': self._driver_api,
            'input_format': self._input_format_api,
            'output_format': self._output_format_api,
            'partitioner': self._partitioner_api,
            'combiner': self._combiner_api
        }
    
    def _mapper_api(self) -> dict:
        """
        Mapper API notes
        
        Returns:
            dict: Mapper API information
        """
        return {
            'class_name': 'org.apache.hadoop.mapreduce.Mapper',
            'generic_types': '<KEYIN, VALUEIN, KEYOUT, VALUEOUT>',
            'main_method': 'map(KEYIN key, VALUEIN value, Context context)',
            'setup_method': 'setup(Context context)',
            'cleanup_method': 'cleanup(Context context)',
            'example_code': '''
// Java Mapper example
public class WordCountMapper 
    extends Mapper<LongWritable, Text, Text, IntWritable> {
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    
    @Override
    protected void setup(Context context) 
        throws IOException, InterruptedException {
        // initialization code
    }
    
    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
        
        String line = value.toString().toLowerCase();
        StringTokenizer tokenizer = new StringTokenizer(line);
        
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
    
    @Override
    protected void cleanup(Context context) 
        throws IOException, InterruptedException {
        // cleanup code
    }
}
''',
            'key_concepts': [
                'A Mapper extends the Mapper base class',
                'The input and output key/value types are given as generics',
                'map() is called once per input record',
                'setup() and cleanup() handle initialization and teardown',
                'Results are emitted through the Context object'
            ]
        }
    
    def _reducer_api(self) -> dict:
        """
        Reducer API notes
        
        Returns:
            dict: Reducer API information
        """
        return {
            'class_name': 'org.apache.hadoop.mapreduce.Reducer',
            'generic_types': '<KEYIN, VALUEIN, KEYOUT, VALUEOUT>',
            'main_method': 'reduce(KEYIN key, Iterable<VALUEIN> values, Context context)',
            'setup_method': 'setup(Context context)',
            'cleanup_method': 'cleanup(Context context)',
            'example_code': '''
// Java Reducer example
public class WordCountReducer 
    extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    private IntWritable result = new IntWritable();
    
    @Override
    protected void setup(Context context) 
        throws IOException, InterruptedException {
        // initialization code
    }
    
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, 
                         Context context) 
        throws IOException, InterruptedException {
        
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        
        result.set(sum);
        context.write(key, result);
    }
    
    @Override
    protected void cleanup(Context context) 
        throws IOException, InterruptedException {
        // cleanup code
    }
}
''',
            'key_concepts': [
                'A Reducer extends the Reducer base class',
                'The input and output key/value types are given as generics',
                'reduce() is called once per key with all of its values',
                'The values arrive as an Iterable',
                'Zero, one, or many results may be emitted per key'
            ]
        }
    
    def _driver_api(self) -> dict:
        """
        Driver API notes
        
        Returns:
            dict: Driver API information
        """
        return {
            'class_name': 'org.apache.hadoop.mapreduce.Job',
            'main_methods': [
                'Job.getInstance(Configuration conf)',
                'job.setJarByClass(Class<?>)',
                'job.setMapperClass(Class<?>)',
                'job.setReducerClass(Class<?>)',
                'job.setOutputKeyClass(Class<?>)',
                'job.setOutputValueClass(Class<?>)',
                'FileInputFormat.addInputPath(Job, Path)',
                'FileOutputFormat.setOutputPath(Job, Path)',
                'job.waitForCompletion(boolean)'
            ],
            'example_code': '''
// Java Driver example
public class WordCountDriver {
    
    public static void main(String[] args) throws Exception {
        
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        
        // set the jar by the driver class
        job.setJarByClass(WordCountDriver.class);
        
        // set the Mapper and Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
        
        // set the output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        // set the input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        // wait for the job to finish
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
''',
            'key_concepts': [
                'The driver configures and submits the job',
                'It sets the Mapper and Reducer classes',
                'It configures the input/output formats and paths',
                'It sets the output key and value types',
                'A Combiner can be set to improve performance'
            ]
        }
    
    def _input_format_api(self) -> dict:
        """
        InputFormat API notes
        
        Returns:
            dict: InputFormat API information
        """
        return {
            'base_class': 'org.apache.hadoop.mapreduce.InputFormat',
            'common_implementations': [
                'TextInputFormat - plain text files',
                'KeyValueTextInputFormat - key-value text files',
                'SequenceFileInputFormat - sequence files',
                'DBInputFormat - database input',
                'MultipleInputs - mixing several input formats'
            ],
            'main_methods': [
                'getSplits(JobContext context) - compute the input splits',
                'createRecordReader(InputSplit split, TaskAttemptContext context) - create a record reader'
            ],
            'example_usage': '''
// set the input format
job.setInputFormatClass(TextInputFormat.class);

// custom input format
public class CustomInputFormat 
    extends FileInputFormat<LongWritable, Text> {
    
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
        InputSplit split, TaskAttemptContext context) {
        return new CustomRecordReader();
    }
}
''',
            'key_concepts': [
                'The InputFormat decides how input data is read',
                'It splits the input data into input splits',
                'Each split is processed by one Map task',
                'Custom input formats can handle special data layouts'
            ]
        }
    
    def _output_format_api(self) -> dict:
        """
        OutputFormat API notes
        
        Returns:
            dict: OutputFormat API information
        """
        return {
            'base_class': 'org.apache.hadoop.mapreduce.OutputFormat',
            'common_implementations': [
                'TextOutputFormat - plain text files',
                'SequenceFileOutputFormat - sequence files',
                'DBOutputFormat - database output',
                'MultipleOutputs - writing to several files',
                'NullOutputFormat - no output'
            ],
            'main_methods': [
                'getRecordWriter(TaskAttemptContext context) - get a record writer',
                'checkOutputSpecs(JobContext context) - validate the output specification',
                'getOutputCommitter(TaskAttemptContext context) - get the output committer'
            ],
            'example_usage': '''
// set the output format
job.setOutputFormatClass(TextOutputFormat.class);

// custom output format
public class CustomOutputFormat 
    extends FileOutputFormat<Text, IntWritable> {
    
    @Override
    public RecordWriter<Text, IntWritable> getRecordWriter(
        TaskAttemptContext context) throws IOException {
        return new CustomRecordWriter();
    }
}
''',
            'key_concepts': [
                'The OutputFormat decides how output data is written',
                'It creates the output files and directories',
                'It formats the output records',
                'Many output targets and formats are supported'
            ]
        }
    
    def _partitioner_api(self) -> dict:
        """
        Partitioner API notes
        
        Returns:
            dict: Partitioner API information
        """
        return {
            'base_class': 'org.apache.hadoop.mapreduce.Partitioner',
            'default_implementation': 'HashPartitioner',
            'main_method': 'getPartition(KEY key, VALUE value, int numPartitions)',
            'example_code': '''
// custom partitioner
public class CustomPartitioner 
    extends Partitioner<Text, IntWritable> {
    
    @Override
    public int getPartition(Text key, IntWritable value, 
                           int numPartitions) {
        
        // partition by the key's first letter
        char firstChar = key.toString().charAt(0);
        
        if (firstChar >= 'a' && firstChar <= 'm') {
            return 0;
        } else {
            return 1 % numPartitions;
        }
    }
}

// set it in the driver
job.setPartitionerClass(CustomPartitioner.class);
job.setNumReduceTasks(2);
''',
            'key_concepts': [
                'The Partitioner decides which Reducer receives each Map output pair',
                'The default HashPartitioner partitions by the hash of the key',
                'A custom partitioner can implement application-specific routing',
                'The number of partitions equals the number of Reduce tasks'
            ]
        }
    
    def _combiner_api(self) -> dict:
        """
        Combiner API notes
        
        Returns:
            dict: Combiner API information
        """
        return {
            'description': 'A Combiner is a local Reducer that pre-aggregates data on the Map side',
            'base_class': 'usually extends the Reducer class',
            'execution_location': 'the Map task node',
            'benefits': [
                'Reduces the amount of data shipped over the network',
                'Improves overall performance',
                'Lowers the load on the Reducers'
            ],
            'example_code': '''
// reuse the Reducer as the Combiner
job.setCombinerClass(WordCountReducer.class);

// custom Combiner
public class WordCountCombiner 
    extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    private IntWritable result = new IntWritable();
    
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, 
                         Context context) 
        throws IOException, InterruptedException {
        
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        
        result.set(sum);
        context.write(key, result);
    }
}
''',
            'usage_guidelines': [
                'The combine function must be commutative and associative (sums are; plain averages are not)',
                'Not every algorithm can use a Combiner',
                "The Combiner's output types must match the Mapper's output types",
                'The Combiner may run zero, one, or many times'
            ]
        }
    
    def generate_api_documentation(self) -> str:
        """
        Generate the API documentation
        
        Returns:
            str: documentation text
        """
        doc_lines = [
            "# MapReduce Java API Documentation",
            f"Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}",
            "",
            "## Overview",
            "",
            "Hadoop MapReduce offers a rich Java API for building distributed data processing applications.",
            "The main components are the Mapper, Reducer, Driver, InputFormat, and OutputFormat.",
            ""
        ]
        
        # Append documentation for each API component
        for component_name, component_func in self.api_components.items():
            component_info = component_func()
            
            doc_lines.extend([
                f"## {component_name.title()} API",
                ""
            ])
            
            # Basic information
            if 'class_name' in component_info:
                doc_lines.append(f"**Base class**: `{component_info['class_name']}`")
            
            if 'description' in component_info:
                doc_lines.append(f"**Description**: {component_info['description']}")
            
            if 'generic_types' in component_info:
                doc_lines.append(f"**Generic types**: `{component_info['generic_types']}`")
            
            doc_lines.append("")
            
            # Main methods
            if 'main_method' in component_info:
                doc_lines.extend([
                    "**Main method**:",
                    f"```java\n{component_info['main_method']}\n```",
                    ""
                ])
            
            if 'main_methods' in component_info:
                doc_lines.append("**Main methods**:")
                for method in component_info['main_methods']:
                    doc_lines.append(f"- `{method}`")
                doc_lines.append("")
            
            # Example code
            if 'example_code' in component_info:
                doc_lines.extend([
                    "**Example code**:",
                    f"```java{component_info['example_code']}```",
                    ""
                ])
            
            # Key concepts
            if 'key_concepts' in component_info:
                doc_lines.append("**Key concepts**:")
                for concept in component_info['key_concepts']:
                    doc_lines.append(f"- {concept}")
                doc_lines.append("")
            
            doc_lines.append("---")
            doc_lines.append("")
        
        return "\n".join(doc_lines)
    
    def get_best_practices(self) -> List[str]:
        """
        MapReduce programming best practices
        
        Returns:
            List[str]: list of best practices
        """
        return [
            "**Mapper design**:",
            "- Keep Mapper logic simple; avoid heavy computation",
            "- Minimize the amount of data the Mapper emits",
            "- Use setup() for one-time initialization",
            "- Avoid allocating many objects inside map()",
            "",
            "**Reducer design**:",
            "- Reducers should be stateless",
            "- Use a Combiner where appropriate to cut network transfer",
            "- Avoid slow operations inside reduce()",
            "- Account for output ordering requirements",
            "",
            "**Choosing data types**:",
            "- Prefer Hadoop's Writable types",
            "- Optimize serialization when writing custom Writables",
            "- Avoid native Java serialization",
            "- Consider compression to reduce I/O",
            "",
            "**Performance tuning**:",
            "- Set sensible Map and Reduce task counts",
            "- Use an appropriate partitioner to balance load",
            "- Enable speculative execution to handle stragglers",
            "- Tune JVM options and memory settings",
            "",
            "**Error handling**:",
            "- Implement proper exception handling",
            "- Use counters to monitor job execution",
            "- Log in sufficient detail",
            "- Configure a sensible retry policy"
        ]

# Usage example
if __name__ == "__main__":
    # Create the API documentation generator
    api_doc = MapReduceJavaAPI()
    
    # Generate the full API documentation
    documentation = api_doc.generate_api_documentation()
    
    # Save the documentation
    with open('mapreduce_java_api.md', 'w', encoding='utf-8') as f:
        f.write(documentation)
    
    print("MapReduce Java API documentation generated")
    
    # Show the best practices
    print("\n=== MapReduce Programming Best Practices ===")
    best_practices = api_doc.get_best_practices()
    for practice in best_practices:
        print(practice)
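
One Combiner guideline above deserves a concrete illustration: because a Combiner may run zero, one, or many times, its function must be commutative and associative, which rules out dropping a mean-style Reducer in directly. The following is a minimal local sketch (plain Python, not Hadoop code, with illustrative names) of the standard workaround: emit (sum, count) pairs and divide only once, in the Reducer.

def combine_partials(pairs):
    """Combiner-safe partial aggregate: (sum, count) pairs merge under any grouping."""
    total = sum(p[0] for p in pairs)
    count = sum(p[1] for p in pairs)
    return (total, count)

def reduce_average(pairs):
    """Reducer: merge all partials, then divide exactly once."""
    total, count = combine_partials(pairs)
    return total / count if count else 0.0

# Map outputs for one key, e.g. per-record response times: emit (value, 1)
map_outputs = [(10, 1), (20, 1), (30, 1), (40, 1)]

# The framework may combine arbitrary subsets; the grouping does not matter:
partial_a = combine_partials(map_outputs[:1])    # (10, 1)
partial_b = combine_partials(map_outputs[1:])    # (90, 3)
print(reduce_average([partial_a, partial_b]))    # 25.0
print(reduce_average(map_outputs))               # 25.0 -- same answer

# Averaging the averages instead would give (10.0 + 30.0) / 2 = 20.0 -- wrong.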

4.3.2 Python Programming Interface

Although Hadoop is Java-native, MapReduce programs can also be written in Python through the Streaming API:

class MapReducePythonStreaming:
    """
    MapReduce Python Streaming programming interface
    """
    
    def __init__(self):
        self.streaming_jar = "$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar"
        self.examples = {
            'word_count': self._word_count_example,
            'log_analysis': self._log_analysis_example,
            'data_aggregation': self._data_aggregation_example
        }
    
    def _word_count_example(self) -> dict:
        """
        Word count example
        
        Returns:
            dict: example code and notes
        """
        return {
            'description': 'Word count implemented with Python Streaming',
            'mapper_code': '''
#!/usr/bin/env python3
# mapper.py
import sys

for line in sys.stdin:
    # Strip surrounding whitespace
    line = line.strip()
    # Split into words
    words = line.split()
    
    # Emit each word with a count of 1
    for word in words:
        if word.isalpha():  # only keep alphabetic words
            print(f"{word.lower()}\t1")
''',
            'reducer_code': '''
#!/usr/bin/env python3
# reducer.py
import sys
from collections import defaultdict

word_counts = defaultdict(int)

for line in sys.stdin:
    # Strip surrounding whitespace
    line = line.strip()
    
    # Parse the input
    try:
        word, count = line.split('\t')
        word_counts[word] += int(count)
    except ValueError:
        # Skip malformed lines
        continue

# Emit the results
for word, count in word_counts.items():
    print(f"{word}\t{count}")
''',
            'run_command': '''
# Run command
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -files mapper.py,reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input /input/data.txt \
    -output /output/wordcount
''',
            'key_points': [
                'Data is exchanged via standard input and output',
                'Mapper and Reducer are standalone Python scripts',
                'Data is usually tab-separated key-value pairs',
                'Input parsing and validation are your responsibility'
            ]
        }
    
    def _log_analysis_example(self) -> dict:
        """
        Log analysis example
        
        Returns:
            dict: example code and notes
        """
        return {
            'description': 'Analyze web server access logs',
            'mapper_code': '''
#!/usr/bin/env python3
# log_mapper.py
import sys
import re
from datetime import datetime

# Regular expression for the Apache access log format
# (the response size may be '-', so match \S+ rather than \d+)
log_pattern = re.compile(
    r'(\S+) \S+ \S+ \[([^\]]+)\] "(\S+) (\S+) \S+" (\d+) (\S+)'
)

for line in sys.stdin:
    line = line.strip()
    match = log_pattern.match(line)
    
    if match:
        ip, timestamp, method, url, status, size = match.groups()
        
        # Parse the timestamp
        try:
            dt = datetime.strptime(timestamp, '%d/%b/%Y:%H:%M:%S %z')
            hour = dt.strftime('%Y-%m-%d %H')
        except ValueError:
            hour = 'unknown'
        
        # Emit counts along several dimensions
        print(f"ip:{ip}\t1")  # hits per IP
        print(f"status:{status}\t1")  # status code counts
        print(f"hour:{hour}\t1")  # hits per hour
        print(f"url:{url}\t1")  # hits per URL
        
        # Traffic totals
        try:
            bytes_sent = int(size) if size != '-' else 0
            print(f"traffic:total\t{bytes_sent}")
        except ValueError:
            pass
''',
            'reducer_code': '''
#!/usr/bin/env python3
# log_reducer.py
import sys
from collections import defaultdict

counters = defaultdict(int)
traffic = 0

for line in sys.stdin:
    line = line.strip()
    
    try:
        key, value = line.split('\t', 1)
        
        if key.startswith('traffic:'):
            traffic += int(value)
        else:
            counters[key] += int(value)
    except ValueError:
        continue

# Emit the statistics
for key, count in sorted(counters.items()):
    print(f"{key}\t{count}")

if traffic > 0:
    print(f"traffic:total\t{traffic}")
''',
            'run_command': '''
# Run the log analysis
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -files log_mapper.py,log_reducer.py \
    -mapper log_mapper.py \
    -reducer log_reducer.py \
    -input /logs/access.log \
    -output /analysis/log_stats
''',
            'key_points': [
                'Use regular expressions to parse complex log formats',
                'Several statistical dimensions can be emitted at once',
                'Handle timestamp parsing and numeric conversion',
                'Well suited to large-scale log analysis'
            ]
        }
    
    def _data_aggregation_example(self) -> dict:
        """
        Data aggregation example
        
        Returns:
            dict: example code and notes
        """
        return {
            'description': 'Sales data aggregation and analysis',
            'mapper_code': '''
#!/usr/bin/env python3
# sales_mapper.py
import sys
import json
from datetime import datetime

for line in sys.stdin:
    line = line.strip()
    
    try:
        # Input is assumed to be one JSON sales record per line
        record = json.loads(line)
        
        date = record.get('date', '')
        product = record.get('product', '')
        category = record.get('category', '')
        amount = float(record.get('amount', 0))
        quantity = int(record.get('quantity', 0))
        
        # Aggregate along several dimensions
        print(f"daily:{date}\t{amount}\t{quantity}")
        print(f"product:{product}\t{amount}\t{quantity}")
        print(f"category:{category}\t{amount}\t{quantity}")
        
        # Monthly statistics
        try:
            dt = datetime.strptime(date, '%Y-%m-%d')
            month = dt.strftime('%Y-%m')
            print(f"monthly:{month}\t{amount}\t{quantity}")
        except ValueError:
            pass
            
    except (json.JSONDecodeError, ValueError, KeyError):
        # Skip invalid records
        continue
''',
            'reducer_code': '''
#!/usr/bin/env python3
# sales_reducer.py
import sys
import json
from collections import defaultdict

# Aggregated data per key
aggregations = defaultdict(lambda: {'amount': 0, 'quantity': 0, 'count': 0})

for line in sys.stdin:
    line = line.strip()
    
    try:
        parts = line.split('\t')
        if len(parts) == 3:
            key, amount, quantity = parts
            
            aggregations[key]['amount'] += float(amount)
            aggregations[key]['quantity'] += int(quantity)
            aggregations[key]['count'] += 1
    except (ValueError, IndexError):
        continue

# Emit the aggregated results
for key, stats in sorted(aggregations.items()):
    avg_amount = stats['amount'] / stats['count'] if stats['count'] > 0 else 0
    
    result = {
        'total_amount': stats['amount'],
        'total_quantity': stats['quantity'],
        'transaction_count': stats['count'],
        'average_amount': round(avg_amount, 2)
    }
    
    print(f"{key}\t{json.dumps(result)}")
''',
            'run_command': '''
# Run the sales data analysis
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -files sales_mapper.py,sales_reducer.py \
    -mapper sales_mapper.py \
    -reducer sales_reducer.py \
    -input /data/sales.json \
    -output /analysis/sales_stats
''',
            'key_points': [
                'Process structured JSON data',
                'Aggregate along multiple dimensions',
                'Compute statistics such as averages',
                'Emit structured analysis results'
            ]
        }
    
    def generate_streaming_guide(self) -> str:
        """
        Generate the Python Streaming programming guide
        
        Returns:
            str: programming guide
        """
        guide_lines = [
            "# MapReduce Python Streaming Programming Guide",
            f"Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}",
            "",
            "## Overview",
            "",
            "Hadoop Streaming allows any executable to serve as a Mapper or Reducer,",
            "including Python scripts. This is convenient for developers who do not know Java.",
            "",
            "## How It Works",
            "",
            "1. **Input/output**: data is exchanged via standard input and output (stdin/stdout)",
            "2. **Data format**: usually tab-separated key-value pairs",
            "3. **Execution environment**: scripts run on the Hadoop cluster nodes",
            "4. **File distribution**: the -files option ships scripts to every node",
            "",
            "## Examples",
            ""
        ]
        
        # Append each example
        for example_name, example_func in self.examples.items():
            example_info = example_func()
            
            guide_lines.extend([
                f"### {example_name.replace('_', ' ').title()}",
                "",
                example_info['description'],
                "",
                "**Mapper code**:",
                f"```python{example_info['mapper_code']}```",
                "",
                "**Reducer code**:",
                f"```python{example_info['reducer_code']}```",
                "",
                "**Run command**:",
                f"```bash{example_info['run_command']}```",
                "",
                "**Key points**:"
            ])
            
            for point in example_info['key_points']:
                guide_lines.append(f"- {point}")
            
            guide_lines.extend(["", "---", ""])
        
        # Append best practices
        guide_lines.extend([
            "## Python Streaming Best Practices",
            "",
            "### 1. Script Design",
            "- Use a shebang line to specify the Python interpreter",
            "- Make scripts executable: `chmod +x script.py`",
            "- Handle malformed input gracefully",
            "- Pick data structures that keep memory usage low",
            "",
            "### 2. Data Processing",
            "- Validate the input format",
            "- Handle encodings correctly (UTF-8)",
            "- Avoid buffering large amounts of data in memory",
            "- Use generators for large data sets",
            "",
            "### 3. Error Handling",
            "- Catch and log exceptions",
            "- Skip invalid input records",
            "- Write debug output to stderr",
            "- Exit with a meaningful status code",
            "",
            "### 4. Performance",
            "- Cut down on unnecessary string operations",
            "- Prefer built-in functions and libraries",
            "- Avoid frequent I/O operations",
            "- Consider a Combiner to reduce data transfer",
            "",
            "### 5. Debugging",
            "- Local test: `cat input.txt | python mapper.py | sort | python reducer.py`",
            "- Validate the logic on a small data set",
            "- Check the Hadoop log files",
            "- Use counters to monitor execution"
        ])
        
        return "\n".join(guide_lines)
    
    def create_streaming_template(self, job_name: str) -> dict:
        """
        Create a Streaming job template
        
        Args:
            job_name: job name
            
        Returns:
            dict: template file contents
        """
        return {
            'mapper.py': f'''
#!/usr/bin/env python3
# {job_name} Mapper
import sys

def main():
    for line in sys.stdin:
        line = line.strip()
        
        # TODO: implement the Mapper logic
        # Example: emit a key-value pair
        # print(f"{{key}}\t{{value}}")
        
        pass

if __name__ == "__main__":
    main()
''',
            'reducer.py': f'''
#!/usr/bin/env python3
# {job_name} Reducer
import sys
from collections import defaultdict

def main():
    # TODO: implement the Reducer logic
    # Example: aggregate the values of each key
    
    for line in sys.stdin:
        line = line.strip()
        
        try:
            key, value = line.split('\t', 1)
            # Process the key-value pair
            pass
        except ValueError:
            continue
    
    # Emit the results
    # print(f"{{key}}\t{{result}}")

if __name__ == "__main__":
    main()
''',
            'run.sh': f'''
#!/bin/bash
# {job_name} run script

# Variables
INPUT_PATH="/input/data"
OUTPUT_PATH="/output/{job_name.lower()}"
STREAMING_JAR="$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar"

# Remove the output directory if it exists
hadoop fs -rm -r $OUTPUT_PATH 2>/dev/null

# Run the MapReduce job
hadoop jar $STREAMING_JAR \
    -files mapper.py,reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input $INPUT_PATH \
    -output $OUTPUT_PATH

# Check the result
if [ $? -eq 0 ]; then
    echo "Job completed successfully"
    echo "View the results: hadoop fs -cat $OUTPUT_PATH/part-*"
else
    echo "Job failed"
    exit 1
fi
''',
            'test.sh': f'''
#!/bin/bash
# {job_name} local test script

# Create test data
echo "Creating test data..."
cat > test_input.txt << EOF
# TODO: add test data
EOF

# Local test
echo "Running local test..."
cat test_input.txt | python mapper.py | sort | python reducer.py > test_output.txt

echo "Test finished; results saved to test_output.txt"
cat test_output.txt
'''
        }

# Usage example
if __name__ == "__main__":
    import os

    # Create the Streaming guide generator
    streaming = MapReducePythonStreaming()
    
    # Generate the programming guide
    guide = streaming.generate_streaming_guide()
    
    # Save the guide
    with open('python_streaming_guide.md', 'w', encoding='utf-8') as f:
        f.write(guide)
    
    print("Python Streaming programming guide generated")
    
    # Create the project template
    template = streaming.create_streaming_template("WordCount")
    
    # Save the template files
    for filename, content in template.items():
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(content)
        
        # Make scripts executable
        if filename.endswith('.py') or filename.endswith('.sh'):
            os.chmod(filename, 0o755)
    
    print("Project template files created")
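
Hadoop Streaming also accepts a -combiner option, so the local-aggregation advice from the Java API section applies here as well. The sketch below assembles such a command with Python's subprocess module; the jar path and HDFS paths are assumptions, and reusing reducer.py as the combiner is only valid when the reduce logic is commutative and associative (as in word count).

import subprocess

# Hedged sketch: a streaming job that reuses reducer.py as the combiner.
# The jar location and HDFS paths below are assumptions; adjust to your cluster.
streaming_cmd = [
    "hadoop", "jar",
    "/opt/hadoop/share/hadoop/tools/lib/hadoop-streaming.jar",  # assumed path
    "-files", "mapper.py,reducer.py",
    "-mapper", "mapper.py",
    "-combiner", "reducer.py",  # safe only for commutative/associative logic
    "-reducer", "reducer.py",
    "-input", "/input/data.txt",
    "-output", "/output/wordcount_combined",
]

result = subprocess.run(streaming_cmd)
print("Exit code:", result.returncode)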

4.4 MapReduce Performance Optimization

4.4.1 Performance Monitoring and Analysis

class MapReducePerformanceMonitor:
    """
    MapReduce performance monitor
    """
    
    def __init__(self):
        self.metrics = {
            'job_metrics': {},
            'task_metrics': {},
            'resource_metrics': {},
            'io_metrics': {}
        }
        self.performance_history = []
        self.bottlenecks = []
    
    def collect_job_metrics(self, job_id: str) -> dict:
        """
        Collect job-level performance metrics
        
        Args:
            job_id: job ID
            
        Returns:
            dict: job performance metrics
        """
        # Simulated job metric collection
        job_metrics = {
            'job_id': job_id,
            'start_time': time.time(),
            'end_time': None,
            'duration': 0,
            'map_tasks': {
                'total': 0,
                'completed': 0,
                'failed': 0,
                'avg_duration': 0,
                'max_duration': 0,
                'min_duration': 0
            },
            'reduce_tasks': {
                'total': 0,
                'completed': 0,
                'failed': 0,
                'avg_duration': 0,
                'max_duration': 0,
                'min_duration': 0
            },
            'data_metrics': {
                'input_bytes': 0,
                'output_bytes': 0,
                'shuffle_bytes': 0,
                'spilled_records': 0
            },
            'resource_usage': {
                'cpu_time': 0,
                'memory_usage': 0,
                'gc_time': 0
            }
        }
        
        self.metrics['job_metrics'][job_id] = job_metrics
        return job_metrics
    
    def collect_task_metrics(self, task_id: str, task_type: str) -> dict:
        """
        Collect task-level performance metrics
        
        Args:
            task_id: task ID
            task_type: task type (map/reduce)
            
        Returns:
            dict: task performance metrics
        """
        task_metrics = {
            'task_id': task_id,
            'task_type': task_type,
            'start_time': time.time(),
            'end_time': None,
            'duration': 0,
            'status': 'running',
            'progress': 0.0,
            'input_records': 0,
            'output_records': 0,
            'input_bytes': 0,
            'output_bytes': 0,
            'cpu_usage': 0.0,
            'memory_usage': 0,
            'disk_io': {
                'read_bytes': 0,
                'write_bytes': 0,
                'read_ops': 0,
                'write_ops': 0
            },
            'network_io': {
                'bytes_sent': 0,
                'bytes_received': 0
            },
            'counters': {
                'map_input_records': 0,
                'map_output_records': 0,
                'reduce_input_records': 0,
                'reduce_output_records': 0,
                'spilled_records': 0,
                'gc_time_millis': 0
            }
        }
        
        self.metrics['task_metrics'][task_id] = task_metrics
        return task_metrics
    
    def analyze_performance(self, job_id: str) -> dict:
        """
        Analyze job performance
        
        Args:
            job_id: job ID
            
        Returns:
            dict: performance analysis results
        """
        if job_id not in self.metrics['job_metrics']:
            return {'error': f'Job {job_id} not found'}
        
        job_metrics = self.metrics['job_metrics'][job_id]
        analysis = {
            'job_id': job_id,
            'overall_performance': {},
            'bottlenecks': [],
            'recommendations': [],
            'efficiency_score': 0.0
        }
        
        # Overall performance
        total_duration = job_metrics.get('duration', 0)
        map_duration = job_metrics['map_tasks'].get('avg_duration', 0)
        reduce_duration = job_metrics['reduce_tasks'].get('avg_duration', 0)
        
        analysis['overall_performance'] = {
            'total_duration': total_duration,
            'map_phase_ratio': map_duration / total_duration if total_duration > 0 else 0,
            'reduce_phase_ratio': reduce_duration / total_duration if total_duration > 0 else 0,
            'data_locality': self._calculate_data_locality(job_id),
            'resource_utilization': self._calculate_resource_utilization(job_id)
        }
        
        # Identify bottlenecks
        bottlenecks = self._identify_bottlenecks(job_id)
        analysis['bottlenecks'] = bottlenecks
        
        # Generate optimization recommendations
        recommendations = self._generate_recommendations(job_id, bottlenecks)
        analysis['recommendations'] = recommendations
        
        # Compute an efficiency score
        efficiency_score = self._calculate_efficiency_score(job_id)
        analysis['efficiency_score'] = efficiency_score
        
        return analysis
    
    def _calculate_data_locality(self, job_id: str) -> float:
        """
        Compute the data-locality ratio
        
        Args:
            job_id: job ID
            
        Returns:
            float: data-locality ratio
        """
        # Simulated data-locality figure
        import random
        return random.uniform(0.6, 0.95)
    
    def _calculate_resource_utilization(self, job_id: str) -> dict:
        """
        Compute resource utilization
        
        Args:
            job_id: job ID
            
        Returns:
            dict: resource utilization metrics
        """
        # Simulated resource-utilization figures
        import random
        return {
            'cpu_utilization': random.uniform(0.4, 0.9),
            'memory_utilization': random.uniform(0.3, 0.8),
            'disk_utilization': random.uniform(0.2, 0.7),
            'network_utilization': random.uniform(0.1, 0.6)
        }
    
    def _identify_bottlenecks(self, job_id: str) -> List[dict]:
        """
        Identify performance bottlenecks
        
        Args:
            job_id: job ID
            
        Returns:
            List[dict]: list of bottlenecks
        """
        bottlenecks = []
        job_metrics = self.metrics['job_metrics'][job_id]
        
        # Check for slow Map tasks
        map_tasks = job_metrics['map_tasks']
        if map_tasks.get('max_duration', 0) > map_tasks.get('avg_duration', 0) * 2:
            bottlenecks.append({
                'type': 'slow_map_tasks',
                'severity': 'high',
                'description': 'Map task durations vary widely; stragglers are present',
                'impact': 'The Map phase as a whole takes longer'
            })
        
        # Check for slow Reduce tasks
        reduce_tasks = job_metrics['reduce_tasks']
        if reduce_tasks.get('max_duration', 0) > reduce_tasks.get('avg_duration', 0) * 2:
            bottlenecks.append({
                'type': 'slow_reduce_tasks',
                'severity': 'high',
                'description': 'Reduce task durations vary widely; stragglers are present',
                'impact': 'The Reduce phase as a whole takes longer'
            })
        
        # Check for data skew
        shuffle_bytes = job_metrics['data_metrics'].get('shuffle_bytes', 0)
        input_bytes = job_metrics['data_metrics'].get('input_bytes', 1)
        if shuffle_bytes > input_bytes * 3:
            bottlenecks.append({
                'type': 'data_skew',
                'severity': 'medium',
                'description': 'Shuffle volume is unusually large; data skew is likely',
                'impact': 'High network pressure and unbalanced Reducer load'
            })
        
        # Check memory usage
        memory_usage = job_metrics['resource_usage'].get('memory_usage', 0)
        if memory_usage > 0.9:  # memory utilization above 90%
            bottlenecks.append({
                'type': 'memory_pressure',
                'severity': 'medium',
                'description': 'Memory utilization is too high',
                'impact': 'May cause frequent GC and degraded performance'
            })
        
        return bottlenecks
    
    def _generate_recommendations(self, job_id: str, bottlenecks: List[dict]) -> List[dict]:
        """
        Generate optimization recommendations
        
        Args:
            job_id: job ID
            bottlenecks: list of bottlenecks
            
        Returns:
            List[dict]: list of recommendations
        """
        recommendations = []
        
        for bottleneck in bottlenecks:
            if bottleneck['type'] == 'slow_map_tasks':
                recommendations.append({
                    'category': 'task_optimization',
                    'priority': 'high',
                    'title': 'Optimize Map task execution',
                    'description': 'Enable speculative execution and adjust the number of Map tasks',
                    'actions': [
                        'Set mapreduce.map.speculative=true',
                        'Adjust mapreduce.input.fileinputformat.split.maxsize',
                        'Check whether the data is evenly distributed'
                    ]
                })
            
            elif bottleneck['type'] == 'slow_reduce_tasks':
                recommendations.append({
                    'category': 'task_optimization',
                    'priority': 'high',
                    'title': 'Optimize Reduce task execution',
                    'description': 'Enable speculative execution and adjust the number of Reduce tasks',
                    'actions': [
                        'Set mapreduce.reduce.speculative=true',
                        'Adjust the mapreduce.job.reduces parameter',
                        'Use a custom Partitioner to balance load'
                    ]
                })
            
            elif bottleneck['type'] == 'data_skew':
                recommendations.append({
                    'category': 'data_optimization',
                    'priority': 'medium',
                    'title': 'Address data skew',
                    'description': 'Improve data distribution and the partitioning strategy',
                    'actions': [
                        'Use a Combiner to cut the Shuffle volume',
                        'Implement a custom Partitioner',
                        'Consider preprocessing and sampling the data'
                    ]
                })
            
            elif bottleneck['type'] == 'memory_pressure':
                recommendations.append({
                    'category': 'resource_optimization',
                    'priority': 'medium',
                    'title': 'Optimize memory usage',
                    'description': 'Adjust memory settings and GC parameters',
                    'actions': [
                        'Increase mapreduce.map.memory.mb',
                        'Increase mapreduce.reduce.memory.mb',
                        'Tune the JVM heap and GC parameters'
                    ]
                })
        
        # General recommendations
        recommendations.extend([
            {
                'category': 'general_optimization',
                'priority': 'low',
                'title': 'Enable data compression',
                'description': 'Reduce I/O and network transfer overhead',
                'actions': [
                    'Set mapreduce.map.output.compress=true',
                    'Set mapreduce.output.fileoutputformat.compress=true',
                    'Pick a suitable codec (LZ4, Snappy)'
                ]
            },
            {
                'category': 'general_optimization',
                'priority': 'low',
                'title': 'Optimize I/O',
                'description': 'Improve disk and network I/O efficiency',
                'actions': [
                    'Adjust io.file.buffer.size',
                    'Set an appropriate dfs.blocksize',
                    'Use SSDs for intermediate data'
                ]
            }
        ])
        
        return recommendations
    
    def _calculate_efficiency_score(self, job_id: str) -> float:
        """
        Compute an efficiency score
        
        Args:
            job_id: job ID
            
        Returns:
            float: efficiency score (0-100)
        """
        # Weighted combination of several indicators
        factors = {
            'data_locality': self._calculate_data_locality(job_id) * 30,
            'resource_utilization': sum(self._calculate_resource_utilization(job_id).values()) / 4 * 25,
            'task_balance': self._calculate_task_balance(job_id) * 20,
            'failure_rate': (1 - self._calculate_failure_rate(job_id)) * 15,
            'io_efficiency': self._calculate_io_efficiency(job_id) * 10
        }
        
        total_score = sum(factors.values())
        return min(100, max(0, total_score))
    
    def _calculate_task_balance(self, job_id: str) -> float:
        """
        Compute the task load-balance factor
        
        Args:
            job_id: job ID
            
        Returns:
            float: load-balance factor (0-1)
        """
        # Simulated load-balance figure
        import random
        return random.uniform(0.6, 0.95)
    
    def _calculate_failure_rate(self, job_id: str) -> float:
        """
        Compute the task failure rate
        
        Args:
            job_id: job ID
            
        Returns:
            float: failure rate (0-1)
        """
        job_metrics = self.metrics['job_metrics'][job_id]
        
        total_tasks = (job_metrics['map_tasks'].get('total', 0) + 
                      job_metrics['reduce_tasks'].get('total', 0))
        failed_tasks = (job_metrics['map_tasks'].get('failed', 0) + 
                       job_metrics['reduce_tasks'].get('failed', 0))
        
        return failed_tasks / total_tasks if total_tasks > 0 else 0
    
    def _calculate_io_efficiency(self, job_id: str) -> float:
        """
        Compute I/O efficiency
        
        Args:
            job_id: job ID
            
        Returns:
            float: I/O efficiency (0-1)
        """
        # Simulated I/O-efficiency figure
        import random
        return random.uniform(0.5, 0.9)
    
    def generate_performance_report(self, job_id: str) -> str:
        """
        Generate a performance report
        
        Args:
            job_id: job ID
            
        Returns:
            str: performance report
        """
        analysis = self.analyze_performance(job_id)
        
        if 'error' in analysis:
            return f"Error: {analysis['error']}"
        
        report_lines = [
            "# MapReduce Job Performance Report",
            f"Job ID: {job_id}",
            f"Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}",
            f"Efficiency score: {analysis['efficiency_score']:.1f}/100",
            "",
            "## Overall Performance",
            ""
        ]
        
        # Overall performance figures
        overall = analysis['overall_performance']
        report_lines.extend([
            f"- Total execution time: {overall['total_duration']:.2f} s",
            f"- Map phase share: {overall['map_phase_ratio']:.1%}",
            f"- Reduce phase share: {overall['reduce_phase_ratio']:.1%}",
            f"- Data locality: {overall['data_locality']:.1%}",
            "",
            "### Resource Utilization",
            ""
        ])
        
        resource_util = overall['resource_utilization']
        for resource, utilization in resource_util.items():
            report_lines.append(f"- {resource}: {utilization:.1%}")
        
        # Bottleneck analysis
        if analysis['bottlenecks']:
            report_lines.extend([
                "",
                "## Performance Bottlenecks",
                ""
            ])
            
            for bottleneck in analysis['bottlenecks']:
                report_lines.extend([
                    f"### {bottleneck['type']} (severity: {bottleneck['severity']})",
                    f"**Description**: {bottleneck['description']}",
                    f"**Impact**: {bottleneck['impact']}",
                    ""
                ])
        
        # Recommendations
        if analysis['recommendations']:
            report_lines.extend([
                "## Recommendations",
                ""
            ])
            
            for rec in analysis['recommendations']:
                report_lines.extend([
                    f"### {rec['title']} (priority: {rec['priority']})",
                    f"**Description**: {rec['description']}",
                    "**Steps**:"
                ])
                
                for action in rec['actions']:
                    report_lines.append(f"- {action}")
                
                report_lines.append("")
        
        return "\n".join(report_lines)

# Usage example
if __name__ == "__main__":
    # Create the performance monitor
    monitor = MapReducePerformanceMonitor()
    
    # Simulate collecting job metrics
    job_id = "job_202401_001"
    monitor.collect_job_metrics(job_id)
    
    # Simulate updating the job metrics
    job_metrics = monitor.metrics['job_metrics'][job_id]
    job_metrics.update({
        'duration': 1800,  # 30 minutes
        'map_tasks': {
            'total': 100,
            'completed': 98,
            'failed': 2,
            'avg_duration': 120,
            'max_duration': 300,
            'min_duration': 80
        },
        'reduce_tasks': {
            'total': 10,
            'completed': 10,
            'failed': 0,
            'avg_duration': 200,
            'max_duration': 250,
            'min_duration': 180
        },
        'data_metrics': {
            'input_bytes': 10 * 1024**3,  # 10 GB
            'output_bytes': 2 * 1024**3,   # 2 GB
            'shuffle_bytes': 5 * 1024**3,  # 5 GB
            'spilled_records': 1000
        }
    })
    
    # Generate the performance report
    report = monitor.generate_performance_report(job_id)
    
    # Save the report
    with open(f'performance_report_{job_id}.md', 'w', encoding='utf-8') as f:
        f.write(report)
    
    print(f"Performance report generated: performance_report_{job_id}.md")
    print("\n=== Performance Analysis Summary ===")
    analysis = monitor.analyze_performance(job_id)
    print(f"Efficiency score: {analysis['efficiency_score']:.1f}/100")
    print(f"Bottlenecks found: {len(analysis['bottlenecks'])}")
    print(f"Recommendations: {len(analysis['recommendations'])}")
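
The monitor above fills its metrics with simulated values. On a real cluster, comparable numbers can be read from the ResourceManager's REST API (GET /ws/v1/cluster/apps/{appid}). A minimal sketch, assuming an unsecured cluster with the ResourceManager web UI on its default port 8088; the hostname and application ID are placeholders.

import json
import urllib.request

# Hedged sketch: fetch basic metrics for one YARN application.
RM_URL = "http://resourcemanager:8088"  # assumed host; default RM web port

def fetch_app_report(app_id: str) -> dict:
    """Return the 'app' object reported by the ResourceManager."""
    with urllib.request.urlopen(f"{RM_URL}/ws/v1/cluster/apps/{app_id}") as resp:
        return json.load(resp)["app"]

if __name__ == "__main__":
    app = fetch_app_report("application_1700000000000_0001")  # placeholder ID
    print("State:", app.get("state"))
    print("Elapsed ms:", app.get("elapsedTime"))
    print("Allocated MB:", app.get("allocatedMB"))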

4.4.2 Performance Optimization Strategies

class MapReduceOptimizer:
    """
    MapReduce performance optimizer
    """
    
    def __init__(self):
        self.optimization_strategies = {
            'input_optimization': self._input_optimization_strategies,
            'map_optimization': self._map_optimization_strategies,
            'shuffle_optimization': self._shuffle_optimization_strategies,
            'reduce_optimization': self._reduce_optimization_strategies,
            'output_optimization': self._output_optimization_strategies,
            'resource_optimization': self._resource_optimization_strategies
        }
        self.configuration_templates = {}
    
    def _input_optimization_strategies(self) -> List[dict]:
        """
        Input optimization strategies
        
        Returns:
            List[dict]: list of input optimization strategies
        """
        return [
            {
                'name': 'Set a sensible input split size',
                'description': 'Tune input splits to the data and cluster configuration',
                'parameters': {
                    'mapreduce.input.fileinputformat.split.minsize': '1MB',
                    'mapreduce.input.fileinputformat.split.maxsize': '256MB',
                    'dfs.blocksize': '128MB'
                },
                'benefits': [
                    'Fewer Map tasks, lower scheduling overhead',
                    'Better data locality',
                    'More balanced task load'
                ],
                'considerations': [
                    'Splits that are too large can unbalance tasks',
                    'Splits that are too small add scheduling overhead',
                    'Take the number of cluster nodes into account'
                ]
            },
            {
                'name': 'Compress input data',
                'description': 'Use compressed formats to reduce I/O overhead',
                'parameters': {
                    'mapreduce.input.fileinputformat.compressed': 'true',
                    'io.compression.codecs': 'org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.SnappyCodec'
                },
                'benefits': [
                    'Less disk I/O',
                    'Less network transfer',
                    'Lower storage consumption'
                ],
                'considerations': [
                    'Extra CPU cost for decompression',
                    'Some compression formats are not splittable',
                    'Trade compression ratio against speed'
                ]
            },
            {
                'name': 'Pick an appropriate input format',
                'description': 'Choose a suitable InputFormat implementation',
                'parameters': {
                    'mapreduce.job.inputformat.class': 'org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat'
                },
                'benefits': [
                    'Mitigates the small-files problem',
                    'Fewer Map tasks',
                    'Better cluster utilization'
                ],
                'considerations': [
                    'Choose based on the data characteristics',
                    'A custom InputFormat may be needed',
                    'May affect data locality'
                ]
            }
        ]
    
    def _map_optimization_strategies(self) -> List[dict]:
        """
        Map-phase optimization strategies
        
        Returns:
            List[dict]: list of Map optimization strategies
        """
        return [
            {
                'name': 'Compress Map output',
                'description': 'Compress Map output to shrink the Shuffle volume',
                'parameters': {
                    'mapreduce.map.output.compress': 'true',
                    'mapreduce.map.output.compress.codec': 'org.apache.hadoop.io.compress.SnappyCodec'
                },
                'benefits': [
                    'Less network transfer during the Shuffle',
                    'Less disk I/O',
                    'Better overall performance'
                ],
                'considerations': [
                    'Extra CPU cost for compression',
                    'Pick a suitable codec',
                    'Snappy balances compression ratio and speed'
                ]
            },
            {
                'name': 'Use a Combiner',
                'description': 'Pre-aggregate on the Map side',
                'parameters': {
                    'mapreduce.job.combiner.class': 'com.example.MyCombiner'
                },
                'benefits': [
                    'Drastically smaller Shuffle volume',
                    'Less network pressure',
                    'More efficient Reduce phase'
                ],
                'considerations': [
                    'Only valid for commutative and associative operations',
                    'Requires writing a Combiner class',
                    'May lengthen the Map phase somewhat'
                ]
            },
            {
                'name': 'Tune Map task memory',
                'description': 'Configure Map task memory appropriately',
                'parameters': {
                    'mapreduce.map.memory.mb': '2048',
                    'mapreduce.map.java.opts': '-Xmx1638m',
                    'mapreduce.map.sort.spill.percent': '0.8'
                },
                'benefits': [
                    'Fewer spills to disk',
                    'More efficient Map execution',
                    'Lower GC pressure'
                ],
                'considerations': [
                    'Configure according to cluster resources',
                    'Oversized memory wastes resources',
                    'Monitor actual memory usage'
                ]
            }
        ]
    
    def _shuffle_optimization_strategies(self) -> List[dict]:
        """
        Shuffle-phase optimization strategies
        
        Returns:
            List[dict]: list of Shuffle optimization strategies
        """
        return [
            {
                'name': 'Tune Shuffle parameters',
                'description': 'Adjust the key parameters of the Shuffle phase',
                'parameters': {
                    'mapreduce.task.io.sort.mb': '512',
                    'mapreduce.task.io.sort.factor': '100',
                    'mapreduce.reduce.shuffle.parallelcopies': '20'
                },
                'benefits': [
                    'More efficient sorting',
                    'More parallelism',
                    'Shorter Shuffle time'
                ],
                'considerations': [
                    'Adjust to the data volume',
                    'Oversized values can exhaust memory',
                    'Balance memory against performance'
                ]
            },
            {
                'name': 'Tune reduce-side Shuffle buffers',
                'description': 'Control how much reducer memory buffers and merges fetched Map output',
                'parameters': {
                    'mapreduce.reduce.shuffle.input.buffer.percent': '0.7',
                    'mapreduce.reduce.shuffle.merge.percent': '0.66'
                },
                'benefits': [
                    'Fewer on-disk merge passes',
                    'Less disk I/O',
                    'More efficient Shuffle'
                ],
                'considerations': [
                    'Competes with the reducer heap for memory',
                    'Trade buffer size against OOM risk',
                    'Affects memory usage'
                ]
            },
            {
                'name': 'Tune data partitioning',
                'description': 'Use a custom Partitioner to balance load',
                'parameters': {
                    'mapreduce.job.partitioner.class': 'com.example.CustomPartitioner'
                },
                'benefits': [
                    'Avoids data skew',
                    'Balances Reducer load',
                    'More parallelism'
                ],
                'considerations': [
                    'Requires knowing the data distribution',
                    'May require sampling the data first',
                    'More complex to implement'
                ]
            }
        ]
    
    def _reduce_optimization_strategies(self) -> List[dict]:
        """
        Reduce-phase optimization strategies
        
        Returns:
            List[dict]: list of Reduce optimization strategies
        """
        return [
            {
                'name': 'Choose the Reduce task count',
                'description': 'Set an appropriate number of Reduce tasks',
                'parameters': {
                    'mapreduce.job.reduces': 'auto-calculated'
                },
                'calculation_formula': 'reduce_tasks = 0.95 (single wave) or 1.75 (two waves) * nodes * reduce_containers_per_node',
                'benefits': [
                    'Makes full use of cluster resources',
                    'Avoids far too many or too few tasks',
                    'Balances runtime against overhead'
                ],
                'considerations': [
                    'Account for data volume and cluster size',
                    'Too many tasks add scheduling overhead',
                    'Too few tasks limit parallelism'
                ]
            },
            {
                'name': 'Tune Reduce memory settings',
                'description': 'Configure Reduce task memory appropriately',
                'parameters': {
                    'mapreduce.reduce.memory.mb': '4096',
                    'mapreduce.reduce.java.opts': '-Xmx3276m',
                    'mapreduce.reduce.input.buffer.percent': '0.0'
                },
                'benefits': [
                    'More Reduce processing capacity',
                    'Fewer spills to disk',
                    'Less frequent GC'
                ],
                'considerations': [
                    'Configure to the data volume',
                    'Oversized memory wastes resources',
                    'Monitor actual memory usage'
                ]
            },
            {
                'name': 'Enable speculative execution',
                'description': 'Mitigate straggler tasks',
                'parameters': {
                    'mapreduce.reduce.speculative': 'true',
                    'mapreduce.job.speculative.slowtaskthreshold': '1.0'
                },
                'benefits': [
                    'Limits the impact of stragglers',
                    'Shorter job completion time',
                    'Better fault tolerance'
                ],
                'considerations': [
                    'Consumes extra resources',
                    'May duplicate computation',
                    'Set the threshold carefully'
                ]
            }
        ]
    
    def _output_optimization_strategies(self) -> List[dict]:
        """
        Output optimization strategies
        
        Returns:
            List[dict]: list of output optimization strategies
        """
        return [
            {
                'name': 'Compress the output',
                'description': 'Compress the final job output',
                'parameters': {
                    'mapreduce.output.fileoutputformat.compress': 'true',
                    'mapreduce.output.fileoutputformat.compress.codec': 'org.apache.hadoop.io.compress.GzipCodec',
                    'mapreduce.output.fileoutputformat.compress.type': 'BLOCK'
                },
                'benefits': [
                    'Saves storage space',
                    'Faster subsequent reads',
                    'Lower storage cost'
                ],
                'considerations': [
                    'Slower writes',
                    'Affects downstream processing performance',
                    'Pick a suitable compression format'
                ]
            },
            {
                'name': 'Pick an efficient output format',
                'description': 'Choose an efficient OutputFormat implementation',
                'parameters': {
                    'mapreduce.job.outputformat.class': 'org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat'
                },
                'benefits': [
                    'Faster writes',
                    'Supports compression and splitting',
                    'Convenient for downstream processing'
                ],
                'considerations': [
                    'Choose based on how the output is consumed',
                    'A custom OutputFormat may be needed',
                    'Affects human readability of the data'
                ]
            }
        ]
    
    def _resource_optimization_strategies(self) -> List[dict]:
        """
        Resource optimization strategies
        
        Returns:
            List[dict]: list of resource optimization strategies
        """
        return [
            {
                'name': 'Tune JVM options',
                'description': 'Tune JVM garbage collection and memory management',
                'parameters': {
                    'mapreduce.map.java.opts': '-Xmx1638m -XX:+UseG1GC -XX:MaxGCPauseMillis=200',
                    'mapreduce.reduce.java.opts': '-Xmx3276m -XX:+UseG1GC -XX:MaxGCPauseMillis=200'
                },
                'benefits': [
                    'Shorter GC pauses',
                    'Better memory utilization',
                    'More stable task execution'
                ],
                'considerations': [
                    'Choose options suited to the JVM version',
                    'G1GC suits large-heap scenarios',
                    'Monitor GC behavior'
                ]
            },
            {
                'name': 'Tune disk I/O',
                'description': 'Improve disk read/write performance',
                'parameters': {
                    'io.file.buffer.size': '131072',
                    'mapreduce.task.tmp.dir': '/tmp/hadoop-${user.name}/mapred/local'
                },
                'benefits': [
                    'More efficient disk I/O',
                    'Fewer system calls',
                    'Better overall performance'
                ],
                'considerations': [
                    'Requires sufficient disk space',
                    'SSD storage may be needed',
                    'Mind the permissions on the temp directory'
                ]
            }
        ]
    
    def generate_optimization_plan(self, job_profile: dict) -> dict:
        """
        Generate an optimization plan from a job profile
        
        Args:
            job_profile: description of the job's characteristics
            
        Returns:
            dict: optimization plan
        """
        plan = {
            'job_profile': job_profile,
            'recommended_strategies': [],
            'configuration': {},
            'implementation_steps': [],
            'expected_improvements': {}
        }
        
        # Analyze the job profile
        data_size = job_profile.get('data_size_gb', 0)
        cluster_size = job_profile.get('cluster_nodes', 1)
        job_type = job_profile.get('job_type', 'general')
        current_performance = job_profile.get('current_performance', {})
        
        # Recommend strategies based on data volume
        if data_size > 100:  # large data set
            plan['recommended_strategies'].extend([
                'input_optimization',
                'map_optimization',
                'shuffle_optimization'
            ])
        
        if data_size < 10:  # small data set
            plan['recommended_strategies'].extend([
                'input_optimization',  # mainly to address the small-files problem
                'resource_optimization'
            ])
        
        # Recommend strategies based on the job type
        if job_type == 'aggregation':
            plan['recommended_strategies'].extend([
                'map_optimization',  # use a Combiner
                'reduce_optimization'
            ])
        elif job_type == 'join':
            plan['recommended_strategies'].extend([
                'shuffle_optimization',
                'reduce_optimization'
            ])
        
        # Build the concrete configuration
        for strategy_type in plan['recommended_strategies']:
            if strategy_type in self.optimization_strategies:
                strategies = self.optimization_strategies[strategy_type]()
                for strategy in strategies:
                    plan['configuration'].update(strategy['parameters'])
        
        # Build the implementation steps
        plan['implementation_steps'] = self._generate_implementation_steps(
            plan['recommended_strategies']
        )
        
        # Estimate the expected improvements
        plan['expected_improvements'] = self._estimate_improvements(
            job_profile, plan['recommended_strategies']
        )
        
        return plan
    
    def _generate_implementation_steps(self, strategies: List[str]) -> List[dict]:
        """
        Generate implementation steps
        
        Args:
            strategies: list of strategies
            
        Returns:
            List[dict]: implementation steps
        """
        steps = [
            {
                'step': 1,
                'title': 'Back up the current configuration',
                'description': 'Back up the existing Hadoop configuration files',
                'commands': [
                    'cp $HADOOP_CONF_DIR/mapred-site.xml $HADOOP_CONF_DIR/mapred-site.xml.backup',
                    'cp $HADOOP_CONF_DIR/yarn-site.xml $HADOOP_CONF_DIR/yarn-site.xml.backup'
                ],
                'estimated_time': '5 minutes'
            },
            {
                'step': 2,
                'title': 'Update the configuration files',
                'description': 'Apply the optimized configuration parameters',
                'commands': [
                    '# update the relevant parameters in mapred-site.xml',
                    '# update the resource settings in yarn-site.xml'
                ],
                'estimated_time': '10 minutes'
            },
            {
                'step': 3,
                'title': 'Restart the affected services',
                'description': 'Restart the YARN services so the configuration takes effect',
                'commands': [
                    '$HADOOP_HOME/sbin/stop-yarn.sh',
                    '$HADOOP_HOME/sbin/start-yarn.sh'
                ],
                'estimated_time': '5 minutes'
            },
            {
                'step': 4,
                'title': 'Test and verify',
                'description': 'Run a test job to verify the effect of the changes',
                'commands': [
                    'hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar wordcount input output'
                ],
                'estimated_time': 'depends on the data volume'
            },
            {
                'step': 5,
                'title': 'Monitor performance',
                'description': 'Monitor the performance metrics after the change',
                'commands': [
                    '# job history server: http://historyserver:19888',
                    '# cluster resources: http://resourcemanager:8088'
                ],
                'estimated_time': 'ongoing'
            }
        ]
        
        return steps
    
    def _estimate_improvements(self, job_profile: dict, strategies: List[str]) -> dict:
        """
        Estimate the expected improvements
        
        Args:
            job_profile: job profile
            strategies: optimization strategies
            
        Returns:
            dict: estimated improvements
        """
        improvements = {
            'execution_time': {'current': 0, 'optimized': 0, 'improvement': '0%'},
            'resource_utilization': {'current': 0, 'optimized': 0, 'improvement': '0%'},
            'data_locality': {'current': 0, 'optimized': 0, 'improvement': '0%'}
        }
        
        # Rough per-strategy improvement estimates
        total_improvement = 0
        
        if 'input_optimization' in strategies:
            total_improvement += 0.15  # ~15%
        if 'map_optimization' in strategies:
            total_improvement += 0.20  # ~20%
        if 'shuffle_optimization' in strategies:
            total_improvement += 0.25  # ~25%
        if 'reduce_optimization' in strategies:
            total_improvement += 0.15  # ~15%
        if 'resource_optimization' in strategies:
            total_improvement += 0.10  # ~10%
        
        # Cap the total estimated improvement
        total_improvement = min(total_improvement, 0.60)  # at most 60%
        
        # Simulated current performance figures
        current_time = job_profile.get('current_execution_time', 1800)  # 30 minutes
        current_utilization = job_profile.get('current_resource_utilization', 0.6)
        current_locality = job_profile.get('current_data_locality', 0.7)
        
        # Projected post-optimization performance
        optimized_time = current_time * (1 - total_improvement)
        optimized_utilization = min(current_utilization * (1 + total_improvement * 0.5), 0.95)
        optimized_locality = min(current_locality * (1 + total_improvement * 0.3), 0.95)
        
        improvements.update({
            'execution_time': {
                'current': f'{current_time/60:.1f} min',
                'optimized': f'{optimized_time/60:.1f} min',
                'improvement': f'{total_improvement*100:.1f}%'
            },
            'resource_utilization': {
                'current': f'{current_utilization:.1%}',
                'optimized': f'{optimized_utilization:.1%}',
                'improvement': f'{(optimized_utilization-current_utilization)/current_utilization*100:.1f}%'
            },
            'data_locality': {
                'current': f'{current_locality:.1%}',
                'optimized': f'{optimized_locality:.1%}',
                'improvement': f'{(optimized_locality-current_locality)/current_locality*100:.1f}%'
            }
        })
        
        return improvements
    
    def generate_configuration_file(self, optimization_plan: dict) -> str:
        """
        Generate the optimized configuration file
        
        Args:
            optimization_plan: optimization plan
            
        Returns:
            str: configuration file contents
        """
        config_lines = [
            '<?xml version="1.0"?>',
            '<!-- Optimized MapReduce configuration -->',
            f'<!-- Generated: {time.strftime("%Y-%m-%d %H:%M:%S")} -->',
            '<configuration>',
            ''
        ]
        
        # Emit the configuration properties
        for key, value in optimization_plan['configuration'].items():
            config_lines.extend([
                '  <property>',
                f'    <name>{key}</name>',
                f'    <value>{value}</value>',
                '  </property>',
                ''
            ])
        
        config_lines.append('</configuration>')
        
        return '\n'.join(config_lines)

# Usage example
if __name__ == "__main__":
    # Create the optimizer
    optimizer = MapReduceOptimizer()
    
    # Describe the job
    job_profile = {
        'data_size_gb': 500,
        'cluster_nodes': 10,
        'job_type': 'aggregation',
        'current_execution_time': 3600,  # 1 hour
        'current_resource_utilization': 0.6,
        'current_data_locality': 0.7
    }
    
    # Generate the optimization plan
    optimization_plan = optimizer.generate_optimization_plan(job_profile)
    
    print("=== MapReduce Optimization Plan ===")
    print(f"Recommended strategies: {', '.join(optimization_plan['recommended_strategies'])}")
    print(f"Configuration parameters: {len(optimization_plan['configuration'])}")
    print(f"Implementation steps: {len(optimization_plan['implementation_steps'])}")
    
    # Show the expected improvements
    improvements = optimization_plan['expected_improvements']
    print("\n=== Expected Improvements ===")
    for metric, data in improvements.items():
        if isinstance(data, dict) and 'improvement' in data:
            print(f"{metric}: {data['current']} -> {data['optimized']} ({data['improvement']} better)")
    
    # Generate the configuration file
    config_content = optimizer.generate_configuration_file(optimization_plan)
    
    # Save the configuration file
    with open('mapred-site-optimized.xml', 'w', encoding='utf-8') as f:
        f.write(config_content)
    
    print("\nOptimized configuration file generated: mapred-site-optimized.xml")
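
Editing mapred-site.xml changes the cluster-wide defaults; for experimenting with a plan, the same parameters can instead be passed per job as -D overrides, which any driver built on ToolRunner/GenericOptionsParser accepts. A minimal sketch that turns the generated plan into such a command line; the example jar and HDFS paths are assumptions:

# Hedged sketch: per-job -D overrides built from the plan's configuration,
# for any driver that parses generic options via ToolRunner.
def build_tuned_command(configuration: dict) -> str:
    # GenericOptionsParser expects -D key=value after the program name,
    # before the program's own arguments.
    overrides = " ".join(f"-D {k}={v}" for k, v in configuration.items())
    return (
        "hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/"
        "hadoop-mapreduce-examples-*.jar wordcount "
        f"{overrides} /input/data /output/wordcount_tuned"  # assumed paths
    )

# Print the command for review rather than running it blindly
print(build_tuned_command(optimization_plan['configuration']))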

5. MapReduce Best Practices and Summary

5.1 Design Best Practices

5.1.1 Job Design Principles

  1. Optimize for data locality

    • Move computation to the data whenever possible
    • Set input split sizes sensibly
    • Avoid cross-rack data transfer
  2. Control task granularity

    • Map tasks: 128 MB to 1 GB of data per task
    • Reduce tasks: 0.95-1.75 times the number of cluster cores (see the sketch after this list)
    • Avoid having far too many or too few tasks
  3. Optimize memory usage

    • Size the JVM heap appropriately
    • Avoid frequent GC
    • Use object pools to reduce allocations
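
A minimal sketch of the reduce-count heuristic from item 2, assuming the commonly cited factors of 0.95 (all reducers finish in a single wave) and 1.75 (a second, smaller wave absorbs stragglers); the function and parameter names are illustrative:

def recommended_reduce_tasks(nodes: int, reduce_slots_per_node: int,
                             one_wave: bool = True) -> int:
    """Classic heuristic: 0.95x the reduce slots for one wave, 1.75x for two."""
    factor = 0.95 if one_wave else 1.75
    return max(1, int(nodes * reduce_slots_per_node * factor))

# 10 nodes with 8 reduce containers each:
print(recommended_reduce_tasks(10, 8))                  # 76  -> one wave
print(recommended_reduce_tasks(10, 8, one_wave=False))  # 140 -> two waves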

5.1.2 Algorithm Design Patterns

class MapReduceDesignPatterns:
    """
    A collection of MapReduce design patterns
    """
    
    def __init__(self):
        self.patterns = {
            'filtering': self._filtering_pattern,
            'summarization': self._summarization_pattern,
            'structural': self._structural_pattern,
            'join': self._join_pattern,
            'metapatterns': self._metapatterns
        }
    
    def _filtering_pattern(self) -> dict:
        """
        Filtering pattern
        
        Returns:
            dict: filtering pattern notes
        """
        return {
            'name': 'Filtering Pattern',
            'description': 'Select the records of a large data set that satisfy a condition',
            'use_cases': [
                'Data cleaning',
                'Anomaly detection',
                'Conditional queries',
                'Sampling'
            ],
            'implementation': {
                'mapper': 'Check each record against the filter condition and emit it if it matches',
                'reducer': 'Usually unnecessary, or a simple identity function',
                'combiner': 'An identity function can serve as the Combiner'
            },
            'example': '''
# Mapper pseudocode
def map(key, value):
    record = parse(value)
    if meets_criteria(record):
        emit(key, record)

# Usually no Reducer is needed, or an identity Reducer
def reduce(key, values):
    for value in values:
        emit(key, value)
''',
            'optimization_tips': [
                'Filter as early as possible in the Mapper to cut data transfer',
                'Use a Combiner to reduce network overhead',
                'Consider a Bloom filter for pre-filtering'
            ]
        }
    
    def _summarization_pattern(self) -> dict:
        """
        Summarization pattern
        
        Returns:
            dict: summarization pattern notes
        """
        return {
            'name': 'Summarization Pattern',
            'description': 'Group data and compute aggregates',
            'use_cases': [
                'Statistical analysis',
                'Data aggregation',
                'Report generation',
                'Metric computation'
            ],
            'implementation': {
                'mapper': 'Extract the grouping key and the value to aggregate',
                'reducer': 'Aggregate all values that share a key',
                'combiner': 'Can often reuse the Reducer logic'
            },
            'example': '''
# Word count example (pseudocode)
def map(key, value):
    words = value.split()
    for word in words:
        emit(word, 1)

def reduce(key, values):
    count = sum(values)
    emit(key, count)

# The Combiner can reuse the same reduce logic
def combine(key, values):
    count = sum(values)
    emit(key, count)
''',
            'optimization_tips': [
                'Use a Combiner for local aggregation',
                'Pick compact data types to save memory',
                'Consider incremental aggregation algorithms'
            ]
        }
    
    def _structural_pattern(self) -> dict:
        """
        Structural pattern
        
        Returns:
            dict: structural pattern notes
        """
        return {
            'name': 'Structural Pattern',
            'description': 'Change the structure or format of the data',
            'use_cases': [
                'Data transformation',
                'Format conversion',
                'Data reorganization',
                'Index construction'
            ],
            'implementation': {
                'mapper': 'Parse the input and convert it to the target format',
                'reducer': 'May post-process or simply pass records through',
                'partitioner': 'Custom partitioning logic may be needed'
            },
            'example': '''
# Format conversion example (pseudocode)
def map(key, value):
    # Parse the CSV input
    fields = value.split(',')
    # Convert to JSON
    json_data = {
        'id': fields[0],
        'name': fields[1],
        'age': int(fields[2])
    }
    emit(fields[0], json.dumps(json_data))

def reduce(key, values):
    # Pass through, or process further
    for value in values:
        emit(key, value)
''',
            'optimization_tips': [
                'Validate the data in the Mapper',
                'Use an efficient serialization format',
                'Consider compression'
            ]
        }
    
    def _join_pattern(self) -> dict:
        """
        Join pattern
        
        Returns:
            dict: join pattern notes
        """
        return {
            'name': 'Join Pattern',
            'description': 'Join multiple data sets on a common key',
            'use_cases': [
                'Data correlation',
                'Table joins',
                'Data fusion',
                'Relational queries'
            ],
            'types': {
                'reduce_side_join': 'Join in the Reduce phase',
                'map_side_join': 'Join in the Map phase (requires pre-sorted inputs)',
                'broadcast_join': 'Broadcast the small table to every Mapper'
            },
            'implementation': {
                'mapper': 'Tag each record with its source and emit the join key',
                'reducer': 'Merge records from the different sources by join key',
                'partitioner': 'Ensure records with the same key reach the same Reducer'
            },
            'example': '''
# Reduce-side join example (pseudocode)
def map(key, value):
    # Tag the record with its source
    if from_table_a(value):
        emit(join_key, ('A', value))
    else:
        emit(join_key, ('B', value))

def reduce(key, values):
    table_a_records = []
    table_b_records = []
    
    for source, record in values:
        if source == 'A':
            table_a_records.append(record)
        else:
            table_b_records.append(record)
    
    # Perform the join
    for a_record in table_a_records:
        for b_record in table_b_records:
            joined = join_records(a_record, b_record)
            emit(key, joined)
''',
            'optimization_tips': [
                'Use a Map-side join when one table is small',
                'Use secondary sort to optimize Reduce-side joins',
                'Watch out for data skew'
            ]
        }
    
    def _metapatterns(self) -> dict:
        """
        Meta patterns
        
        Returns:
            dict: description of the meta patterns
        """
        return {
            'name': 'Meta Patterns',
            'description': 'Higher-level patterns that compose multiple MapReduce jobs',
            'types': {
                'job_chaining': 'Run jobs in sequence, each consuming the previous output',
                'job_merging': 'Merge several jobs into one to save passes over the data',
                'job_partitioning': 'Split one logical job across partitions of the data'
            },
            'use_cases': [
                'Complex data-processing pipelines',
                'Multi-stage algorithms',
                'Workflow management'
            ],
            'implementation': {
                'workflow_management': 'Use tools such as Oozie or Azkaban',
                'data_dependency': 'Manage the data dependencies between jobs',
                'error_handling': 'Implement fault tolerance and retry mechanisms'
            },
            'optimization_tips': [
                'Minimize I/O on intermediate data',
                'Plan job dependencies carefully',
                'Use caching where it helps'
            ]
        }
    
    def get_pattern_recommendation(self, problem_type: str, data_characteristics: dict) -> dict:
        """
        Recommend a design pattern based on the problem type and data characteristics
        
        Args:
            problem_type: kind of problem to solve
            data_characteristics: characteristics of the input data
            
        Returns:
            dict: recommended design patterns
        """
        recommendations = {
            'primary_pattern': None,
            'secondary_patterns': [],
            'implementation_notes': [],
            'performance_considerations': []
        }
        
        # Pick the primary pattern from the problem type
        if problem_type in ['search', 'filter', 'clean']:
            recommendations['primary_pattern'] = 'filtering'
        elif problem_type in ['count', 'sum', 'aggregate', 'statistics']:
            recommendations['primary_pattern'] = 'summarization'
        elif problem_type in ['transform', 'convert', 'reformat']:
            recommendations['primary_pattern'] = 'structural'
        elif problem_type in ['join', 'merge', 'combine']:
            recommendations['primary_pattern'] = 'join'
        
        # Add secondary patterns based on the data characteristics
        data_size = data_characteristics.get('size_gb', 0)
        num_files = data_characteristics.get('num_files', 1)
        
        if data_size > 1000:  # large dataset
            recommendations['secondary_patterns'].append('filtering')  # pre-filter first
            recommendations['performance_considerations'].append('Consider data compression')
        
        if num_files > 10000:  # small-files problem
            recommendations['implementation_notes'].append('Use CombineFileInputFormat')
        
        return recommendations
    
    def generate_pattern_guide(self, pattern_name: str) -> str:
        """
        Generate a guide for a design pattern
        
        Args:
            pattern_name: name of the pattern
            
        Returns:
            str: the pattern guide
        """
        if pattern_name not in self.patterns:
            return f"Unknown pattern: {pattern_name}"
        
        pattern = self.patterns[pattern_name]()
        
        guide_lines = [
            f"# {pattern['name']}",
            "",
            "## Description",
            pattern['description'],
            "",
            "## Use Cases"
        ]
        
        if 'use_cases' in pattern:
            for use_case in pattern['use_cases']:
                guide_lines.append(f"- {use_case}")
        
        if 'implementation' in pattern:
            guide_lines.extend([
                "",
                "## Implementation Notes",
                f"- **Mapper**: {pattern['implementation'].get('mapper', 'N/A')}",
                f"- **Reducer**: {pattern['implementation'].get('reducer', 'N/A')}",
                f"- **Combiner**: {pattern['implementation'].get('combiner', 'N/A')}"
            ])
        
        if 'example' in pattern:
            guide_lines.extend([
                "",
                "## Example Code",
                "```python",
                pattern['example'].strip(),
                "```"
            ])
        
        if 'optimization_tips' in pattern:
            guide_lines.extend([
                "",
                "## Optimization Tips"
            ])
            for tip in pattern['optimization_tips']:
                guide_lines.append(f"- {tip}")
        
        return "\n".join(guide_lines)

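The summarization pattern's pseudocode receives its values already grouped per key. With Hadoop Streaming, the reducer instead sees sorted key<TAB>value lines and must detect key boundaries itself; the sketch below shows that boundary handling, assuming a mapper that emits one `word<TAB>1` line per token. Because addition is associative and commutative, the same script can also be registered as the Combiner.

#!/usr/bin/env python3
"""wordcount_reducer.py: summarization-pattern reducer for Hadoop Streaming (sketch)."""
import sys

current_word, current_count = None, 0
for line in sys.stdin:
    word, _, count = line.rstrip("\n").partition("\t")
    if word != current_word:
        if current_word is not None:
            print(f"{current_word}\t{current_count}")  # key boundary: flush the finished word
        current_word, current_count = word, 0
    current_count += int(count)

if current_word is not None:
    print(f"{current_word}\t{current_count}")  # flush the final word
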
# Usage example
if __name__ == "__main__":
    patterns = MapReduceDesignPatterns()
    
    # Get a pattern recommendation
    recommendation = patterns.get_pattern_recommendation(
        problem_type='aggregate',
        data_characteristics={'size_gb': 500, 'num_files': 1000}
    )
    
    print("=== Design Pattern Recommendation ===")
    print(f"Primary pattern: {recommendation['primary_pattern']}")
    print(f"Secondary patterns: {recommendation['secondary_patterns']}")
    
    # Generate a pattern guide
    guide = patterns.generate_pattern_guide('summarization')
    print("\n=== Summarization Pattern Guide ===")
    print(guide)
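
To make the reduce-side join concrete, the sketch below simulates the tagging and grouping locally in plain Python. The two toy tables are assumptions for illustration; in a real job the framework's shuffle performs the grouping by join key.

# Local simulation of a reduce-side join (illustrative sketch)
from collections import defaultdict

users = {"u1": "Alice", "u2": "Bob"}                     # table A: user_id -> name
orders = [("u1", "book"), ("u1", "pen"), ("u2", "mug")]  # table B: (user_id, item)

# Map phase: tag every record with its source table under the join key
grouped = defaultdict(list)
for user_id, name in users.items():
    grouped[user_id].append(("A", name))
for user_id, item in orders:
    grouped[user_id].append(("B", item))

# Reduce phase: cross-join the A-side and B-side records of each key
for key, values in sorted(grouped.items()):
    a_side = [v for tag, v in values if tag == "A"]
    b_side = [v for tag, v in values if tag == "B"]
    for name in a_side:
        for item in b_side:
            print(f"{key}\t{name}\t{item}")

Buffering one side in memory is exactly where reduce-side joins hurt under data skew, which is why the tips above suggest map-side joins for small tables.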

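For the meta patterns, production pipelines are best expressed in a workflow tool such as Oozie or Azkaban, as noted above. For a small pipeline, though, job chaining with retries fits in a short driver script. A minimal sketch, where the `hadoop jar` command lines are placeholders and each job consumes the previous job's output directory:

# Job-chaining driver sketch (placeholder job commands)
import subprocess
import time

JOBS = [
    ["hadoop", "jar", "clean.jar", "/raw", "/clean"],      # hypothetical first stage
    ["hadoop", "jar", "aggregate.jar", "/clean", "/out"],  # consumes the previous output
]

def run_with_retry(cmd, max_attempts=3, backoff_s=30):
    """Run one job; retry on failure, per the error_handling note above."""
    for attempt in range(1, max_attempts + 1):
        if subprocess.run(cmd).returncode == 0:
            return
        print(f"attempt {attempt} failed: {' '.join(cmd)}")
        time.sleep(backoff_s)
    raise RuntimeError(f"job failed after {max_attempts} attempts: {cmd}")

for job in JOBS:  # sequential: each job depends on its predecessor's output
    run_with_retry(job)
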
5.2 Operations Best Practices

5.2.1 Performance Monitoring

from typing import List


class MapReduceMonitoringBestPractices:
    """
    MapReduce monitoring best practices
    """
    
    def __init__(self):
        self.monitoring_metrics = {
            'job_level': self._job_level_metrics,
            'task_level': self._task_level_metrics,
            'cluster_level': self._cluster_level_metrics,
            'application_level': self._application_level_metrics
        }
        self.alerting_rules = {}
    
    def _job_level_metrics(self) -> List[dict]:
        """
        Job-level monitoring metrics
        
        Returns:
            List[dict]: list of metrics
        """
        return [
            {
                'metric': 'job_execution_time',
                'description': 'Job execution time',
                'threshold': {
                    'warning': '> 2x baseline',
                    'critical': '> 5x baseline'
                },
                'collection_method': 'JobHistory API',
                'frequency': 'after each job completes'
            },
            {
                'metric': 'job_success_rate',
                'description': 'Job success rate',
                'threshold': {
                    'warning': '< 95%',
                    'critical': '< 90%'
                },
                'collection_method': 'JobHistory API',
                'frequency': 'hourly'
            },
            {
                'metric': 'data_locality_ratio',
                'description': 'Share of tasks reading local data',
                'threshold': {
                    'warning': '< 70%',
                    'critical': '< 50%'
                },
                'collection_method': 'JobHistory API',
                'frequency': 'after each job completes'
            },
            {
                'metric': 'shuffle_data_size',
                'description': 'Volume of shuffled data',
                'threshold': {
                    'warning': '> 10GB per reduce task',
                    'critical': '> 50GB per reduce task'
                },
                'collection_method': 'JobHistory API',
                'frequency': 'after each job completes'
            }
        ]
    
    def _task_level_metrics(self) -> List[dict]:
        """
        Task-level monitoring metrics
        
        Returns:
            List[dict]: list of metrics
        """
        return [
            {
                'metric': 'task_execution_time',
                'description': 'Distribution of task execution times',
                'threshold': {
                    'warning': 'P95 > 2x median',
                    'critical': 'P99 > 5x median'
                },
                'collection_method': 'TaskTracker logs',
                'frequency': 'real time'
            },
            {
                'metric': 'task_failure_rate',
                'description': 'Task failure rate',
                'threshold': {
                    'warning': '> 5%',
                    'critical': '> 10%'
                },
                'collection_method': 'TaskTracker logs',
                'frequency': 'every 5 minutes'
            },
            {
                'metric': 'gc_time_percentage',
                'description': 'Share of time spent in GC',
                'threshold': {
                    'warning': '> 10%',
                    'critical': '> 20%'
                },
                'collection_method': 'JVM metrics',
                'frequency': 'real time'
            },
            {
                'metric': 'memory_usage',
                'description': 'Memory utilization',
                'threshold': {
                    'warning': '> 80%',
                    'critical': '> 95%'
                },
                'collection_method': 'JVM metrics',
                'frequency': 'real time'
            }
        ]
    
    def _cluster_level_metrics(self) -> List[dict]:
        """
        Cluster-level monitoring metrics
        
        Returns:
            List[dict]: list of metrics
        """
        return [
            {
                'metric': 'cluster_utilization',
                'description': 'Cluster resource utilization',
                'threshold': {
                    'warning': '< 60% or > 90%',
                    'critical': '< 40% or > 95%'
                },
                'collection_method': 'ResourceManager API',
                'frequency': 'every minute'
            },
            {
                'metric': 'queue_wait_time',
                'description': 'Time jobs spend waiting in the queue',
                'threshold': {
                    'warning': '> 5 minutes',
                    'critical': '> 15 minutes'
                },
                'collection_method': 'ResourceManager API',
                'frequency': 'every minute'
            },
            {
                'metric': 'node_health',
                'description': 'Node health status',
                'threshold': {
                    'warning': '< 95% nodes healthy',
                    'critical': '< 90% nodes healthy'
                },
                'collection_method': 'NodeManager heartbeat',
                'frequency': 'every 30 seconds'
            }
        ]
    
    def _application_level_metrics(self) -> List[dict]:
        """
        Application-level monitoring metrics
        
        Returns:
            List[dict]: list of metrics
        """
        return [
            {
                'metric': 'throughput',
                'description': 'Data-processing throughput',
                'threshold': {
                    'warning': '< 80% of expected',
                    'critical': '< 60% of expected'
                },
                'collection_method': 'Application metrics',
                'frequency': 'hourly'
            },
            {
                'metric': 'error_rate',
                'description': 'Application error rate',
                'threshold': {
                    'warning': '> 1%',
                    'critical': '> 5%'
                },
                'collection_method': 'Application logs',
                'frequency': 'every 5 minutes'
            }
        ]
    
    def generate_monitoring_dashboard(self) -> dict:
        """
        Generate a monitoring dashboard configuration
        
        Returns:
            dict: dashboard configuration
        """
        dashboard = {
            'title': 'MapReduce Monitoring Dashboard',
            'panels': [],
            'alerts': [],
            'refresh_interval': '30s'
        }
        
        # Add one panel per monitoring level
        for level, metrics_func in self.monitoring_metrics.items():
            metrics = metrics_func()
            panel = {
                'title': f'{level.replace("_", " ").title()} Metrics',
                'metrics': [m['metric'] for m in metrics],
                'type': 'graph',
                'targets': []
            }
            
            for metric in metrics:
                panel['targets'].append({
                    'metric': metric['metric'],
                    'description': metric['description'],
                    'collection_method': metric['collection_method']
                })
            
            dashboard['panels'].append(panel)
        
        return dashboard
    
    def generate_alerting_rules(self) -> List[dict]:
        """
        Generate alerting rules
        
        Returns:
            List[dict]: list of alerting rules
        """
        rules = []
        
        for metrics_func in self.monitoring_metrics.values():
            metrics = metrics_func()
            for metric in metrics:
                if 'threshold' in metric:
                    rule = {
                        'name': f"{metric['metric']}_alert",
                        'metric': metric['metric'],
                        'description': metric['description'],
                        'conditions': metric['threshold'],
                        'severity': {
                            'warning': 'medium',
                            'critical': 'high'
                        },
                        'notification_channels': [
                            'email',
                            'slack'
                        ]
                    }
                    rules.append(rule)
        
        return rules

# Usage example
if __name__ == "__main__":
    monitoring = MapReduceMonitoringBestPractices()
    
    # Generate the monitoring dashboard
    dashboard = monitoring.generate_monitoring_dashboard()
    print("=== Monitoring Dashboard Configuration ===")
    print(f"Number of panels: {len(dashboard['panels'])}")
    
    # Generate the alerting rules
    alerts = monitoring.generate_alerting_rules()
    print("\n=== Alerting Rules ===")
    print(f"Number of rules: {len(alerts)}")
    
    for alert in alerts[:3]:  # show the first three rules
        print(f"- {alert['name']}: {alert['description']}")

5.2.2 Troubleshooting

  1. Diagnosing common problems

    • Slow-running jobs
    • Frequently failing tasks
    • Out-of-memory errors
    • Data skew
  2. Log analysis (see the log-scanning sketch after this list)

    • JobHistory logs
    • TaskTracker logs
    • ApplicationMaster logs
    • Container logs
  3. Performance tuning

    • Parameter tuning
    • Code optimization
    • Resource configuration
    • Network optimization
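
Log analysis usually starts from a handful of well-known failure signatures. The sketch below pulls aggregated container logs with the standard `yarn logs` CLI and flags known patterns; the signature-to-hint table is illustrative rather than exhaustive, and the application id is hypothetical.

# Sketch: scan aggregated container logs for common failure signatures
import subprocess

SIGNATURES = {
    "java.lang.OutOfMemoryError": "raise container memory and JVM heap settings",
    "Container killed on request. Exit code is 143": "container exceeded its memory limit",
    "Too many fetch-failures": "shuffle or network problem between nodes",
    "GC overhead limit exceeded": "JVM spends most of its time in GC; tune heap/GC",
}

def scan_application_logs(app_id: str) -> None:
    """Fetch aggregated logs for one application and report known signatures."""
    result = subprocess.run(
        ["yarn", "logs", "-applicationId", app_id],
        capture_output=True, text=True, check=True,
    )
    for line_no, line in enumerate(result.stdout.splitlines(), 1):
        for signature, hint in SIGNATURES.items():
            if signature in line:
                print(f"line {line_no}: {signature} -> {hint}")

# scan_application_logs("application_1700000000000_0001")  # hypothetical id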

5.3 Summary

5.3.1 MapReduce Core Concepts Revisited

  1. Programming model

    • Map phase: data transformation and filtering
    • Reduce phase: data aggregation and summarization
    • Shuffle phase: data redistribution
  2. Key characteristics

    • Distributed computation
    • Fault tolerance
    • Scalability
    • Data locality
  3. Typical use cases

    • Batch processing
    • Big-data analytics
    • Log processing
    • Data transformation

5.3.2 Best-Practice Highlights

  1. Design principles

    • Choose an appropriate design pattern
    • Optimize for data locality
    • Size tasks sensibly
    • Use a Combiner to cut network traffic
  2. Performance optimization (a baseline parameter sketch follows this section)

    • Input/output optimization
    • Memory configuration
    • Shuffle optimization
    • JVM parameter tuning
  3. Operations management

    • Comprehensive monitoring
    • Timely alerting
    • Fast fault localization
    • Continuous performance tuning
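
For parameter tuning, a small set of properties covers most of the wins. The sketch below groups commonly tuned keys by the optimization areas above; the property names are standard Hadoop 2.x+ keys, while the values are illustrative starting points, not recommendations for any particular cluster.

# Commonly tuned MapReduce properties with illustrative starting values
TUNING_BASELINE = {
    # memory configuration: container size and the JVM heap inside it
    "mapreduce.map.memory.mb": 2048,
    "mapreduce.map.java.opts": "-Xmx1638m",    # roughly 80% of the container
    "mapreduce.reduce.memory.mb": 4096,
    "mapreduce.reduce.java.opts": "-Xmx3276m",
    # shuffle optimization: larger sort buffer, more parallel fetches
    "mapreduce.task.io.sort.mb": 256,
    "mapreduce.task.io.sort.factor": 64,
    "mapreduce.reduce.shuffle.parallelcopies": 10,
    # I/O optimization: compress intermediate map output
    "mapreduce.map.output.compress": "true",
    "mapreduce.map.output.compress.codec": "org.apache.hadoop.io.compress.SnappyCodec",
}

for key, value in TUNING_BASELINE.items():
    print(f"{key}={value}")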

5.3.3 Trends

  1. Technology evolution

    • Movement toward real-time computation (Spark, Flink)
    • Better resource management (YARN, Kubernetes)
    • Higher-level abstractions (SQL, DataFrame)
  2. Expanding application scenarios

    • Machine learning
    • Graph computation
    • Stream processing
    • Interactive queries

As the foundational framework of big-data processing, MapReduce has been displaced by newer technologies in some scenarios, yet its core ideas and design principles remain an essential foundation of distributed computing. A solid grasp of MapReduce's principles and best practices goes a long way toward understanding and using other big-data technologies.