본문 바로가기

카테고리 없음

항공 파일 맵리듀스 작업


항공 파일 맵리듀스 작업



air001.jar


air002.jar


air003.jar



[jacob@h001 hadoop]$ ./bin/hadoop jar air003.jar com.jacob.mr.parallel.airdelay.airDepartureDelayCount /user/jacob/in/ /user/jacob/out/test01



airDepartureDelayCount.java


package com.jacob.mr.parallel.airdelay;


import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


// util 

import org.apache.hadoop.util.GenericOptionsParser;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;


public class airDepartureDelayCount extends Configured implements Tool {

public int run(String[] args) throws Exception {

String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();

        if(args.length != 2) {

System.err.println("Usage : com.jacob.mr.parallel.airdelay.airDepartureDelayCount  <input>  <output> ");

System.exit(2);

}

String arg0 = args[0];

System.out.println("arg0 : " + arg0);

String arg1 = args[1];

System.out.println("arg1 : " + arg1);

Job job = new Job(getConf(), "airDepartureDelayCount");

System.out.println("job : " + job);

FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

job.setJarByClass(airDepartureDelayCount.class);

job.setMapperClass(AirMapper.class);

//job.setPartitionerClass(cls);

//job.setCombinerClass(cls);

job.setReducerClass(AirReducer.class);

job.setInputFormatClass(TextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

job.waitForCompletion(true);

return(0);

}

public static void main(String[] args) throws Exception {

int res = ToolRunner.run(new Configuration(),  new airDepartureDelayCount() , args);

System.out.println("## RESULT :  " + res);

}

}





AirMapper.java


package com.jacob.mr.parallel.airdelay;


import java.io.IOException;

import java.io.InterruptedIOException;


import org.apache.hadoop.io.*;

import org.apache.hadoop.mapreduce.Mapper;


public class AirMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ 

                       // Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

private final static IntWritable mapValueOut = new IntWritable(1);

private Text mapKeyOut = new Text();

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{

   String[] columns = value.toString().split(",");

   if(columns != null && columns.length > 0 ) {

   

   if(key.get() > 0){

String[] cols = value.toString().split(",");

if(cols != null && cols.length > 0) {

try{

int arrDelayTime = Integer.parseInt(cols[14]);

int depDelayTime = Integer.parseInt(cols[15]);

 //#########################################

if(!cols[15].equals("NA")){

mapKeyOut.set("YMUC---" + cols[0] + "," + cols[1] + "," + cols[8]); // combination key Year, Month, UniqueCarrier

if( arrDelayTime + depDelayTime > 0 && arrDelayTime + depDelayTime <=5 ) {

   mapValueOut.set(depDelayTime);

  context.write(mapKeyOut,  mapValueOut);

  context.getCounter(JCounters.YMUCdelay0and5).increment(1);

  

} else if (arrDelayTime + depDelayTime > 5 && arrDelayTime + depDelayTime <=15){

  context.getCounter(JCounters.YMUCdelay5and15).increment(1);

  

} else if (arrDelayTime + depDelayTime > 15){

  context.getCounter(JCounters.YMUCdelayover15).increment(1);

      }

} else if(arrDelayTime + depDelayTime <= 0){

context.getCounter(JCounters.YMUCnodelay).increment(1);  

}

     //#########################################

if(!cols[14].equals("NA")){

mapKeyOut.set("YUCM---" + cols[0] + "," + cols[8] + "," + cols[1]); // combination key Year, Month, UniqueCarrier

if( arrDelayTime + depDelayTime > 0 && arrDelayTime + depDelayTime <=5 ) {

   mapValueOut.set(depDelayTime);

  context.write(mapKeyOut,  mapValueOut);

  context.getCounter(JCounters.YUCMdelay0and5).increment(1);

  

} else if (arrDelayTime + depDelayTime > 5 && arrDelayTime + depDelayTime <=15){

  context.getCounter(JCounters.YUCMdelay5and15).increment(1);

  

} else if (arrDelayTime + depDelayTime > 15){

  context.getCounter(JCounters.YUCMdelayover15).increment(1);

      }

} else if(arrDelayTime + depDelayTime <= 0){

context.getCounter(JCounters.YUCMnodelay).increment(1);  

}


   //#########################################

}catch (Exception e) {e.printStackTrace();}

 } // if(cols != null && cols.length > 0)

} // if(key.get() > 0)

} // if(columns != null && columns.length > 0 )

} // void map

   

}




AirReducer.java


package com.jacob.mr.parallel.airdelay;


import java.io.IOException;

import java.util.StringTokenizer;

import java.util.Vector;


import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;


public class AirReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

                             // Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

private MultipleOutputs<Text, IntWritable> mos;

     private IntWritable reduceResult = new IntWritable();

     

     public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{

     

    int sum = 0 ;

    for(IntWritable value : values)

    sum += value.get();

    reduceResult.set(sum);

     

    context.write(key, reduceResult);

     

     }

}