항공 파일 맵리듀스 작업
[jacob@h001 hadoop]$ ./bin/hadoop jar air003.jar com.jacob.mr.parallel.airdelay.airDepartureDelayCount /user/jacob/in/ /user/jacob/out/test01
airDepartureDelayCount.java
package com.jacob.mr.parallel.airdelay;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
// util
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class airDepartureDelayCount extends Configured implements Tool {
public int run(String[] args) throws Exception {
String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
if(args.length != 2) {
System.err.println("Usage : com.jacob.mr.parallel.airdelay.airDepartureDelayCount <input> <output> ");
System.exit(2);
}
String arg0 = args[0];
System.out.println("arg0 : " + arg0);
String arg1 = args[1];
System.out.println("arg1 : " + arg1);
Job job = new Job(getConf(), "airDepartureDelayCount");
System.out.println("job : " + job);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
job.setJarByClass(airDepartureDelayCount.class);
job.setMapperClass(AirMapper.class);
//job.setPartitionerClass(cls);
//job.setCombinerClass(cls);
job.setReducerClass(AirReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.waitForCompletion(true);
return(0);
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new airDepartureDelayCount() , args);
System.out.println("## RESULT : " + res);
}
}
AirMapper.java
package com.jacob.mr.parallel.airdelay;
import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
public class AirMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
// Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
private final static IntWritable mapValueOut = new IntWritable(1);
private Text mapKeyOut = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
String[] columns = value.toString().split(",");
if(columns != null && columns.length > 0 ) {
if(key.get() > 0){
String[] cols = value.toString().split(",");
if(cols != null && cols.length > 0) {
try{
int arrDelayTime = Integer.parseInt(cols[14]);
int depDelayTime = Integer.parseInt(cols[15]);
//#########################################
if(!cols[15].equals("NA")){
mapKeyOut.set("YMUC---" + cols[0] + "," + cols[1] + "," + cols[8]); // combination key Year, Month, UniqueCarrier
if( arrDelayTime + depDelayTime > 0 && arrDelayTime + depDelayTime <=5 ) {
mapValueOut.set(depDelayTime);
context.write(mapKeyOut, mapValueOut);
context.getCounter(JCounters.YMUCdelay0and5).increment(1);
} else if (arrDelayTime + depDelayTime > 5 && arrDelayTime + depDelayTime <=15){
context.getCounter(JCounters.YMUCdelay5and15).increment(1);
} else if (arrDelayTime + depDelayTime > 15){
context.getCounter(JCounters.YMUCdelayover15).increment(1);
}
} else if(arrDelayTime + depDelayTime <= 0){
context.getCounter(JCounters.YMUCnodelay).increment(1);
}
//#########################################
if(!cols[14].equals("NA")){
mapKeyOut.set("YUCM---" + cols[0] + "," + cols[8] + "," + cols[1]); // combination key Year, Month, UniqueCarrier
if( arrDelayTime + depDelayTime > 0 && arrDelayTime + depDelayTime <=5 ) {
mapValueOut.set(depDelayTime);
context.write(mapKeyOut, mapValueOut);
context.getCounter(JCounters.YUCMdelay0and5).increment(1);
} else if (arrDelayTime + depDelayTime > 5 && arrDelayTime + depDelayTime <=15){
context.getCounter(JCounters.YUCMdelay5and15).increment(1);
} else if (arrDelayTime + depDelayTime > 15){
context.getCounter(JCounters.YUCMdelayover15).increment(1);
}
} else if(arrDelayTime + depDelayTime <= 0){
context.getCounter(JCounters.YUCMnodelay).increment(1);
}
//#########################################
}catch (Exception e) {e.printStackTrace();}
} // if(cols != null && cols.length > 0)
} // if(key.get() > 0)
} // if(columns != null && columns.length > 0 )
} // void map
}
AirReducer.java
package com.jacob.mr.parallel.airdelay;
import java.io.IOException;
import java.util.StringTokenizer;
import java.util.Vector;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public class AirReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
// Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
private MultipleOutputs<Text, IntWritable> mos;
private IntWritable reduceResult = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
int sum = 0 ;
for(IntWritable value : values)
sum += value.get();
reduceResult.set(sum);
context.write(key, reduceResult);
}
}