3개의 파일로 구성되어 있는 예제의 소스에 설명을 넣어본다.
Mapper > MaxTemperatureMapper.java
Reducer > MaxTemperatureReducer.java
Main Class > MaxTemperature.java
https://github.com/tomwhite/hadoop-book/tree/master/input/ncdc/all
------------------------------- begin of the part of the file 1901 -----------------------------
0029227070999991901123020004+62167+030650FM
-12+010299999V0200701N002119999999N0000001N9
-01221+99999100831ADDGF108991999999999999999999
0029227070999991901123106004+62167+030650FM
-12+010299999V0200701N004119999999N0000001N9
-01391+99999100521ADDGF108991999999999999999999
0029227070999991901123113004+62167+030650FM
-12+010299999V0200701N003119999999N0000001N9
-01391+99999100321ADDGF108991999999999999999999
------------------------------- end of the part of the file 1901 -------------------------------
MaxTemperature.java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver class: configures and submits the max-temperature MapReduce job.
 *
 * <p>Usage: {@code MaxTemperature <input path> <output path>}
 */
public class MaxTemperature {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }

        // Job.getInstance() is the supported factory; the no-arg Job()
        // constructor is deprecated in the current MapReduce API.
        Job job = Job.getInstance();
        job.setJarByClass(MaxTemperature.class); // locates the jar containing this class
        job.setJobName("Max temperature");

        FileInputFormat.addInputPath(job, new Path(args[0]));   // the path of target files
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // the path of the output (must not already exist)

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        // Key/value types emitted by both the mapper and the reducer.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Block until the job finishes; exit 0 on success, 1 on failure.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
MaxTemperatureMapper.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper: parses one fixed-width NCDC weather record per call and emits
 * (year, air temperature in tenths of a degree Celsius) for readings that
 * are present and pass the quality-code filter.
 */
public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // NCDC encodes a missing temperature reading as 9999.
    private static final int MISSING = 9999;

    // Reused output objects: avoids allocating two new objects for every
    // input record, a standard Hadoop mapper optimization.
    private final Text year = new Text();
    private final IntWritable temperature = new IntWritable();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();

        // BUG FIX: the year occupies columns 15-18 (e.g. "1901" in the sample
        // records). substring(15, 21) grabbed year+month ("190112"), which
        // would make the reducer compute a maximum per year-month instead of
        // per year.
        year.set(line.substring(15, 19));

        int airTemperature;
        if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs (pre-Java 7)
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }

        // Quality code 0/1/4/5/9 indicates an acceptable reading.
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            temperature.set(airTemperature);
            context.write(year, temperature);
        }
    }
}
MaxTemperatureReducer.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reducer: receives all temperatures recorded for a single year key and
 * writes out the largest one.
 */
public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Start below any possible reading so the first value always wins.
        int max = Integer.MIN_VALUE;
        for (IntWritable reading : values) {
            int current = reading.get();
            if (current > max) {
                max = current;
            }
        }
        context.write(key, new IntWritable(max));
    }
}
'Dev tips and tips' 카테고리의 다른 글
hadoop 완벽가이드 기상데이터 처리 (0) | 2013.07.05 |
---|---|
Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient (0) | 2013.06.25 |
Write failed: Broken pipe (0) | 2013.06.20 |