1. 开发环境准备
1.1 Maven依赖配置
<!-- Maven依赖配置 pom.xml -->
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.3.4</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
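<!-- 补充示意: 第5章的单元测试使用MRUnit, 需要额外引入该测试依赖(1.1.0为其最后发布版本, hadoop2 classifier对应新版MapReduce API) -->
<dependency>
<groupId>org.apache.mrunit</groupId>
<artifactId>mrunit</artifactId>
<version>1.1.0</version>
<classifier>hadoop2</classifier>
<scope>test</scope>
</dependency>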
</dependencies>
1.2 项目结构
mapreduce-demo/
├── src/
│ ├── main/
│ │ ├── java/
│ │ │ └── com/example/
│ │ │ ├── wordcount/
│ │ │ ├── temperature/
│ │ │ └── utils/
│ │ └── resources/
│ └── test/
│ └── java/
├── input/
├── output/
└── pom.xml
2. 基础WordCount程序
2.1 Mapper类实现
package com.example.wordcount;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 转换为小写并分词
String line = value.toString().toLowerCase();
StringTokenizer tokenizer = new StringTokenizer(line);
// 输出每个单词
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}
2.2 Reducer类实现
package com.example.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
// 累加相同单词的计数
for (IntWritable value : values) {
sum += value.get();
}
result.set(sum);
context.write(key, result);
}
}
2.3 Driver类实现
package com.example.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class WordCountDriver {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: WordCount <input path> <output path>");
System.exit(-1);
}
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
// 设置jar包
job.setJarByClass(WordCountDriver.class);
// 设置Mapper和Reducer
job.setMapperClass(WordCountMapper.class);
job.setCombinerClass(WordCountReducer.class);
job.setReducerClass(WordCountReducer.class);
// 设置输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置输入输出格式
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// 设置输入输出路径
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 等待作业完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
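以一行输入 "Hello World hello" 为例:Mapper统一转为小写后输出 (hello,1)、(world,1)、(hello,1),经过Shuffle按key分组,Reducer汇总后最终输出(TextOutputFormat以制表符分隔key和value):
hello	2
world	1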
3. 高级MapReduce程序
3.1 气温分析程序
自定义数据类型
package com.example.temperature;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class TemperatureWritable implements WritableComparable<TemperatureWritable> {
private int year;
private int temperature;
public TemperatureWritable() {}
public TemperatureWritable(int year, int temperature) {
this.year = year;
this.temperature = temperature;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(year);
out.writeInt(temperature);
}
@Override
public void readFields(DataInput in) throws IOException {
year = in.readInt();
temperature = in.readInt();
}
@Override
public int compareTo(TemperatureWritable other) {
int result = Integer.compare(this.year, other.year);
if (result == 0) {
result = Integer.compare(this.temperature, other.temperature);
}
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
TemperatureWritable that = (TemperatureWritable) obj;
return year == that.year && temperature == that.temperature;
}
@Override
public int hashCode() {
return 31 * year + temperature;
}
@Override
public String toString() {
return year + "\t" + temperature;
}
// Getters and Setters
public int getYear() { return year; }
public void setYear(int year) { this.year = year; }
public int getTemperature() { return temperature; }
public void setTemperature(int temperature) { this.temperature = temperature; }
}
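TemperatureWritable在后面的Mapper/Reducer示例中并未直接使用;如果要把"年份+气温"作为复合key(例如实现二次排序),可以在Driver中这样注册(仅为示意,这里用NullWritable表示value为空):

job.setMapOutputKeyClass(TemperatureWritable.class);
job.setMapOutputValueClass(NullWritable.class);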
气温Mapper
package com.example.temperature;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class TemperatureMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
private static final int MISSING = 9999;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
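// 假设输入为NCDC固定宽度气象观测记录:
// 第16~19个字符为年份, 第88~92个字符为带符号气温(单位0.1摄氏度), 第93个字符为质量码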
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') {
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new IntWritable(Integer.parseInt(year)),
new IntWritable(airTemperature));
}
}
}
气温Reducer
package com.example.temperature;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class TemperatureReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
@Override
public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
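下面给出一个把上述Mapper和Reducer组装成完整作业的Driver示意(类名TemperatureDriver为本文假设的名称):

package com.example.temperature;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TemperatureDriver {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "max temperature");
        job.setJarByClass(TemperatureDriver.class);
        job.setMapperClass(TemperatureMapper.class);
        // 求最大值满足结合律, Reducer可以同时作为Combiner使用
        job.setCombinerClass(TemperatureReducer.class);
        job.setReducerClass(TemperatureReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}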
3.2 自定义分区器
package com.example.partition;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class CustomPartitioner extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numPartitions) {
// 根据key的首字母分区
char firstChar = key.toString().charAt(0);
if (firstChar >= 'a' && firstChar <= 'f') {
return 0 % numPartitions;
} else if (firstChar >= 'g' && firstChar <= 'l') {
return 1 % numPartitions;
} else if (firstChar >= 'm' && firstChar <= 'r') {
return 2 % numPartitions;
} else {
return 3 % numPartitions;
}
}
}
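该分区器按单词首字母把数据划分为4个分区,使用时需要在Driver中注册,并把Reduce任务数设置为4(示意):

job.setPartitionerClass(CustomPartitioner.class);
job.setNumReduceTasks(4);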
3.3 自定义排序
package com.example.sort;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class DescendingIntComparator extends WritableComparator {
public DescendingIntComparator() {
super(IntWritable.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
IntWritable ia = (IntWritable) a;
IntWritable ib = (IntWritable) b;
// 降序排列
return -1 * ia.compareTo(ib);
}
}
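将该比较器注册为作业的排序比较器,Reduce阶段即可按key降序处理(示意):

job.setSortComparatorClass(DescendingIntComparator.class);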
4. 输入输出格式
4.1 自定义InputFormat
package com.example.input;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class WholeFileInputFormat extends FileInputFormat<LongWritable, Text> {
@Override
protected boolean isSplitable(JobContext context, Path filename) {
// 不分割文件
return false;
}
@Override
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
return new WholeFileRecordReader();
}
}
4.2 自定义RecordReader
package com.example.input;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class WholeFileRecordReader extends RecordReader<LongWritable, Text> {
private FileSplit fileSplit;
private Configuration conf;
private boolean processed = false;
private LongWritable key = new LongWritable();
private Text value = new Text();
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
this.fileSplit = (FileSplit) split;
this.conf = context.getConfiguration();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!processed) {
byte[] contents = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
try {
in = fs.open(file);
IOUtils.readFully(in, contents, 0, contents.length);
value.set(contents, 0, contents.length);
key.set(0);
} finally {
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
}
@Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return processed ? 1.0f : 0.0f;
}
@Override
public void close() throws IOException {
// 清理资源
}
}
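在Driver中把作业的输入格式指定为WholeFileInputFormat,即可让每个文件作为一条完整记录交给Mapper处理(示意):

job.setInputFormatClass(WholeFileInputFormat.class);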
4.3 自定义OutputFormat
package com.example.output;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MultipleTextOutputFormat extends FileOutputFormat<Text, Text> {
@Override
public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
throws IOException, InterruptedException {
return new MultipleRecordWriter(context);
}
}
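上面getRecordWriter返回的MultipleRecordWriter并非Hadoop自带类,下面给出一个最小化的参考实现示意:按key把记录写入以key命名的文件。注意该示意直接写入作业输出目录、未经过OutputCommitter,仅用于演示思路:

package com.example.output;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MultipleRecordWriter extends RecordWriter<Text, Text> {
    private final TaskAttemptContext context;
    // 每个key对应一个输出流
    private final Map<String, FSDataOutputStream> streams = new HashMap<>();

    public MultipleRecordWriter(TaskAttemptContext context) {
        this.context = context;
    }

    @Override
    public void write(Text key, Text value) throws IOException {
        String name = key.toString();
        FSDataOutputStream out = streams.get(name);
        if (out == null) {
            // 在作业输出目录下按key创建文件, 文件名追加task id以避免多个任务写同一文件
            Path outputDir = FileOutputFormat.getOutputPath(context);
            Path file = new Path(outputDir,
                    name + "-r-" + context.getTaskAttemptID().getTaskID().getId());
            FileSystem fs = file.getFileSystem(context.getConfiguration());
            out = fs.create(file, false);
            streams.put(name, out);
        }
        out.write((key.toString() + "\t" + value.toString() + "\n")
                .getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException {
        for (FSDataOutputStream stream : streams.values()) {
            stream.close();
        }
    }
}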
5. 单元测试
5.1 Mapper测试
package com.example.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Before;
import org.junit.Test;
public class WordCountMapperTest {
private MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
@Before
public void setUp() {
WordCountMapper mapper = new WordCountMapper();
mapDriver = MapDriver.newMapDriver(mapper);
}
@Test
public void testMapper() throws IOException {
mapDriver.withInput(new LongWritable(0), new Text("hello world hello"));
mapDriver.withOutput(new Text("hello"), new IntWritable(1));
mapDriver.withOutput(new Text("world"), new IntWritable(1));
mapDriver.withOutput(new Text("hello"), new IntWritable(1));
mapDriver.runTest();
}
}
5.2 Reducer测试
package com.example.wordcount;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;
public class WordCountReducerTest {
private ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
@Before
public void setUp() {
WordCountReducer reducer = new WordCountReducer();
reduceDriver = ReduceDriver.newReduceDriver(reducer);
}
@Test
public void testReducer() throws IOException {
List<IntWritable> values = new ArrayList<>();
values.add(new IntWritable(1));
values.add(new IntWritable(1));
values.add(new IntWritable(1));
reduceDriver.withInput(new Text("hello"), values);
reduceDriver.withOutput(new Text("hello"), new IntWritable(3));
reduceDriver.runTest();
}
}
5.3 完整作业测试
package com.example.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.junit.Before;
import org.junit.Test;
public class WordCountJobTest {
private MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;
@Before
public void setUp() {
WordCountMapper mapper = new WordCountMapper();
WordCountReducer reducer = new WordCountReducer();
mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
}
@Test
public void testMapReduce() throws IOException {
mapReduceDriver.withInput(new LongWritable(0), new Text("hello world"));
mapReduceDriver.withInput(new LongWritable(1), new Text("hello hadoop"));
mapReduceDriver.withOutput(new Text("hadoop"), new IntWritable(1));
mapReduceDriver.withOutput(new Text("hello"), new IntWritable(2));
mapReduceDriver.withOutput(new Text("world"), new IntWritable(1));
mapReduceDriver.runTest();
}
}
6. 编译和打包
6.1 Maven编译
# 编译项目
mvn clean compile
# 运行测试
mvn test
# 打包
mvn clean package
6.2 创建可执行JAR
<!-- 在pom.xml中添加插件 -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.example.wordcount.WordCountDriver</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
7. 本地调试
7.1 本地模式运行
package com.example.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
// 与WordCountMapper同包, 便于直接引用前面的Mapper和Reducer
public class LocalWordCount {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// 设置为本地模式
conf.set("mapreduce.framework.name", "local");
conf.set("fs.defaultFS", "file:///");
Job job = Job.getInstance(conf, "local word count");
job.setJarByClass(LocalWordCount.class);
job.setMapperClass(WordCountMapper.class);
job.setCombinerClass(WordCountReducer.class);
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("input"));
FileOutputFormat.setOutputPath(job, new Path("output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
7.2 IDE调试配置
// 在IDE中设置程序参数
// Program arguments: input output
// VM options: -Dhadoop.home.dir=/path/to/hadoop
8. 性能优化技巧
8.1 使用Combiner
Combiner在Map端对中间结果做本地聚合,可以显著减少Shuffle阶段的网络传输量。注意:只有输入输出类型与Map输出类型一致、且满足结合律和交换律的操作(如求和、求最大值)才能直接复用Reducer作为Combiner,求平均值这类操作则不行。
// 在Driver中设置Combiner
job.setCombinerClass(WordCountReducer.class);
8.2 压缩中间结果
// 启用Map输出压缩(SnappyCodec和CompressionCodec位于org.apache.hadoop.io.compress包)
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec",
SnappyCodec.class, CompressionCodec.class);
8.3 调整缓冲区大小
// 设置排序缓冲区大小
conf.setInt("mapreduce.task.io.sort.mb", 200);
// 设置溢写阈值
conf.setFloat("mapreduce.map.sort.spill.percent", 0.8f);
9. 常见问题和解决方案
9.1 内存溢出
// 增加JVM堆内存
conf.set("mapreduce.map.java.opts", "-Xmx1024m");
conf.set("mapreduce.reduce.java.opts", "-Xmx2048m");
9.2 数据倾斜
// 使用自定义分区器
job.setPartitionerClass(CustomPartitioner.class);
// 增加Reduce任务数量
job.setNumReduceTasks(10);
9.3 小文件问题
// 使用CombineFileInputFormat
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 67108864); // 64MB
小结
本章详细介绍了MapReduce程序的开发过程,包括基础程序编写、高级特性使用、单元测试、编译打包和性能优化。掌握这些技能是开发高质量MapReduce应用的基础。
下一章将介绍MapReduce的高级特性和优化技术。