본문 바로가기

Dev tips and tips

hadoop 완벽가이드 기상데이터 source 읽기

3개의 파일로 구성되어 있는 예제의 소스에 설명을 넣어본다. 


Mapper >  MaxTemperatureMapper.java 

Reducer > MaxTemperatureReducer.java 

Main Class  >>>  MaxTemperature.java 



https://github.com/tomwhite/hadoop-book/tree/master/input/ncdc/all

------------------------------- begin of the part of the file 1901 -----------------------------

0029227070999991901123020004+62167+030650FM

-12+010299999V0200701N002119999999N0000001N9

-01221+99999100831ADDGF108991999999999999999999


0029227070999991901123106004+62167+030650FM

-12+010299999V0200701N004119999999N0000001N9

-01391+99999100521ADDGF108991999999999999999999


0029227070999991901123113004+62167+030650FM

-12+010299999V0200701N003119999999N0000001N9

-01391+99999100321ADDGF108991999999999999999999

------------------------------- end  of the part of the file 1901 -------------------------------



MaxTemperature.java 

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class MaxTemperature {


  public static void main(String[] args) throws Exception {

    if (args.length != 2) {

      System.err.println("Usage: MaxTemperature <input path> <output path>");

      System.exit(-1);

    }

    

    Job job = new Job();

    job.setJarByClass(MaxTemperature.class);

    job.setJobName("Max temperature");


    FileInputFormat.addInputPath(job, new Path(args[0]));           // the path of target files

    FileOutputFormat.setOutputPath(job, new Path(args[1]));       // the path of the output

    

    job.setMapperClass(MaxTemperatureMapper.class);        

    job.setReducerClass(MaxTemperatureReducer.class);     


    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(IntWritable.class);

    

    System.exit(job.waitForCompletion(true) ? 0 : 1);

  }

}



MaxTemperatureMapper.java 

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;


public class MaxTemperatureMapper  extends Mapper<LongWritable, Text, Text, IntWritable> {


  private static final int MISSING = 9999;

  

  @Override

  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

    

    String line = value.toString();                                             // after reading a value, change the type

    String year = line.substring(15, 21);                                    // after reading a line, get the date part

    int airTemperature;

    if (line.charAt(87) == '+') {                                                // parseInt doesn't like leading plus signs

      airTemperature = Integer.parseInt(line.substring(88, 92));

    } else {

      airTemperature = Integer.parseInt(line.substring(87, 92));

    }

    String quality = line.substring(92, 93);

    if (airTemperature != MISSING && quality.matches("[01459]")) {

      context.write(new Text(year), new IntWritable(airTemperature));

    }

  }

}



MaxTemperatureReducer.java 

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;


public class MaxTemperatureReducer  extends Reducer<Text, IntWritable, Text, IntWritable> {

  

  @Override

  public void reduce(Text key, Iterable<IntWritable> values, Context context)

                                                                       throws IOException, InterruptedException {

    

    int maxValue = Integer.MIN_VALUE;

    for (IntWritable value : values) {

      maxValue = Math.max(maxValue, value.get());

    }

    context.write(key, new IntWritable(maxValue));

  }

}