1. Classic Case Studies

1.1 Log Analysis System

Requirements Analysis

  • Process TB-scale web server logs
  • Compute traffic volume, error rates, and popular pages
  • Generate real-time reports and trend analysis

Data Format

# Apache access log format
192.168.1.100 - - [10/Oct/2023:13:55:36 +0000] "GET /index.html HTTP/1.1" 200 2326 "http://www.example.com/" "Mozilla/5.0"

Implementation

package com.example.loganalysis;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Log-parsing Mapper
public class LogAnalysisMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
    private static final Pattern LOG_PATTERN = Pattern.compile(
        "^(\\S+) \\S+ \\S+ \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(\\S+) (\\S+) \\S+\" (\\d{3}) (\\d+)");
    
    // Locale.ENGLISH is needed so English month abbreviations (e.g. "Oct") parse on any default locale
    private static final SimpleDateFormat dateFormat = 
        new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);
    
    private final IntWritable one = new IntWritable(1);
    
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        
        String line = value.toString();
        Matcher matcher = LOG_PATTERN.matcher(line);
        
        if (matcher.matches()) {
            String ip = matcher.group(1);
            String timestamp = matcher.group(2);
            String method = matcher.group(3);
            String url = matcher.group(4);
            String statusCode = matcher.group(5);
            String responseSize = matcher.group(6);
            
            try {
                Date date = dateFormat.parse(timestamp);
                String hour = new SimpleDateFormat("yyyy-MM-dd-HH").format(date);
                
                // Page views per hour
                context.write(new Text("pv_" + hour), one);
                
                // Status code distribution
                context.write(new Text("status_" + statusCode), one);
                
                // Popular pages
                context.write(new Text("url_" + url), one);
                
                // Requests per client IP (unique-IP counts need a further aggregation step)
                context.write(new Text("ip_" + ip), one);
                
                // Total response bytes (an int sum can overflow at TB scale; prefer LongWritable in production)
                context.write(new Text("size_total"), new IntWritable(Integer.parseInt(responseSize)));
                
            } catch (ParseException e) {
                context.getCounter("Errors", "ParseError").increment(1);
            }
        } else {
            context.getCounter("Errors", "InvalidFormat").increment(1);
        }
    }
}

// Aggregation Reducer
public class LogAnalysisReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        
        context.write(key, new IntWritable(sum));
    }
}
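
To run the mapper and reducer above, they still need to be wired into a job. The driver below is an illustrative sketch rather than part of the original listing (class name and argument handling are assumptions); because every value is an associative sum, the reducer can also be registered as a combiner.

package com.example.loganalysis;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Illustrative driver: wires LogAnalysisMapper and LogAnalysisReducer into one job
public class LogAnalysisDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "log-analysis");
        job.setJarByClass(LogAnalysisDriver.class);
        
        job.setMapperClass(LogAnalysisMapper.class);
        job.setCombinerClass(LogAnalysisReducer.class); // sums are associative, so the reducer doubles as a combiner
        job.setReducerClass(LogAnalysisReducer.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}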

1.2 Recommendation System Data Processing

Collaborative Filtering Implementation

package com.example.recommendation;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// User-item rating matrix processing
public class UserItemMapper extends Mapper<LongWritable, Text, Text, Text> {
    
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        
        // Input format: userId,itemId,rating,timestamp
        String[] fields = value.toString().split(",");
        if (fields.length >= 3) {
            String userId = fields[0];
            String itemId = fields[1];
            String rating = fields[2];
            
            // Emit all of this user's ratings, keyed by userId
            context.write(new Text(userId), new Text(itemId + ":" + rating));
        }
    }
}

// Compute user similarity
public class UserSimilarityReducer extends Reducer<Text, Text, Text, DoubleWritable> {
    
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        
        Map<String, Double> userRatings = new HashMap<>();
        
        // Collect all of this user's ratings (itemId -> rating)
        for (Text value : values) {
            String[] parts = value.toString().split(":");
            if (parts.length == 2) {
                userRatings.put(parts[0], Double.parseDouble(parts[1]));
            }
        }
        
        // Compute similarity with other users (simplified placeholder)
        // A real implementation compares rating vectors across users; see the cosine-similarity sketch below
        for (Map.Entry<String, Double> entry : userRatings.entrySet()) {
            context.write(new Text(key + "_" + entry.getKey()), 
                         new DoubleWritable(entry.getValue()));
        }
    }
}
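
The reducer above only re-emits each user's ratings. A real similarity step compares two users' rating vectors, for example with cosine similarity. The helper below is a minimal sketch under the assumption that each user's ratings are available as an itemId-to-rating map; it is an illustrative addition, not part of the MapReduce job itself.

package com.example.recommendation;

import java.util.Map;

// Illustrative helper (not part of the job above): cosine similarity between
// two users' itemId -> rating maps.
public class CosineSimilarity {

    public static double similarity(Map<String, Double> a, Map<String, Double> b) {
        double dot = 0.0, normA = 0.0, normB = 0.0;

        for (Map.Entry<String, Double> e : a.entrySet()) {
            Double other = b.get(e.getKey());
            if (other != null) {
                dot += e.getValue() * other; // only items rated by both users contribute
            }
            normA += e.getValue() * e.getValue();
        }
        for (double v : b.values()) {
            normB += v * v;
        }

        if (normA == 0.0 || normB == 0.0) {
            return 0.0; // one of the users has no ratings
        }
        return dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }
}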

1.3 Financial Risk-Control Data Analysis

Anomalous Transaction Detection

package com.example.riskcontrol;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Transaction analysis Mapper
public class RiskAnalysisMapper extends Mapper<LongWritable, Text, Text, Text> {
    
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        
        // Input format: transactionId,userId,amount,timestamp,location,deviceId
        String[] fields = value.toString().split(",");
        if (fields.length >= 6) {
            String transactionId = fields[0];
            String userId = fields[1];
            double amount = Double.parseDouble(fields[2]);
            String timestamp = fields[3];
            String location = fields[4];
            String deviceId = fields[5];
            
            // Group by user so that per-user behavior patterns can be analyzed
            String transactionInfo = String.format("%s,%f,%s,%s,%s", 
                transactionId, amount, timestamp, location, deviceId);
            
            context.write(new Text(userId), new Text(transactionInfo));
        }
    }
}

// Risk assessment Reducer
public class RiskAnalysisReducer extends Reducer<Text, Text, Text, BooleanWritable> {
    
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        
        List<Transaction> transactions = new ArrayList<>();
        
        // Parse all of this user's transactions
        for (Text value : values) {
            String[] parts = value.toString().split(",");
            if (parts.length >= 5) {
                Transaction tx = new Transaction(
                    parts[0], // transactionId
                    Double.parseDouble(parts[1]), // amount
                    Long.parseLong(parts[2]), // timestamp
                    parts[3], // location
                    parts[4]  // deviceId
                );
                transactions.add(tx);
            }
        }
        
        // Risk assessment logic
        for (Transaction tx : transactions) {
            boolean isRisky = assessRisk(tx, transactions);
            context.write(new Text(tx.getTransactionId()), new BooleanWritable(isRisky));
        }
    }
    
    private boolean assessRisk(Transaction current, List<Transaction> allTransactions) {
        // Risk assessment rules
        // 1. Unusual amount
        double avgAmount = allTransactions.stream()
            .mapToDouble(Transaction::getAmount)
            .average().orElse(0.0);
        
        if (current.getAmount() > avgAmount * 5) {
            return true; // unusually large amount
        }
        
        // 2. Unusual frequency
        long recentTransactions = allTransactions.stream()
            .filter(tx -> Math.abs(tx.getTimestamp() - current.getTimestamp()) < 3600000) // within one hour
            .count();
        
        if (recentTransactions > 10) {
            return true; // unusually high frequency
        }
        
        // 3. Unusual location
        // Simplified here; a real system needs proper geo analysis (see the sketch after this listing)
        
        return false;
    }
}

// Transaction data class
class Transaction {
    private String transactionId;
    private double amount;
    private long timestamp;
    private String location;
    private String deviceId;
    
    public Transaction(String transactionId, double amount, long timestamp, 
                      String location, String deviceId) {
        this.transactionId = transactionId;
        this.amount = amount;
        this.timestamp = timestamp;
        this.location = location;
        this.deviceId = deviceId;
    }
    
    // Getters
    public String getTransactionId() { return transactionId; }
    public double getAmount() { return amount; }
    public long getTimestamp() { return timestamp; }
    public String getLocation() { return location; }
    public String getDeviceId() { return deviceId; }
}
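
Rule 3 in assessRisk is only a placeholder. A common approach is to flag transactions whose implied travel speed between two locations is physically implausible. The helper below is a minimal sketch of that idea; it assumes location is encoded as "lat|lon" and uses an illustrative 900 km/h speed threshold, neither of which comes from the original listing.

package com.example.riskcontrol;

// Illustrative helper for rule 3: haversine distance between two "lat|lon"
// locations, used to detect impossible travel speeds between transactions.
public class GeoUtils {

    private static final double EARTH_RADIUS_KM = 6371.0;

    // Great-circle distance in kilometers between "lat|lon" strings.
    public static double distanceKm(String locA, String locB) {
        String[] a = locA.split("\\|");
        String[] b = locB.split("\\|");
        double lat1 = Math.toRadians(Double.parseDouble(a[0]));
        double lon1 = Math.toRadians(Double.parseDouble(a[1]));
        double lat2 = Math.toRadians(Double.parseDouble(b[0]));
        double lon2 = Math.toRadians(Double.parseDouble(b[1]));

        double dLat = lat2 - lat1;
        double dLon = lon2 - lon1;
        double h = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                 + Math.cos(lat1) * Math.cos(lat2) * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(h));
    }

    // Flags a pair of transactions whose implied speed exceeds ~900 km/h.
    public static boolean impossibleTravel(Transaction prev, Transaction curr) {
        double hours = Math.abs(curr.getTimestamp() - prev.getTimestamp()) / 3_600_000.0;
        double km = distanceKm(prev.getLocation(), curr.getLocation());
        if (hours == 0.0) {
            return km > 0.0; // same instant, different place
        }
        return km / hours > 900.0;
    }
}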

2. Performance Tuning in Practice

2.1 Optimizing Large-Scale Data Processing

Data Skew Solutions

package com.example.optimization;

import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;

// Mapper that mitigates data skew by salting hot keys
public class SkewHandlingMapper extends Mapper<Object, Text, Text, IntWritable> {
    
    private Random random = new Random();
    private final IntWritable one = new IntWritable(1);
    
    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        
        String[] fields = value.toString().split("\t");
        if (fields.length >= 2) {
            String originalKey = fields[0];
            
            // Add a random prefix (salt) to hot keys
            if (isHotKey(originalKey)) {
                int randomPrefix = random.nextInt(10); // random salt 0-9
                String newKey = randomPrefix + "_" + originalKey;
                context.write(new Text(newKey), one);
            } else {
                context.write(new Text(originalKey), one);
            }
        }
    }
    
    private boolean isHotKey(String key) {
        // Decide whether this key is a hot key;
        // this can be based on historical statistics or live monitoring
        return key.startsWith("hot_");
    }
}

// Stage two: merge the salted partial results. Despite the class name, this should run as the
// reducer of a second aggregation job rather than as a Combiner: a Combiner must not change
// keys, otherwise the original hot key would end up split across reducers again.
public class SkewHandlingCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        
        // Strip the numeric salt prefix
        String originalKey = key.toString();
        int underscore = originalKey.indexOf('_');
        if (underscore > 0 && originalKey.substring(0, underscore).matches("\\d+")) {
            originalKey = originalKey.substring(underscore + 1);
        }
        
        context.write(new Text(originalKey), new IntWritable(sum));
    }
}
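
The Partitioner import above is not used in the listing. If the salted keys should be spread across reducers deterministically rather than by the default hash of the whole salted string, one option is a salt-aware partitioner such as the sketch below (an illustrative addition, not part of the original design). The salted partial sums are then merged by a second job whose reducer strips the prefix, as shown above.

package com.example.optimization;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustrative partitioner for salted keys ("3_hot_xxx"): each salted copy of a
// hot key goes to a different reducer, while normal keys fall back to hashing.
public class SkewAwarePartitioner extends Partitioner<Text, IntWritable> {

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        String k = key.toString();
        int underscore = k.indexOf('_');
        if (underscore > 0 && k.substring(0, underscore).matches("\\d+")) {
            int salt = Integer.parseInt(k.substring(0, underscore));
            String original = k.substring(underscore + 1);
            // Spread the salted copies of one hot key across adjacent partitions
            return (((original.hashCode() & Integer.MAX_VALUE) % numPartitions) + salt) % numPartitions;
        }
        return (k.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}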

Memory Optimization Configuration

package com.example.optimization;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class MemoryOptimizationConfig {
    
    public static void configureForLargeDataset(Job job) {
        Configuration conf = job.getConfiguration();
        
        // Increase map task memory
        conf.setInt("mapreduce.map.memory.mb", 4096);
        conf.set("mapreduce.map.java.opts", "-Xmx3276m -XX:+UseG1GC -XX:MaxGCPauseMillis=200");
        
        // Increase reduce task memory
        conf.setInt("mapreduce.reduce.memory.mb", 8192);
        conf.set("mapreduce.reduce.java.opts", "-Xmx6553m -XX:+UseG1GC -XX:MaxGCPauseMillis=200");
        
        // Tune the sort buffer
        conf.setInt("mapreduce.task.io.sort.mb", 800);
        conf.setFloat("mapreduce.map.sort.spill.percent", 0.9f);
        
        // Increase parallelism
        conf.setInt("mapreduce.job.reduces", 50);
        
        // Enable map output compression
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setClass("mapreduce.map.output.compress.codec", 
                     org.apache.hadoop.io.compress.SnappyCodec.class, 
                     org.apache.hadoop.io.compress.CompressionCodec.class);
    }
}
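
These settings only take effect if they are applied before the job is submitted. A minimal usage sketch follows; the job name and the omitted mapper/reducer setup are assumptions for illustration.

package com.example.optimization;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Illustrative usage: apply the tuning before submitting the job.
public class TunedJobLauncher {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "large-dataset-job");
        // Mapper/reducer/input/output setup omitted for brevity.
        
        MemoryOptimizationConfig.configureForLargeDataset(job);
        
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}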

2.2 Network Optimization

Reducing Shuffle Data Volume

package com.example.optimization;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Use a Combiner to reduce network transfer
public class OptimizedCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        
        // Emit only the aggregated result to cut down the data shuffled to reducers
        context.write(key, new IntWritable(sum));
    }
}

// In-mapper (local) aggregation optimization
public class LocalAggregationMapper extends org.apache.hadoop.mapreduce.Mapper<Object, Text, Text, IntWritable> {
    
    private Map<String, Integer> localCache = new HashMap<>();
    private static final int CACHE_SIZE = 10000;
    
    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        
        String word = value.toString().trim();
        
        // Aggregate locally in memory
        localCache.put(word, localCache.getOrDefault(word, 0) + 1);
        
        // Flush and clear once the cache reaches the threshold
        if (localCache.size() >= CACHE_SIZE) {
            flushCache(context);
        }
    }
    
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        flushCache(context);
    }
    
    private void flushCache(Context context) throws IOException, InterruptedException {
        for (Map.Entry<String, Integer> entry : localCache.entrySet()) {
            context.write(new Text(entry.getKey()), new IntWritable(entry.getValue()));
        }
        localCache.clear();
    }
}
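
Neither class does anything until it is registered with a job. The wiring below is a sketch, not part of the original text; in practice either the Combiner or the in-mapper aggregation variant is usually enough, since both attack the same shuffle volume.

package com.example.optimization;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

// Illustrative wiring for the two shuffle-reduction techniques above.
public class ShuffleOptimizedJobSetup {
    public static Job build() throws Exception {
        Job job = Job.getInstance(new Configuration(), "shuffle-optimized");
        job.setJarByClass(ShuffleOptimizedJobSetup.class);
        
        job.setMapperClass(LocalAggregationMapper.class); // in-mapper aggregation
        job.setCombinerClass(OptimizedCombiner.class);    // and/or a classic Combiner
        // Reducer, output types, and input/output paths omitted for brevity.
        
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        return job;
    }
}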

3. Monitoring and Operations Best Practices

3.1 Job Monitoring System

package com.example.monitoring;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobCounter;

public class JobMonitoringSystem {
    
    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
    
    public void startMonitoring(Job job) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                monitorJobProgress(job);
                checkJobHealth(job);
                collectMetrics(job);
            } catch (Exception e) {
                System.err.println("Monitoring error: " + e.getMessage());
            }
        }, 0, 30, TimeUnit.SECONDS);
    }
    
    private void monitorJobProgress(Job job) throws Exception {
        if (!job.isComplete()) {
            float mapProgress = job.mapProgress();
            float reduceProgress = job.reduceProgress();
            
            System.out.printf("[%s] Map: %.1f%%, Reduce: %.1f%%\n", 
                            job.getJobName(), mapProgress * 100, reduceProgress * 100);
            
            // Check whether progress has stalled
            checkProgressStagnation(job, mapProgress, reduceProgress);
        }
    }
    
    private void checkJobHealth(Job job) throws Exception {
        // Check failed-task counts via the built-in job counters
        long failedMaps = job.getCounters()
            .findCounter(JobCounter.NUM_FAILED_MAPS).getValue();
        long failedReduces = job.getCounters()
            .findCounter(JobCounter.NUM_FAILED_REDUCES).getValue();
        
        if (failedMaps > 5 || failedReduces > 2) {
            System.err.printf("WARNING: High failure rate - Maps: %d, Reduces: %d\n", 
                            failedMaps, failedReduces);
            // Send an alert
            sendAlert(job, "High task failure rate detected");
        }
    }
    
    private void collectMetrics(Job job) throws Exception {
        // Collect performance metrics
        job.getCounters().forEach(group -> {
            group.forEach(counter -> {
                String metricName = group.getName() + "." + counter.getName();
                long value = counter.getValue();
                // Forward to a monitoring system (e.g. Prometheus, InfluxDB)
                sendMetric(metricName, value);
            });
        });
    }
    
    private void checkProgressStagnation(Job job, float mapProgress, float reduceProgress) {
        // Progress-stagnation detection logic goes here
    }
    
    private void sendAlert(Job job, String message) {
        // Alerting logic (email, SMS, Slack, etc.)
        System.err.println("ALERT: " + message + " for job " + job.getJobName());
    }
    
    private void sendMetric(String name, long value) {
        // Send the metric to the monitoring system
        System.out.printf("Metric: %s = %d\n", name, value);
    }
    
    public void shutdown() {
        scheduler.shutdown();
    }
}
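
The monitor only makes sense next to a job that was submitted asynchronously, since waitForCompletion() would block the calling thread. A hedged usage sketch, with the job setup omitted:

package com.example.monitoring;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Illustrative usage: submit asynchronously, let the monitor poll until completion.
public class MonitoredJobLauncher {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "monitored-job");
        // Mapper/reducer/input/output configuration omitted for brevity.
        
        job.submit(); // non-blocking, unlike waitForCompletion()
        
        JobMonitoringSystem monitor = new JobMonitoringSystem();
        monitor.startMonitoring(job);
        
        while (!job.isComplete()) {
            Thread.sleep(10_000);
        }
        monitor.shutdown();
    }
}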

3.2 Automated Operations Script

#!/bin/bash
# Automated operations script for MapReduce jobs

# Configuration
JOB_JAR="/path/to/your/job.jar"
INPUT_PATH="/input/data"
OUTPUT_PATH="/output/results"
LOG_DIR="/var/log/mapreduce"
MAX_RETRIES=3
TIMEOUT=3600  # 1-hour timeout

# Logging helper
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_DIR/job.log"
}

# Check that an HDFS path exists
check_hdfs_path() {
    local path=$1
    if ! hdfs dfs -test -e "$path"; then
        log "ERROR: HDFS path $path does not exist"
        return 1
    fi
    return 0
}

# Clean up the output directory
cleanup_output() {
    if hdfs dfs -test -e "$OUTPUT_PATH"; then
        log "Cleaning up existing output directory: $OUTPUT_PATH"
        hdfs dfs -rm -r "$OUTPUT_PATH"
    fi
}

# Submit the job
submit_job() {
    local attempt=$1
    log "Submitting job (attempt $attempt/$MAX_RETRIES)"
    
    # Client-side JVM options
    export HADOOP_OPTS="-Xmx2g"
    
    # Submit the job and capture the job ID (the generic -D options require a driver that uses ToolRunner/GenericOptionsParser)
    local job_output=$(hadoop jar "$JOB_JAR" \
        -D mapreduce.job.name="DataProcessing-$(date +%Y%m%d-%H%M%S)" \
        -D mapreduce.map.memory.mb=2048 \
        -D mapreduce.reduce.memory.mb=4096 \
        -D mapreduce.job.reduces=10 \
        "$INPUT_PATH" "$OUTPUT_PATH" 2>&1)
    
    local job_id=$(echo "$job_output" | grep -o 'job_[0-9]*_[0-9]*' | head -1)
    
    if [ -z "$job_id" ]; then
        log "ERROR: Failed to extract job ID from output"
        log "Job output: $job_output"
        return 1
    fi
    
    log "Job submitted with ID: $job_id"
    
    # Monitor job progress
    monitor_job "$job_id"
    return $?
}

# Monitor the job
monitor_job() {
    local job_id=$1
    # yarn application commands expect an application ID, not a job ID
    local app_id=${job_id/job_/application_}
    local start_time=$(date +%s)
    
    log "Monitoring job: $job_id"
    
    while true; do
        local current_time=$(date +%s)
        local elapsed=$((current_time - start_time))
        
        # Check for timeout
        if [ $elapsed -gt $TIMEOUT ]; then
            log "ERROR: Job timeout after $TIMEOUT seconds"
            yarn application -kill "$app_id"
            return 1
        fi
        
        # Fetch the application report once per iteration
        local app_report=$(yarn application -status "$app_id" 2>/dev/null)
        local app_state=$(echo "$app_report" | grep -E "^[[:space:]]*State :" | awk '{print $3}')
        local final_state=$(echo "$app_report" | grep "Final-State" | awk '{print $3}')
        
        case "$final_state" in
            "SUCCEEDED")
                log "Job completed successfully"
                return 0
                ;;
            "FAILED"|"KILLED")
                log "ERROR: Job failed with status: $final_state"
                return 1
                ;;
        esac
        
        # Still running: report progress (Final-State stays UNDEFINED until the job finishes)
        if [ "$app_state" = "RUNNING" ] || [ "$app_state" = "ACCEPTED" ]; then
            local progress=$(echo "$app_report" | grep "Progress" | awk '{print $3}')
            log "Job progress: $progress (elapsed: ${elapsed}s)"
        fi
        
        sleep 30
    done
}

# Validate results
validate_results() {
    log "Validating job results"
    
    # Make sure the output directory exists
    if ! check_hdfs_path "$OUTPUT_PATH"; then
        log "ERROR: Output directory not found"
        return 1
    fi
    
    # Check the output size
    local output_size=$(hdfs dfs -du -s "$OUTPUT_PATH" | awk '{print $1}')
    if [ "$output_size" -eq 0 ]; then
        log "ERROR: Output directory is empty"
        return 1
    fi
    
    log "Job results validated successfully (output size: $output_size bytes)"
    return 0
}

# Main
main() {
    log "Starting MapReduce job automation"
    
    # Check the input path
    if ! check_hdfs_path "$INPUT_PATH"; then
        exit 1
    fi
    
    # Clean up the output directory
    cleanup_output
    
    # Retry loop
    for attempt in $(seq 1 $MAX_RETRIES); do
        if submit_job "$attempt"; then
            if validate_results; then
                log "Job completed successfully"
                exit 0
            else
                log "Job validation failed"
            fi
        else
            log "Job attempt $attempt failed"
        fi
        
        if [ $attempt -lt $MAX_RETRIES ]; then
            log "Retrying in 60 seconds..."
            sleep 60
            cleanup_output
        fi
    done
    
    log "ERROR: All job attempts failed"
    exit 1
}

# Run main
main "$@"

4. Enterprise Deployment Architecture

4.1 Multi-Environment Management

# Environment configuration file (environments.yml)
production:
  hadoop:
    namenode: "hdfs://prod-namenode:9000"
    resourcemanager: "prod-rm:8032"
  mapreduce:
    map_memory: 4096
    reduce_memory: 8192
    job_reduces: 50
  monitoring:
    enabled: true
    metrics_endpoint: "http://prometheus:9090"

staging:
  hadoop:
    namenode: "hdfs://staging-namenode:9000"
    resourcemanager: "staging-rm:8032"
  mapreduce:
    map_memory: 2048
    reduce_memory: 4096
    job_reduces: 20
  monitoring:
    enabled: true
    metrics_endpoint: "http://staging-prometheus:9090"

development:
  hadoop:
    namenode: "hdfs://dev-namenode:9000"
    resourcemanager: "dev-rm:8032"
  mapreduce:
    map_memory: 1024
    reduce_memory: 2048
    job_reduces: 5
  monitoring:
    enabled: false
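
One way to consume this file from a Java driver is to parse it and copy the selected environment's values into the Hadoop Configuration. The sketch below is an assumption layered on top of the original text: it presumes SnakeYAML is on the classpath, and the class and package names are illustrative.

package com.example.deployment;

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.yaml.snakeyaml.Yaml;

// Illustrative loader: reads environments.yml and applies the chosen
// environment's settings to a Hadoop Configuration.
public class EnvironmentConfigLoader {

    @SuppressWarnings("unchecked")
    public static void apply(Configuration conf, String envName, String yamlPath) throws Exception {
        try (InputStream in = new FileInputStream(yamlPath)) {
            Map<String, Object> root = (Map<String, Object>) new Yaml().load(in);
            Map<String, Object> env = (Map<String, Object>) root.get(envName);
            Map<String, Object> hadoop = (Map<String, Object>) env.get("hadoop");
            Map<String, Object> mr = (Map<String, Object>) env.get("mapreduce");

            conf.set("fs.defaultFS", (String) hadoop.get("namenode"));
            conf.set("yarn.resourcemanager.address", (String) hadoop.get("resourcemanager"));
            conf.setInt("mapreduce.map.memory.mb", (Integer) mr.get("map_memory"));
            conf.setInt("mapreduce.reduce.memory.mb", (Integer) mr.get("reduce_memory"));
            conf.setInt("mapreduce.job.reduces", (Integer) mr.get("job_reduces"));
        }
    }
}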

4.2 CI/CD Integration

# .gitlab-ci.yml
stages:
  - build
  - test
  - deploy-staging
  - deploy-production

variables:
  MAVEN_OPTS: "-Dmaven.repo.local=.m2/repository"
  HADOOP_VERSION: "3.3.4"

build:
  stage: build
  image: maven:3.8-openjdk-11
  script:
    - mvn clean compile
    - mvn package -DskipTests
  artifacts:
    paths:
      - target/*.jar
    expire_in: 1 hour
  cache:
    paths:
      - .m2/repository/

unit-test:
  stage: test
  image: maven:3.8-openjdk-11
  script:
    - mvn test
  artifacts:
    reports:
      junit:
        - target/surefire-reports/TEST-*.xml

integration-test:
  stage: test
  image: hadoop:$HADOOP_VERSION
  script:
    - ./scripts/setup-test-cluster.sh
    - ./scripts/run-integration-tests.sh
  artifacts:
    reports:
      junit:
        - target/failsafe-reports/TEST-*.xml

deploy-staging:
  stage: deploy-staging
  image: hadoop:$HADOOP_VERSION
  script:
    - ./scripts/deploy.sh staging
  environment:
    name: staging
    url: http://staging-hadoop.company.com
  only:
    - develop

deploy-production:
  stage: deploy-production
  image: hadoop:$HADOOP_VERSION
  script:
    - ./scripts/deploy.sh production
  environment:
    name: production
    url: http://hadoop.company.com
  when: manual
  only:
    - master

5. Troubleshooting Guide

5.1 Diagnosing Common Issues

package com.example.troubleshooting;

import java.util.Arrays;
import java.util.List;

public class TroubleshootingGuide {
    
    public static class DiagnosticInfo {
        private String issue;
        private List<String> symptoms;
        private List<String> solutions;
        
        public DiagnosticInfo(String issue, List<String> symptoms, List<String> solutions) {
            this.issue = issue;
            this.symptoms = symptoms;
            this.solutions = solutions;
        }
        
        // Getters
        public String getIssue() { return issue; }
        public List<String> getSymptoms() { return symptoms; }
        public List<String> getSolutions() { return solutions; }
    }
    
    public static final List<DiagnosticInfo> COMMON_ISSUES = Arrays.asList(
        new DiagnosticInfo(
            "Out of memory (OutOfMemoryError)",
            Arrays.asList(
                "Tasks fail and restart frequently",
                "java.lang.OutOfMemoryError appears in the logs",
                "Excessively long GC pauses"
            ),
            Arrays.asList(
                "Increase mapreduce.map.memory.mb and mapreduce.reduce.memory.mb",
                "Adjust mapreduce.map.java.opts and mapreduce.reduce.java.opts",
                "Enable compression to reduce memory usage",
                "Use more memory-efficient data structures"
            )
        ),
        
        new DiagnosticInfo(
            "Data skew",
            Arrays.asList(
                "Some reduce tasks run far longer than the others",
                "Uneven cluster resource utilization",
                "The job sits at 99% progress for a long time"
            ),
            Arrays.asList(
                "Scatter hot keys with random prefixes (salting)",
                "Implement a custom Partitioner",
                "Increase the number of reduce tasks",
                "Pre-aggregate with a Combiner"
            )
        ),
        
        new DiagnosticInfo(
            "Network connectivity problems",
            Arrays.asList(
                "Tasks fail with timeouts",
                "The shuffle phase takes too long",
                "Connection refused errors"
            ),
            Arrays.asList(
                "Check network configuration and firewall rules",
                "Increase the mapreduce.task.timeout value",
                "Improve network bandwidth and latency",
                "Check DNS resolution"
            )
        )
    );
    
    public static void printTroubleshootingGuide() {
        System.out.println("=== MapReduce故障排查指南 ===");
        
        for (DiagnosticInfo info : COMMON_ISSUES) {
            System.out.println("\n问题: " + info.getIssue());
            
            System.out.println("症状:");
            for (String symptom : info.getSymptoms()) {
                System.out.println("  - " + symptom);
            }
            
            System.out.println("解决方案:");
            for (String solution : info.getSolutions()) {
                System.out.println("  - " + solution);
            }
        }
    }
}

5.2 Performance Analysis Tools

#!/bin/bash
# MapReduce performance analysis script

analyze_job_performance() {
    local job_id=$1
    # yarn application commands expect an application ID
    local app_id=${job_id/job_/application_}
    
    echo "=== Analyzing job performance: $job_id ==="
    
    # Basic job information
    echo -e "\n--- Basic job information ---"
    yarn application -status "$app_id"
    
    # Task execution time analysis
    echo -e "\n--- Task execution times ---"
    mapred job -history "$job_id" | grep -E "(Map|Reduce).*Time"
    
    # Failed tasks
    echo -e "\n--- Failed task analysis ---"
    mapred job -list-attempt-ids "$job_id" map failed
    mapred job -list-attempt-ids "$job_id" reduce failed
    
    # Key counters
    echo -e "\n--- Key counters ---"
    mapred job -counter "$job_id" "org.apache.hadoop.mapreduce.FileSystemCounter" "FILE_BYTES_READ"
    mapred job -counter "$job_id" "org.apache.hadoop.mapreduce.FileSystemCounter" "FILE_BYTES_WRITTEN"
    mapred job -counter "$job_id" "org.apache.hadoop.mapreduce.FileSystemCounter" "HDFS_BYTES_READ"
    mapred job -counter "$job_id" "org.apache.hadoop.mapreduce.FileSystemCounter" "HDFS_BYTES_WRITTEN"
    
    # Generate the performance report
    generate_performance_report "$job_id"
}

generate_performance_report() {
    local job_id=$1
    local report_file="performance_report_${job_id}.html"
    
    cat > "$report_file" << EOF
<!DOCTYPE html>
<html>
<head>
    <title>MapReduce Performance Report - $job_id</title>
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; }
        .metric { margin: 10px 0; padding: 10px; border-left: 4px solid #007cba; }
        .warning { border-left-color: #ff9800; }
        .error { border-left-color: #f44336; }
    </style>
</head>
<body>
    <h1>MapReduce Performance Report</h1>
    <h2>Job ID: $job_id</h2>
    
    <div class="metric">
        <h3>Execution time</h3>
        <p>Total job duration: $(get_job_duration "$job_id")</p>
        <p>Map phase duration: $(get_map_duration "$job_id")</p>
        <p>Reduce phase duration: $(get_reduce_duration "$job_id")</p>
    </div>
    
    <div class="metric">
        <h3>Resource usage</h3>
        <p>Map tasks: $(get_map_task_count "$job_id")</p>
        <p>Reduce tasks: $(get_reduce_task_count "$job_id")</p>
        <p>Failed tasks: $(get_failed_task_count "$job_id")</p>
    </div>
    
    <div class="metric">
        <h3>Data volume</h3>
        <p>Input size: $(get_input_size "$job_id")</p>
        <p>Output size: $(get_output_size "$job_id")</p>
        <p>Shuffle size: $(get_shuffle_size "$job_id")</p>
    </div>
    
    <div class="metric">
        <h3>Recommendations</h3>
        $(generate_recommendations "$job_id")
    </div>
</body>
</html>
EOF
    
    echo "性能报告已生成: $report_file"
}

# Helper functions
get_job_duration() {
    # Logic to obtain total job duration
    echo "not implemented"
}

get_map_duration() {
    # Logic to obtain map phase duration
    echo "not implemented"
}

get_reduce_duration() {
    # Logic to obtain reduce phase duration
    echo "not implemented"
}

# Stubs for the remaining metrics referenced in the report template
get_map_task_count()    { echo "not implemented"; }
get_reduce_task_count() { echo "not implemented"; }
get_failed_task_count() { echo "not implemented"; }
get_input_size()        { echo "not implemented"; }
get_output_size()       { echo "not implemented"; }
get_shuffle_size()      { echo "not implemented"; }

generate_recommendations() {
    local job_id=$1
    echo "<ul>"
    echo "<li>Consider increasing map task memory</li>"
    echo "<li>Consider compressing intermediate output</li>"
    echo "<li>Improve data locality</li>"
    echo "</ul>"
}

6. Summary and Outlook

6.1 Summary of MapReduce Best Practices

  1. Design principles

    • Follow functional-programming ideas
    • Keep the Map and Reduce functions pure
    • Design the data-partitioning strategy carefully
  2. Performance optimization

    • Make full use of Combiners to reduce network transfer
    • Enable compression to lower I/O overhead
    • Configure memory and parallelism appropriately
    • Address data skew
  3. Operations

    • Build a comprehensive monitoring system
    • Automate deployment and operations
    • Prepare contingency plans for failures
  4. Code quality

    • Write unit and integration tests
    • Manage code with version control
    • Follow coding standards and best practices

6.2 Technology Trends

  1. Migration to Spark

    • Spark offers better performance and ease of use
    • Supports stream processing and machine learning
    • Richer APIs and a broader ecosystem
  2. Cloud-native deployment

    • Containerized deployment
    • Kubernetes orchestration
    • Elastic scaling
  3. Real-time processing

    • Unified stream and batch processing
    • Low-latency data processing
    • Event-driven architectures

6.3 Learning Recommendations

  1. Understand the fundamentals deeply

    • Master the core ideas of MapReduce
    • Understand the challenges of distributed computing and how they are solved
  2. Build hands-on project experience

    • Take part in real big-data projects
    • Solve real business problems
  3. Follow technology developments

    • Learn new big-data technologies
    • Study industry best practices
  4. Develop systems thinking

    • Approach problems from an architectural perspective
    • Weigh performance, reliability, and maintainability

Wrap-Up

Drawing on real-world cases, this chapter showed how MapReduce is applied across different domains and covered hands-on skills such as performance tuning, monitoring and operations, and troubleshooting. This knowledge and experience is essential for running MapReduce successfully in production.

As big-data technology evolves, MapReduce is no longer the only option, but its core ideas and design principles remain an important guide. A solid grasp of MapReduce also makes it easier to understand and use other big-data processing frameworks.