Hadoop - MapReducer

BigData 2016. 8. 2. 11:51

# MapReducer


1. 개념

- Map : 데이터를 key - value 형태로 변경 시킨다

- -- -- -- 정렬 과 병합 -- -- -- : 하둡 플랫폼에서 수행한다

- Reducer : key - value 형태의 데이터를 가공한다

- ※ 기본적으로 **오름차순 정렬**

- ※ 개발자는 MapReducer 만 고민하면 된다, read / write 는 플랫폼에서 지원한다


2. 아키텍쳐 ( Hadoop 2.0 이후 버전 )

       

- JobTrcker : 마스터 노드, ResourceManager 역할로 슬레이브 노드 중 여유있는 노드를 찾아 Job(MapReduce 동작) 을 전달하고 이를 관리함

- TaskTracker : 슬레이브 노드, AppMaster 역할로 실제 Job 을 Map Task 와 Reducer Task 로 나누어 각각의 노드(슬레이브 노드)에 전달한다

- 하둡 1.x 에서는 마스터 노드에서 모두 관리하여 마스터 노드에 부담이 큰 아키텍쳐었다.


3. 동작 원리

       

- Map 작업이 완료 되어야 Reducer 작업이 진행된다

- but Reducer 작업은 Map 의 결과를 옮기는 것부터 시작임으로 로그 상에서 Map task 가 100%을 수행되지 않아도 올라갈 수 있다

- 하둡 플랫폼에 의해 (**사용자 아님**) 데이터의 양을 판단, 입력 스프릿 수를 결정하여 각각 매퍼를 생성

- in JAVA, 리듀서는 default 1개

- 중복되는 task 가 진행 될 경우 하나를 kill 한다

- 투기적 매커니즘 에 의해 긴 응답시간이 걸릴 때 중복 작업을 지시한다 > 먼저 나오는 task 를 제외하고 kill 한다

4. 동작

- 마스터     

[hadoop@hadoop01 input]$ jps      

7847 org.eclipse.equinox.launcher_1.3.0.v20140415-2008.jar          

11356 Jps      

11295 RunJar      

3830 ResourceManager      

5561 JobHistoryServer     

3514 NameNode       

3696 SecondaryNameNode        

- 슬레이브        

[hadoop@hadoop04 ~]$ jps        

5574 YarnChild      

2489 NodeManager          

4883 MRAppMaster (*) : slave 에서 mapreduce 동작시, taskManager 역할을 하게 된다      

5568 YarnChild : (*) 실제 mapreduce 동작하는 프로세스       

5565 YarnChild       

5735 Jps      

5564 YarnChild       

2393 DataNode       

5567 YarnChild      

5566 YarnChild       

- 동작 로그         

Starting Job = job_1432788061280_0003, Tracking URL = http://hadoop01:8088/proxy/application_1432788061280_0003/         

Kill Command = /home/hadoop/hadoop/bin/hadoop job  -kill job_1432788061280_0003

Hadoop job information for Stage-1: number of mappers: 4; number of reducers: 1 >> 매퍼 4개 리듀서 1개 확인            

2015-05-28 16:13:29,860 Stage-1 map = 0%,  reduce = 0%            

2015-05-28 16:13:50,172 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 15.48 sec         

2015-05-28 16:13:55,362 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 16.67 sec             

MapReduce Total cumulative CPU time: 16 seconds 670 msec         

Ended Job = job_1432788061280_0003      

MapReduce Jobs Launched:      

Job 0: Map: 4  Reduce: 1   Cumulative CPU: 16.67 sec   HDFS Read: 992186218 HDFS Write: 5 SUCCESS       

Total MapReduce CPU Time Spent: 16 seconds 670 msec       

OK        

6412       

Time taken: 43.959 seconds, Fetched: 1 row(s)        

5. 튜닝 구간

- 셔플 : Map task 와 Reduce task 사이의 전달 구간

- 튜닝 포인트

- Map task 는 메모리 버퍼를 생성한 후 출력데이터를 버퍼에 기록 후 일정 크기에 도달하면 로컬 디스크로 쓴다 (파일 I/O 발생)

- 로컬 디스크 로 쓰여진 spill 파일들을 정렬된 출력 파일로 병합한다 (thread 추가 포인트)

    - 실행 시 추가할 옵션

    - io.sort.mb: 100mb --> 200mb

    - io.sort.factor: 10 --> 30

    - mapred.child.java.opts=-Xmx512m   #JAVA HEAP MEMORY


6. MapReducer 를 구현하는 Java Project

- 구조

- Main.java : Job 을 선언 및 설정   

            Configuration conf = new Configuration();

            Job job = Job.getInstance(conf);

            job.setJobName("JOB_TEST_NAME");

            job.setJarByClass(Main.class);

            job.setMapperClass(TestMapper.class);

            job.setReducerClass(TestReducer.class);

            job.setInputFormatClass(TextInputFormat.class);

            job.setOutputFormatClass(TextOutputFormat.class);

            job.setOutputKeyClass(Text.class);

            job.setOutputValueClass(IntWritable.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));

            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.waitForCompletion(true);   


- Mapper.java : 읽어올 데이터의 타입 지정, keyvalue 형태로 변환

          public class TestMapper extends 

          Mapper<LongWritable, Text, Text, IntWritable> {

          

            private final static IntWritable one = new IntWritable(1);

            private Text word = new Text();


            public void map(LongWritable key, Text value, Context context) 

            throws IOException, InterruptedException {

                StringTokenizer itr = new StringTokenizer(value.toString());

                while (itr.hasMoreTokens()) {

                    word.set(itr.nextToken());

                    context.write(word, one);

                }

            }

        }

          

- Reducer.java : keyvalue 형태의 값을 읽어 가공

          public class TestReducer extends

                Reducer<Text, IntWritable, Text, IntWritable> {

            private IntWritable result = new IntWritable();


            public void reduce(Text key, Iterable<IntWritable> values, Context context)

                    throws IOException, InterruptedException {

                int sum = 0;

                for (IntWritable val : values) {

                    sum += val.get();

                }

                result.set(sum);

                context.write(key, result);

            }

        }

- core-site.xml : hdfs 경로 등 설정 값

- hdfs-site.xml : 퍼미션 등의 설정 값

- 주요 용어

- Context > 객체간 메시지를 전달하는 전역변수 역할

- But Pig Project or Hive Project 가 간단하게 짤 수 있다

'BigData' 카테고리의 다른 글

HDFS ( Hadoop Distributed File System )  (0) 2016.08.01
Hadoop 설치  (0) 2016.08.01
Hadoop 기초  (0) 2016.08.01
Posted by 감각적신사
,