1. Development Environment Setup

1.1 Maven Dependency Configuration

<!-- Maven dependency configuration in pom.xml -->
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.3.4</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>
</dependencies>
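
The unit tests in section 5 are written with MRUnit, which is not pulled in by the dependencies above. A minimal addition to the same <dependencies> block (MRUnit is retired, but its last release still works for these examples) would be:

<dependency>
    <groupId>org.apache.mrunit</groupId>
    <artifactId>mrunit</artifactId>
    <version>1.1.0</version>
    <classifier>hadoop2</classifier>
    <scope>test</scope>
</dependency>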

1.2 Project Structure

mapreduce-demo/
├── src/
│   ├── main/
│   │   ├── java/
│   │   │   └── com/example/
│   │   │       ├── wordcount/
│   │   │       ├── temperature/
│   │   │       ├── partition/
│   │   │       ├── sort/
│   │   │       ├── input/
│   │   │       ├── output/
│   │   │       └── utils/
│   │   └── resources/
│   └── test/
│       └── java/
├── input/
├── output/
└── pom.xml

2. A Basic WordCount Program

2.1 Mapper Implementation

package com.example.wordcount;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    
    @Override
    public void map(LongWritable key, Text value, Context context) 
            throws IOException, InterruptedException {
        
        // Convert the line to lower case and tokenize it
        String line = value.toString().toLowerCase();
        StringTokenizer tokenizer = new StringTokenizer(line);
        
        // Emit each word with a count of 1
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
}

2.2 Reducer Implementation

package com.example.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    private IntWritable result = new IntWritable();
    
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        
        int sum = 0;
        // Sum the counts for the same word
        for (IntWritable value : values) {
            sum += value.get();
        }
        
        result.set(sum);
        context.write(key, result);
    }
}

2.3 Driver Implementation

package com.example.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCountDriver {
    
    public static void main(String[] args) throws Exception {
        
        if (args.length != 2) {
            System.err.println("Usage: WordCount <input path> <output path>");
            System.exit(-1);
        }
        
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        
        // Set the jar that contains the job classes
        job.setJarByClass(WordCountDriver.class);
        
        // Configure the Mapper, Combiner, and Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
        
        // Set the output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        // Set the input and output formats
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        
        // Set the input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        // Submit the job and wait for it to finish
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

3. Advanced MapReduce Programs

3.1 Temperature Analysis Program

Custom Data Type

package com.example.temperature;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class TemperatureWritable implements WritableComparable<TemperatureWritable> {
    
    private int year;
    private int temperature;
    
    public TemperatureWritable() {}
    
    public TemperatureWritable(int year, int temperature) {
        this.year = year;
        this.temperature = temperature;
    }
    
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(temperature);
    }
    
    @Override
    public void readFields(DataInput in) throws IOException {
        year = in.readInt();
        temperature = in.readInt();
    }
    
    @Override
    public int compareTo(TemperatureWritable other) {
        int result = Integer.compare(this.year, other.year);
        if (result == 0) {
            result = Integer.compare(this.temperature, other.temperature);
        }
        return result;
    }
    
    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null || getClass() != obj.getClass()) return false;
        TemperatureWritable that = (TemperatureWritable) obj;
        return year == that.year && temperature == that.temperature;
    }
    
    @Override
    public int hashCode() {
        return 31 * year + temperature;
    }
    
    @Override
    public String toString() {
        return year + "\t" + temperature;
    }
    
    // Getters and Setters
    public int getYear() { return year; }
    public void setYear(int year) { this.year = year; }
    public int getTemperature() { return temperature; }
    public void setTemperature(int temperature) { this.temperature = temperature; }
}
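
TemperatureWritable is not used by the mapper and reducer below, which work with plain IntWritable keys and values. If you do emit it from the map phase, for example as a composite key for a secondary sort, the driver must declare the map output types explicitly; a sketch of the relevant lines:

// Hypothetical driver settings when TemperatureWritable is the map output key
job.setMapOutputKeyClass(TemperatureWritable.class);
job.setMapOutputValueClass(IntWritable.class);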

Temperature Mapper

package com.example.temperature;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TemperatureMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    
    private static final int MISSING = 9999;
    
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        
        String line = value.toString();
        String year = line.substring(15, 19);
        
        int airTemperature;
        if (line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        
        String quality = line.substring(92, 93);
        
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new IntWritable(Integer.parseInt(year)), 
                         new IntWritable(airTemperature));
        }
    }
}

Temperature Reducer

package com.example.temperature;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class TemperatureReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    
    @Override
    public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        
        context.write(key, new IntWritable(maxValue));
    }
}
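
The driver for the temperature job follows the same pattern as WordCountDriver. A minimal sketch (the class name TemperatureDriver is illustrative) could look like this:

package com.example.temperature;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TemperatureDriver {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }

        Job job = Job.getInstance(new Configuration(), "max temperature");
        job.setJarByClass(TemperatureDriver.class);

        job.setMapperClass(TemperatureMapper.class);
        // Taking a maximum is associative and commutative, so the reducer can double as the combiner
        job.setCombinerClass(TemperatureReducer.class);
        job.setReducerClass(TemperatureReducer.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}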

3.2 Custom Partitioner

package com.example.partition;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class CustomPartitioner extends Partitioner<Text, IntWritable> {
    
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // Partition by the first character of the key (lower-case keys assumed, as produced by WordCountMapper)
        char firstChar = key.toString().charAt(0);
        
        if (firstChar >= 'a' && firstChar <= 'f') {
            return 0 % numPartitions;
        } else if (firstChar >= 'g' && firstChar <= 'l') {
            return 1 % numPartitions;
        } else if (firstChar >= 'm' && firstChar <= 'r') {
            return 2 % numPartitions;
        } else {
            return 3 % numPartitions;
        }
    }
}
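
For this partitioner to take effect, the driver has to register it and run with a matching number of reduce tasks, four here, one per letter range:

job.setPartitionerClass(CustomPartitioner.class);
job.setNumReduceTasks(4);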

3.3 Custom Sorting

package com.example.sort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class DescendingIntComparator extends WritableComparator {
    
    public DescendingIntComparator() {
        super(IntWritable.class, true);
    }
    
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        IntWritable ia = (IntWritable) a;
        IntWritable ib = (IntWritable) b;
        
        // Sort in descending order by inverting the natural comparison
        return -1 * ia.compareTo(ib);
    }
}
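
To apply this comparator during the shuffle sort, register it in the driver:

// Sort intermediate keys in descending order
job.setSortComparatorClass(DescendingIntComparator.class);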

4. Input and Output Formats

4.1 Custom InputFormat

package com.example.input;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<LongWritable, Text> {
    
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // Never split the file: the whole file is read as a single record
        return false;
    }
    
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new WholeFileRecordReader();
    }
}

4.2 Custom RecordReader

package com.example.input;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileRecordReader extends RecordReader<LongWritable, Text> {
    
    private FileSplit fileSplit;
    private Configuration conf;
    private boolean processed = false;
    private LongWritable key = new LongWritable();
    private Text value = new Text();
    
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }
    
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
                key.set(0);
            } finally {
                IOUtils.closeStream(in);
            }
            
            processed = true;
            return true;
        }
        return false;
    }
    
    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }
    
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }
    
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
    }
    
    @Override
    public void close() throws IOException {
        // Nothing to do: the input stream is already closed in nextKeyValue()
    }
}
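
Wiring the format into a job works like any other InputFormat:

job.setInputFormatClass(WholeFileInputFormat.class);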

4.3 Custom OutputFormat

package com.example.output;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MultipleTextOutputFormat extends FileOutputFormat<Text, Text> {
    
    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new MultipleRecordWriter(context);
    }
}
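
MultipleRecordWriter is not shown above. A minimal sketch, assuming the intent is one output file per distinct key under the job's output directory, might look like the following (the file naming and record formatting are illustrative, and writing directly into the output directory bypasses the output committer):

package com.example.output;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MultipleRecordWriter extends RecordWriter<Text, Text> {

    private final TaskAttemptContext context;
    // One open stream per distinct key
    private final Map<String, FSDataOutputStream> streams = new HashMap<>();

    public MultipleRecordWriter(TaskAttemptContext context) {
        this.context = context;
    }

    @Override
    public void write(Text key, Text value) throws IOException {
        String name = key.toString();
        FSDataOutputStream out = streams.get(name);
        if (out == null) {
            // File name: <key>-<task id>, placed in the job output directory
            Path dir = FileOutputFormat.getOutputPath(context);
            Path file = new Path(dir, name + "-" + context.getTaskAttemptID().getTaskID().getId());
            FileSystem fs = file.getFileSystem(context.getConfiguration());
            out = fs.create(file, false);
            streams.put(name, out);
        }
        out.write((key + "\t" + value + "\n").getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException {
        for (FSDataOutputStream out : streams.values()) {
            out.close();
        }
    }
}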

5. Unit Testing

5.1 Mapper Test

package com.example.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Before;
import org.junit.Test;

public class WordCountMapperTest {
    
    private MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
    
    @Before
    public void setUp() {
        WordCountMapper mapper = new WordCountMapper();
        mapDriver = MapDriver.newMapDriver(mapper);
    }
    
    @Test
    public void testMapper() throws IOException {
        mapDriver.withInput(new LongWritable(0), new Text("hello world hello"));
        mapDriver.withOutput(new Text("hello"), new IntWritable(1));
        mapDriver.withOutput(new Text("world"), new IntWritable(1));
        mapDriver.withOutput(new Text("hello"), new IntWritable(1));
        mapDriver.runTest();
    }
}

5.2 Reducer Test

package com.example.wordcount;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

public class WordCountReducerTest {
    
    private ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
    
    @Before
    public void setUp() {
        WordCountReducer reducer = new WordCountReducer();
        reduceDriver = ReduceDriver.newReduceDriver(reducer);
    }
    
    @Test
    public void testReducer() throws IOException {
        List<IntWritable> values = new ArrayList<>();
        values.add(new IntWritable(1));
        values.add(new IntWritable(1));
        values.add(new IntWritable(1));
        
        reduceDriver.withInput(new Text("hello"), values);
        reduceDriver.withOutput(new Text("hello"), new IntWritable(3));
        reduceDriver.runTest();
    }
}

5.3 Full Job Test

package com.example.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.junit.Before;
import org.junit.Test;

public class WordCountJobTest {
    
    private MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;
    
    @Before
    public void setUp() {
        WordCountMapper mapper = new WordCountMapper();
        WordCountReducer reducer = new WordCountReducer();
        mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
    }
    
    @Test
    public void testMapReduce() throws IOException {
        mapReduceDriver.withInput(new LongWritable(0), new Text("hello world"));
        mapReduceDriver.withInput(new LongWritable(1), new Text("hello hadoop"));
        
        mapReduceDriver.withOutput(new Text("hadoop"), new IntWritable(1));
        mapReduceDriver.withOutput(new Text("hello"), new IntWritable(2));
        mapReduceDriver.withOutput(new Text("world"), new IntWritable(1));
        
        mapReduceDriver.runTest();
    }
}

6. Compiling and Packaging

6.1 Building with Maven

# Compile the project
mvn clean compile

# Run the tests
mvn test

# Package the jar
mvn clean package

6.2 Creating an Executable JAR

<!-- Add the shade plugin to pom.xml -->
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.example.wordcount.WordCountDriver</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
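
With the main class recorded in the manifest, the shaded jar can be submitted directly; the exact file name depends on the artifactId and version in your pom.xml, so the name below is only an example:

# Submit the packaged job to the cluster
hadoop jar target/mapreduce-demo-1.0.jar /input /output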

7. Local Debugging

7.1 Running in Local Mode

package com.example.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LocalWordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        
        // Run in local mode against the local file system
        conf.set("mapreduce.framework.name", "local");
        conf.set("fs.defaultFS", "file:///");
        
        Job job = Job.getInstance(conf, "local word count");
        job.setJarByClass(LocalWordCount.class);
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        FileInputFormat.addInputPath(job, new Path("input"));
        FileOutputFormat.setOutputPath(job, new Path("output"));
        
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
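
When rerunning locally, the job fails if the output directory already exists. A common convenience is to delete it up front, using the FileSystem API against the paths above (requires org.apache.hadoop.fs.FileSystem):

// Remove a stale output directory before resubmitting the job
Path outputPath = new Path("output");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
    fs.delete(outputPath, true);
}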

7.2 IDE Debug Configuration

// Run configuration settings in the IDE
// Program arguments: input output
// VM options: -Dhadoop.home.dir=/path/to/hadoop

8. Performance Tuning Tips

8.1 Using a Combiner

// Set the combiner in the driver. The reducer can be reused as the combiner
// here because summing counts is associative and commutative.
job.setCombinerClass(WordCountReducer.class);

8.2 Compressing Intermediate Output

// Enable map output compression (codec classes live in org.apache.hadoop.io.compress)
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec", 
              SnappyCodec.class, CompressionCodec.class);

8.3 Tuning Buffer Sizes

// Size of the in-memory sort buffer, in MB
conf.setInt("mapreduce.task.io.sort.mb", 200);

// Fraction of the sort buffer at which a spill to disk starts
conf.setFloat("mapreduce.map.sort.spill.percent", 0.8f);

9. Common Problems and Solutions

9.1 Out-of-Memory Errors

// Increase the JVM heap for map and reduce tasks
conf.set("mapreduce.map.java.opts", "-Xmx1024m");
conf.set("mapreduce.reduce.java.opts", "-Xmx2048m");

9.2 Data Skew

// Use a custom partitioner to spread hot keys more evenly
job.setPartitionerClass(CustomPartitioner.class);

// Increase the number of reduce tasks
job.setNumReduceTasks(10);

9.3 The Small-Files Problem

// Use CombineTextInputFormat to pack many small files into a single split
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 67108864); // 64MB

Summary

This chapter has walked through MapReduce application development in detail: writing basic programs, using advanced features, unit testing, compiling and packaging, and performance tuning. Mastering these skills is the foundation for building high-quality MapReduce applications.

The next chapter covers advanced MapReduce features and optimization techniques.