1. Advanced Data Processing Patterns

1.1 Secondary Sort

Composite key design

package com.example.secondarysort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class CompositeKey implements WritableComparable<CompositeKey> {
    
    private String naturalKey;  // natural key: controls partitioning and grouping
    private Integer secondaryKey;  // secondary key: controls sort order within a group
    
    public CompositeKey() {}
    
    public CompositeKey(String naturalKey, Integer secondaryKey) {
        this.naturalKey = naturalKey;
        this.secondaryKey = secondaryKey;
    }
    
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(naturalKey);
        out.writeInt(secondaryKey);
    }
    
    @Override
    public void readFields(DataInput in) throws IOException {
        naturalKey = in.readUTF();
        secondaryKey = in.readInt();
    }
    
    @Override
    public int compareTo(CompositeKey other) {
        int result = this.naturalKey.compareTo(other.naturalKey);
        if (result == 0) {
            result = this.secondaryKey.compareTo(other.secondaryKey);
        }
        return result;
    }
    
    // Getters and Setters
    public String getNaturalKey() { return naturalKey; }
    public void setNaturalKey(String naturalKey) { this.naturalKey = naturalKey; }
    public Integer getSecondaryKey() { return secondaryKey; }
    public void setSecondaryKey(Integer secondaryKey) { this.secondaryKey = secondaryKey; }
}

Grouping comparator on the natural key

package com.example.secondarysort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class NaturalKeyGroupingComparator extends WritableComparator {
    
    public NaturalKeyGroupingComparator() {
        super(CompositeKey.class, true);
    }
    
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        CompositeKey key1 = (CompositeKey) w1;
        CompositeKey key2 = (CompositeKey) w2;
        
        // Compare only the natural key, so all values sharing it land in one reduce() call
        return key1.getNaturalKey().compareTo(key2.getNaturalKey());
    }
}

Partitioner on the natural key

package com.example.secondarysort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class NaturalKeyPartitioner extends Partitioner<CompositeKey, Text> {
    
    @Override
    public int getPartition(CompositeKey key, Text value, int numPartitions) {
        // Partition by the natural key so records with the same natural key go to the same reducer
        return (key.getNaturalKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
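
The three classes above only take effect once they are wired into the job. The driver sketch below shows one way to do that; SecondarySortMapper and SecondarySortReducer are hypothetical placeholders for whatever map/reduce logic emits and consumes CompositeKey.

package com.example.secondarysort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class SecondarySortDriver {
    
    public static Job createJob(Configuration conf) throws Exception {
        Job job = Job.getInstance(conf, "secondary sort");
        job.setJarByClass(SecondarySortDriver.class);
        
        // Hypothetical mapper/reducer classes that emit and consume CompositeKey
        job.setMapperClass(SecondarySortMapper.class);
        job.setReducerClass(SecondarySortReducer.class);
        
        // CompositeKey.compareTo() defines the full sort order: natural key, then secondary key
        job.setMapOutputKeyClass(CompositeKey.class);
        job.setMapOutputValueClass(Text.class);
        
        // Partition and group by the natural key only
        job.setPartitionerClass(NaturalKeyPartitioner.class);
        job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
        
        return job;
    }
}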

1.2 Join Operations

Map-side join

package com.example.join;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
    
    private Map<String, String> lookupTable = new HashMap<>();
    
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // The small table is shipped to every task via the distributed cache
        // (the driver calls job.addCacheFile(...)) and is loaded into memory once per task
        URI[] cacheFiles = context.getCacheFiles();
        if (cacheFiles != null) {
            for (URI cacheFile : cacheFiles) {
                loadLookupTable(new Path(cacheFile.getPath()).getName());
            }
        }
    }
    
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        
        // The job input contains only the large table; the small table is already in memory
        String[] fields = value.toString().split("\t");
        String joinKey = fields[0];
        
        // Look up the matching record in the in-memory table
        String lookupValue = lookupTable.get(joinKey);
        if (lookupValue != null) {
            context.write(new Text(joinKey), 
                         new Text(value.toString() + "\t" + lookupValue));
        }
    }
    
    private void loadLookupTable(String localFileName) throws IOException {
        // Cache files are symlinked into the task's working directory under their original
        // names; each line of the small table is assumed to be "key<TAB>value"
        try (BufferedReader reader = new BufferedReader(new FileReader(localFileName))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] parts = line.split("\t", 2);
                if (parts.length == 2) {
                    lookupTable.put(parts[0], parts[1]);
                }
            }
        }
    }
}
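
The mapper above assumes the driver has registered the small table as a distributed-cache file. A minimal sketch of that driver-side setup follows; all paths are illustrative.

package com.example.join;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapSideJoinDriver {
    
    public static Job createJob(Configuration conf) throws Exception {
        Job job = Job.getInstance(conf, "map-side join");
        job.setJarByClass(MapSideJoinDriver.class);
        job.setMapperClass(MapSideJoinMapper.class);
        job.setNumReduceTasks(0);  // map-only job: the join finishes in the map phase
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        
        // Ship the small table to every mapper via the distributed cache
        job.addCacheFile(new URI("/input/small_table.txt"));
        
        // The large table is the regular job input
        FileInputFormat.addInputPath(job, new Path("/input/large_table"));
        FileOutputFormat.setOutputPath(job, new Path("/output/mapjoin"));
        
        return job;
    }
}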

Reduce-side join

package com.example.join;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ReduceSideJoinReducer extends Reducer<Text, Text, Text, Text> {
    
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        
        List<String> leftTable = new ArrayList<>();
        List<String> rightTable = new ArrayList<>();
        
        // Separate the records coming from the two tables by their tag
        for (Text value : values) {
            String record = value.toString();
            if (record.startsWith("L:")) {
                leftTable.add(record.substring(2));
            } else if (record.startsWith("R:")) {
                rightTable.add(record.substring(2));
            }
        }
        
        // Emit the cross product of matching left and right records (inner join)
        for (String leftRecord : leftTable) {
            for (String rightRecord : rightTable) {
                context.write(key, new Text(leftRecord + "\t" + rightRecord));
            }
        }
    }
}
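
This reducer relies on every record carrying an "L:" or "R:" tag that was added on the map side. The sketch below shows what one such tagging mapper might look like, assuming tab-separated left-table records whose first field is the join key; the right-table mapper would be identical except for the "R:" tag.

package com.example.join;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper that tags left-table records with "L:"
public class LeftTableMapper extends Mapper<LongWritable, Text, Text, Text> {
    
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t", 2);
        if (fields.length == 2) {
            // Join key becomes the output key; the tagged payload becomes the value
            context.write(new Text(fields[0]), new Text("L:" + fields[1]));
        }
    }
}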

1.3 Deduplication

package com.example.dedup;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Mapper (DedupMapper.java): emits every record as a key with a null value
public class DedupMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(value, NullWritable.get());
    }
}

// Reducer (DedupReducer.java): emits each distinct key exactly once
public class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    
    @Override
    public void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Writing the key once per group removes the duplicates
        context.write(key, NullWritable.get());
    }
}
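
Because the reducer simply re-emits its key, it can double as a combiner, so most duplicates are already dropped on the map side before the shuffle. A minimal driver sketch under that assumption (paths are illustrative):

package com.example.dedup;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DedupDriver {
    
    public static Job createJob(Configuration conf) throws Exception {
        Job job = Job.getInstance(conf, "deduplication");
        job.setJarByClass(DedupDriver.class);
        
        job.setMapperClass(DedupMapper.class);
        // Reusing the reducer as a combiner removes local duplicates before the shuffle
        job.setCombinerClass(DedupReducer.class);
        job.setReducerClass(DedupReducer.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        
        FileInputFormat.addInputPath(job, new Path("/input/raw"));
        FileOutputFormat.setOutputPath(job, new Path("/output/dedup"));
        
        return job;
    }
}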

2. Performance Optimization Strategies

2.1 Data Locality Optimization

package com.example.locality;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class LocalityOptimization {
    
    public static void configureDataLocality(Job job) {
        Configuration conf = job.getConfiguration();
        
        // Rack awareness: the topology script is normally a cluster-wide setting
        // in core-site.xml rather than a per-job one; shown here for completeness
        conf.set("net.topology.script.file.name", "/path/to/rack-topology.sh");
        
        // Delay reducer start until 80% of maps have finished, so early map waves
        // are not competing with reducers for containers
        conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 0.8f);
        
        // Delay scheduling itself is a scheduler-side setting (for example
        // yarn.scheduler.capacity.node-locality-delay in capacity-scheduler.xml)
        // and cannot be enabled from the job configuration
    }
}

2.2 Memory Management Optimization

package com.example.memory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class MemoryOptimization {
    
    public static void configureMemory(Job job) {
        Configuration conf = job.getConfiguration();
        
        // Map task memory: container size plus a JVM heap of roughly 80% of it
        conf.setInt("mapreduce.map.memory.mb", 2048);
        conf.set("mapreduce.map.java.opts", "-Xmx1638m -XX:+UseG1GC");
        
        // Reduce task memory
        conf.setInt("mapreduce.reduce.memory.mb", 4096);
        conf.set("mapreduce.reduce.java.opts", "-Xmx3276m -XX:+UseG1GC");
        
        // Map-side sort buffer
        conf.setInt("mapreduce.task.io.sort.mb", 400);
        conf.setFloat("mapreduce.map.sort.spill.percent", 0.8f);
        
        // Reduce-side shuffle buffers
        conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.7f);
        conf.setFloat("mapreduce.reduce.shuffle.merge.percent", 0.66f);
    }
}

2.3 Compression Optimization

package com.example.compression;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;

public class CompressionOptimization {
    
    public static void configureCompression(Job job) {
        Configuration conf = job.getConfiguration();
        
        // Input compression needs no job-side switch: the codec is chosen by file
        // extension (.gz, .bz2, ...); prefer splittable formats such as bzip2 or
        // block-compressed SequenceFiles for large inputs
        
        // Compress intermediate map output (Snappy: fast, CPU-cheap)
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setClass("mapreduce.map.output.compress.codec", 
                     SnappyCodec.class, CompressionCodec.class);
        
        // Compress the final job output (Gzip: better ratio)
        conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
        conf.setClass("mapreduce.output.fileoutputformat.compress.codec",
                     GzipCodec.class, CompressionCodec.class);
        
        // For SequenceFile output, compress whole blocks rather than individual records
        conf.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
    }
}
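
The same output-side settings can also be expressed through the FileOutputFormat and SequenceFileOutputFormat helper methods, which avoids hard-coding property names; a brief equivalent sketch:

package com.example.compression;

import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class CompressionViaHelpers {
    
    public static void configureOutputCompression(Job job) {
        // Final output compression via the helper API
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        
        // Only relevant when the job writes SequenceFiles
        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
    }
}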

2.4 Speculative Execution Optimization

package com.example.speculation;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SpeculativeExecution {
    
    public static void configureSpeculation(Job job) {
        Configuration conf = job.getConfiguration();
        
        // Enable speculative execution for map and reduce tasks
        conf.setBoolean("mapreduce.map.speculative", true);
        conf.setBoolean("mapreduce.reduce.speculative", true);
        
        // How many standard deviations slower than average a task must be
        // before it is considered slow
        conf.setFloat("mapreduce.job.speculative.slowtaskthreshold", 1.0f);
        // Cap speculative attempts at 10% of currently running tasks
        conf.setFloat("mapreduce.job.speculative.speculative-cap-running-tasks", 0.1f);
        
        // Wait times (ms) before the next round of speculation decisions
        conf.setLong("mapreduce.job.speculative.retry-after-no-speculate", 1000);
        conf.setLong("mapreduce.job.speculative.retry-after-speculate", 15000);
    }
}

3. Advanced Input and Output Handling

3.1 Multiple Input Paths

package com.example.multiinput;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class MultipleInputExample {
    
    public static void configureMultipleInputs(Job job) {
        // Assign a different Mapper class to each input path
        MultipleInputs.addInputPath(job, new Path("/input/sales"), 
                                   TextInputFormat.class, SalesMapper.class);
        MultipleInputs.addInputPath(job, new Path("/input/customers"), 
                                   TextInputFormat.class, CustomerMapper.class);
        MultipleInputs.addInputPath(job, new Path("/input/products"), 
                                   TextInputFormat.class, ProductMapper.class);
    }
}
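
SalesMapper, CustomerMapper, and ProductMapper are placeholders: each one parses its own record layout but emits a common key/value type so the reduce side can merge the streams. A sketch of what one of them might look like, assuming tab-separated sales records keyed by customer ID:

package com.example.multiinput;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper for /input/sales: emits (customerId, source-tagged record)
public class SalesMapper extends Mapper<LongWritable, Text, Text, Text> {
    
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        if (fields.length >= 2) {
            // Tag the record with its source so a downstream reducer can tell the streams apart
            context.write(new Text(fields[0]), new Text("SALES:" + value.toString()));
        }
    }
}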

3.2 Multiple Output Paths

package com.example.multioutput;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class MultipleOutputReducer extends Reducer<Text, Text, Text, Text> {
    
    private MultipleOutputs<Text, Text> multipleOutputs;
    
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        multipleOutputs = new MultipleOutputs<>(context);
    }
    
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        
        for (Text value : values) {
            String record = value.toString();
            
            // Route each record to a different named output based on its content
            if (record.contains("ERROR")) {
                multipleOutputs.write("errors", key, value);
            } else if (record.contains("WARN")) {
                multipleOutputs.write("warnings", key, value);
            } else {
                multipleOutputs.write("info", key, value);
            }
        }
    }
    
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}
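
The named outputs used above ("errors", "warnings", "info") must be declared in the driver, and LazyOutputFormat keeps the job from creating empty part files on the default output path. A minimal sketch of that driver-side setup:

package com.example.multioutput;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MultipleOutputDriverConfig {
    
    public static void configureNamedOutputs(Job job) {
        // Each name matches a multipleOutputs.write(...) call in the reducer
        MultipleOutputs.addNamedOutput(job, "errors", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "warnings", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "info", TextOutputFormat.class, Text.class, Text.class);
        
        // Avoid empty part-r-* files for the default (unnamed) output
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
    }
}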

3.3 Custom Counters

package com.example.counters;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

public class CounterMapper extends Mapper<LongWritable, Text, Text, Text> {
    
    // Counters are grouped and named via an enum
    public enum ProcessingCounters {
        TOTAL_RECORDS,
        VALID_RECORDS,
        INVALID_RECORDS,
        EMPTY_LINES
    }
    
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        
        // Look up the counters for this task attempt
        Counter totalRecords = context.getCounter(ProcessingCounters.TOTAL_RECORDS);
        Counter validRecords = context.getCounter(ProcessingCounters.VALID_RECORDS);
        Counter invalidRecords = context.getCounter(ProcessingCounters.INVALID_RECORDS);
        Counter emptyLines = context.getCounter(ProcessingCounters.EMPTY_LINES);
        
        totalRecords.increment(1);
        
        String line = value.toString().trim();
        
        if (line.isEmpty()) {
            emptyLines.increment(1);
            return;
        }
        
        if (isValidRecord(line)) {
            validRecords.increment(1);
            context.write(new Text("valid"), value);
        } else {
            invalidRecords.increment(1);
            context.write(new Text("invalid"), value);
        }
    }
    
    private boolean isValidRecord(String line) {
        // A record is considered valid if it has at least three tab-separated fields
        return line.split("\t").length >= 3;
    }
}
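
After the job finishes, the driver can read these counters back, for example to fail the run when too many records were rejected. A sketch, assuming a completed Job object and an illustrative 5% threshold:

package com.example.counters;

import org.apache.hadoop.mapreduce.Job;

public class CounterInspection {
    
    // Called in the driver after job.waitForCompletion(true)
    public static void checkCounters(Job job) throws Exception {
        long total = job.getCounters()
                .findCounter(CounterMapper.ProcessingCounters.TOTAL_RECORDS).getValue();
        long invalid = job.getCounters()
                .findCounter(CounterMapper.ProcessingCounters.INVALID_RECORDS).getValue();
        
        System.out.printf("Processed %d records, %d invalid%n", total, invalid);
        if (total > 0 && (double) invalid / total > 0.05) {
            // Illustrative policy: treat more than 5% invalid records as a failed run
            throw new IllegalStateException("Too many invalid records: " + invalid);
        }
    }
}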

4. Chained Jobs and Workflows

4.1 Job Chaining

package com.example.chain;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ChainJobExample {
    
    public static boolean runChainJob(String inputPath, String outputPath) 
            throws Exception {
        
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "chain job");
        
        job.setJarByClass(ChainJobExample.class);
        
        // Build the mapper chain: FilterMapper -> TransformMapper (placeholder classes)
        Configuration map1Conf = new Configuration(false);
        ChainMapper.addMapper(job, FilterMapper.class, 
                             LongWritable.class, Text.class,
                             Text.class, Text.class, map1Conf);
        
        Configuration map2Conf = new Configuration(false);
        ChainMapper.addMapper(job, TransformMapper.class,
                             Text.class, Text.class,
                             Text.class, Text.class, map2Conf);
        
        // A single reducer, optionally followed by further mappers (placeholder classes)
        Configuration reduceConf = new Configuration(false);
        ChainReducer.setReducer(job, AggregateReducer.class,
                               Text.class, Text.class,
                               Text.class, Text.class, reduceConf);
        
        Configuration map3Conf = new Configuration(false);
        ChainReducer.addMapper(job, FinalMapper.class,
                              Text.class, Text.class,
                              Text.class, Text.class, map3Conf);
        
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        
        return job.waitForCompletion(true);
    }
}

4.2 Job Dependency Management

package com.example.workflow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

public class WorkflowManager {
    
    public static void runWorkflow() throws Exception {
        Configuration conf = new Configuration();
        
        // Job 1: data cleaning
        Job cleanJob = createCleanJob(conf, "/input/raw", "/temp/clean");
        if (!cleanJob.waitForCompletion(true)) {
            System.exit(1);
        }
        
        // Job 2: data transformation
        Job transformJob = createTransformJob(conf, "/temp/clean", "/temp/transform");
        if (!transformJob.waitForCompletion(true)) {
            System.exit(1);
        }
        
        // Job 3: data aggregation
        Job aggregateJob = createAggregateJob(conf, "/temp/transform", "/output/final");
        if (!aggregateJob.waitForCompletion(true)) {
            System.exit(1);
        }
        
        // Remove the intermediate directories
        cleanupTempDirectories(conf);
    }
    
    private static Job createCleanJob(Configuration conf, String input, String output) 
            throws Exception {
        Job job = Job.getInstance(conf, "data cleaning");
        // Configure mapper, reducer, input/output paths...
        return job;
    }
    
    private static Job createTransformJob(Configuration conf, String input, String output) 
            throws Exception {
        Job job = Job.getInstance(conf, "data transformation");
        // Configure mapper, reducer, input/output paths...
        return job;
    }
    
    private static Job createAggregateJob(Configuration conf, String input, String output) 
            throws Exception {
        Job job = Job.getInstance(conf, "data aggregation");
        // Configure mapper, reducer, input/output paths...
        return job;
    }
    
    private static void cleanupTempDirectories(Configuration conf) {
        // Delete the temporary directories, e.g. via FileSystem.delete()
    }
}
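
For anything beyond a strictly linear sequence, the built-in JobControl and ControlledJob classes express dependencies explicitly and run independent jobs in parallel. A sketch, assuming the three jobs have already been fully configured elsewhere:

package com.example.workflow;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

public class JobControlWorkflow {
    
    public static void runWorkflow(Job cleanJob, Job transformJob, Job aggregateJob)
            throws Exception {
        
        ControlledJob clean = new ControlledJob(cleanJob.getConfiguration());
        clean.setJob(cleanJob);
        
        ControlledJob transform = new ControlledJob(transformJob.getConfiguration());
        transform.setJob(transformJob);
        transform.addDependingJob(clean);      // runs only after cleaning succeeds
        
        ControlledJob aggregate = new ControlledJob(aggregateJob.getConfiguration());
        aggregate.setJob(aggregateJob);
        aggregate.addDependingJob(transform);  // runs only after transformation succeeds
        
        JobControl control = new JobControl("etl-workflow");
        control.addJob(clean);
        control.addJob(transform);
        control.addJob(aggregate);
        
        // JobControl runs in its own thread and submits jobs as their dependencies complete
        Thread controllerThread = new Thread(control);
        controllerThread.setDaemon(true);
        controllerThread.start();
        
        while (!control.allFinished()) {
            Thread.sleep(5000);
        }
        control.stop();
        
        if (!control.getFailedJobList().isEmpty()) {
            throw new IllegalStateException("Workflow failed: " + control.getFailedJobList());
        }
    }
}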

5. Error Handling and Fault Tolerance

5.1 Task-Level Error Handling

package com.example.errorhandling;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ErrorHandlingMapper extends Mapper<LongWritable, Text, Text, Text> {
    
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        
        try {
            // Normal processing path
            String processedData = processData(value.toString());
            context.write(new Text("success"), new Text(processedData));
            
        } catch (DataProcessingException e) {
            // Known data problem: count it and keep the task running
            context.getCounter("Errors", "DataProcessingError").increment(1);
            context.write(new Text("error"), new Text(e.getMessage()));
            
        } catch (Exception e) {
            // Unexpected error: count it, log it, and skip the record
            context.getCounter("Errors", "UnknownError").increment(1);
            System.err.println("Error processing record: " + value.toString());
            e.printStackTrace();
        }
    }
    
    private String processData(String data) throws DataProcessingException {
        // Example processing logic
        if (data == null || data.trim().isEmpty()) {
            throw new DataProcessingException("Empty data");
        }
        return data.toUpperCase();
    }
}
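
DataProcessingException is a custom checked exception assumed by the mapper above; a minimal definition is all it needs:

package com.example.errorhandling;

// Minimal custom checked exception used to signal recoverable data problems
public class DataProcessingException extends Exception {
    
    public DataProcessingException(String message) {
        super(message);
    }
}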

5.2 Job-Level Error Handling

package com.example.errorhandling;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class JobErrorHandling {
    
    public static void configureErrorHandling(Job job) {
        Configuration conf = job.getConfiguration();
        
        // Maximum attempts per task before the task is declared failed
        conf.setInt("mapreduce.map.maxattempts", 3);
        conf.setInt("mapreduce.reduce.maxattempts", 3);
        
        // Let the job succeed even if up to 5% of its tasks fail permanently
        conf.setInt("mapreduce.map.failures.maxpercent", 5);
        conf.setInt("mapreduce.reduce.failures.maxpercent", 5);
        
        // Skip bad records: enter skipping mode after two failed attempts and allow
        // a window of up to 100 records around a bad record to be skipped
        conf.setInt("mapreduce.task.skip.start.attempts", 2);
        conf.setLong("mapreduce.map.skip.maxrecords", 100);
        
        // Kill a task that reports no progress for 10 minutes
        conf.setLong("mapreduce.task.timeout", 600000);
    }
}

6. Monitoring and Debugging

6.1 Job Monitoring

package com.example.monitoring;

import org.apache.hadoop.mapred.TIPStatus;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskType;

public class JobMonitor {
    
    public static void monitorJob(Job job) throws Exception {
        while (!job.isComplete()) {
            Thread.sleep(10000); // poll every 10 seconds
            
            // Overall map and reduce progress
            float mapProgress = job.mapProgress();
            float reduceProgress = job.reduceProgress();
            
            System.out.printf("Map: %.2f%%, Reduce: %.2f%%\n", 
                            mapProgress * 100, reduceProgress * 100);
            
            // Look for failed task attempts
            TaskReport[] mapReports = job.getTaskReports(TaskType.MAP);
            TaskReport[] reduceReports = job.getTaskReports(TaskType.REDUCE);
            
            checkFailedTasks(mapReports, "MAP");
            checkFailedTasks(reduceReports, "REDUCE");
        }
        
        // Final statistics once the job has finished
        printJobStatistics(job);
    }
    
    private static void checkFailedTasks(TaskReport[] reports, String taskType) {
        for (TaskReport report : reports) {
            if (report.getCurrentStatus() == TIPStatus.FAILED) {
                System.err.printf("%s task failed: %s\n", 
                                taskType, report.getTaskID());
            }
        }
    }
    
    private static void printJobStatistics(Job job) throws Exception {
        System.out.println("\n=== Job Statistics ===");
        System.out.println("Job ID: " + job.getJobID());
        System.out.println("Job Name: " + job.getJobName());
        System.out.println("Job State: " + job.getJobState());
        
        // Dump all counters
        job.getCounters().forEach(group -> {
            System.out.println("\nGroup: " + group.getDisplayName());
            group.forEach(counter -> {
                System.out.printf("  %s: %d\n", 
                                counter.getDisplayName(), counter.getValue());
            });
        });
    }
}

6.2 Performance Profiling

package com.example.profiling;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ProfilingMapper extends Mapper<LongWritable, Text, Text, Text> {
    
    private long startTime;
    private long recordCount = 0;
    private long totalProcessingNanos = 0;
    
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        startTime = System.currentTimeMillis();
    }
    
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        
        long recordStartTime = System.nanoTime();
        
        // Process the record
        processRecord(value, context);
        
        totalProcessingNanos += System.nanoTime() - recordStartTime;
        recordCount++;
        
        // Report the running average every 10,000 records
        if (recordCount % 10000 == 0) {
            long avgMicros = totalProcessingNanos / recordCount / 1000;
            System.out.printf("Processed %d records, avg time: %d us per record\n", 
                            recordCount, avgMicros);
        }
    }
    }
    
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        long endTime = System.currentTimeMillis();
        long totalTime = endTime - startTime;
        
        System.out.printf("Mapper completed: %d records in %d ms\n", 
                        recordCount, totalTime);
        
        // Publish the totals as custom counters so they appear in the job history
        context.getCounter("Performance", "TotalRecords").setValue(recordCount);
        context.getCounter("Performance", "TotalTimeMs").setValue(totalTime);
    }
    
    private void processRecord(Text value, Context context) 
            throws IOException, InterruptedException {
        // Placeholder for the actual record-processing logic
        context.write(new Text("processed"), value);
    }
}
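
Beyond hand-rolled timing, MapReduce can attach a JVM profiler to a small sample of tasks through its built-in profiling properties; a brief sketch (the profiled task ranges are illustrative):

package com.example.profiling;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class TaskProfilingConfig {
    
    public static void enableTaskProfiling(Job job) {
        Configuration conf = job.getConfiguration();
        
        // Turn on JVM profiling for a sample of tasks only
        conf.setBoolean("mapreduce.task.profile", true);
        conf.set("mapreduce.task.profile.maps", "0-2");     // profile map tasks 0..2
        conf.set("mapreduce.task.profile.reduces", "0-1");  // profile reduce tasks 0..1
        
        // Profiler options passed to the profiled task JVMs (HPROF-style defaults)
        conf.set("mapreduce.task.profile.params",
                 "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s");
    }
}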

7. Best Practices Summary

7.1 Design Principles

  • Single responsibility: each map/reduce function should do one thing
  • Stateless design: avoid keeping state inside map/reduce functions
  • Data locality: move computation to the data whenever possible
  • Fault-tolerant design: account for task failures and retries

7.2 Performance Optimization

  • Size the task count sensibly: avoid far too many or too few tasks
  • Use a combiner: cut down shuffle traffic across the network
  • Enable compression: reduce I/O overhead
  • Tune memory settings: avoid OOM errors and GC pressure

7.3 Operational Recommendations

  • Monitor key metrics: task progress, failure rates, resource usage
  • Analyze logs: catch and resolve problems early
  • Performance-test: validate thoroughly before going to production
  • Manage versions: keep code and configuration under version control

Summary

This chapter took a deeper look at MapReduce's advanced features and optimization techniques, covering complex data processing patterns, performance optimization strategies, error handling mechanisms, and monitoring and debugging methods. Mastering these techniques is essential for building high-performance, reliable MapReduce applications.

The next chapter turns to real-world MapReduce application cases and best practices.