아파치 하둡 플랫폼 성능 향상 개요
기존 아파치 하둡 hadoop-0.19.1 에서 추가된 점
HadoopCache는 Hadoop-0.19.1에서의 Map, Reduce작업 시 데이터 캐싱 모듈 및 해당 캐시를 활용하는 스케줄러 동작을 통한 성능 향상을 목표로 하며, 기본적인 하둡 기능 이외에 다음과 같은 기능을 추가하여 동작한다.

다음 그림은 HadoopCache의 전체 구조 및 동작 순서를 나타낸다. 회색으로 칠해진 부분은 데이터를 나타내고, 하늘색으로 칠해진 부분은 HadoopCache의 구성요소를 나타낸다. 또한 점선은 제어 메시지와 관련된 흐름이고, 실선은 데이터의 전송과 관련된 흐름을 나타낸다. 전체 동작 순서는 HadoopCache가 map worker가 요청한 데이터를 갖고 있는 경우와 그렇지 않은 경우로 나뉘게 되며, 먼저 요청한 데이터가 없는 경우의 동작 순서부터 설명하도록 한다. 각 동작 별 설명은 아래와 같다.
Assign map
Request input split
Get local cache path
Remote read
Write splits on local storage
Local read
Register cache
Local write
여기서 4, 5, 6은 동시에 수행이 가능한 순서이다.
이번에는 Cache-server가 map task가 필요로 하는 데이터를 가지고 있는 경우의 동작 순서에 대해서 설명하도록 하겠다. 이 경우 1, 2, 3의 순서는 변함이 없고, 4, 5가 생략되고 바로 6으로 넘어가게 된다. 그리고 7의 과정도 생략된 후 8로 바로 넘어간다. 이렇게 같은 데이터에 대해서 요청이 자주 발생하게 되는 경우 네트워크 트래픽을 발생시키지 않고 Cache-server로부터 데이터를 받아서 처리할 수 있기 때문에 전체적인 시스템의 성능이 올라가게 된다.
Cache-server 동작 설명
Cache-server는 map task가 실행되는 노드마다 상주하며 실행되는 프로그램이며, map task에서 사용되는 데이터를 캐싱하여 관리하는 역할을 한다.
Streaming Interface
Cache-server는 할당된 데이터를 모두 다 읽지 않은 상황에서도 map worker에 데이터를 공급해줄 수 있도록 streaming interface를 지원한다. 이는 Hadoop에서도 지원되는 사항으로서 데이터를 읽으면서 map task를 수행할 수 있도록 함으로서 전체 수행시간을 줄이는데 매우 큰 기여를 하게 된다.
Map task가 데이터를 읽을 때 Cache-server와 map task는 서로 통신을 하지 않도록 설계되었는데 이는 추가적인 스레드의 생성을 피함으로써 노드의 자원을 최대한 아끼기 위함이다. 따라서 현재 Cache-server는 얼마만큼의 데이터를 읽었는지를 캐시 파일을 통해 map task에게 알려주게 된다. 이를 지원하기 위한 캐시 파일의 헤더 구조는 다음과 같다.

가장 앞의 Header Size는 Data부분을 제외한 헤더의 크기를 나타내고, Offset은 원래 파일에서 현재 캐시가 담고 있는 데이터의 위치가 어디인지를 나타낸다. Size는 현재 캐시 파일의 Data 부분의 사이즈를 나타내며, 전체 캐시 파일의 크기는 “Header Size + Size”가 된다. Valid Size는 Data 부분에 쓰여진 실제 데이터의 크기를 나타내고, File Path는 원격 파일시스템에서의 파일의 경로를 의미한다.
Map task는 매번 Valid Size를 체크하여 필요로 하는 데이터가 Valid Size 이전에 존재하면 데이터를 읽고, 그렇지 않은 경우에는 Valid Size가 변경될 때까지 기다리게 된다. 하지만 이 기다림은 거의 발생하지 않는데, 그 이유는 map task가 필요로 할 때만 데이터를 채워 넣는 방식이 아니라 할당된 데이터가 모두 채워질 때까지 최대한 빠른 속도로 데이터를 원격 파일시스템으로부터 읽어오기 때문이다. 이로 인해 기존의 Hadoop이 갖고 있지 않던 data prefetching 효과를 얻을 수 있게 되었다.
Valid Size 부분은 cache-server에 의해서 매우 자주 업데이트되는 부분이고, map task도 현재 읽을 수 있는 데이터의 크기를 알아보기 위하여 매우 자주 읽혀지는 부분이기 때문에 헤더는 memory-mapped buffer를 이용하여 접근함으로써 빠른 업데이트와 읽기를 가능하게 하였다. Memory-mapped buffer는 파일의 특정부분은 메모리에 올려서 해당 영역에 대한 파일 I/O 요청을 메모리에서 일어나게 하여 disk I/O의 발생을 최소화하는 기법이다. 따라서 로컬 디스크에 파일을 작성하며 발생하는 추가적인 disk I/O는 앞서 설명한 prefetching 효과와 memory-mapped buffer에 의하여 충분히 상쇄되며, 추가적인 부담 없이 캐시 데이터를 생성 및 읽을 수 있게 된다.
Cache Coherency
파일시스템에 캐시를 설계할 때 가장 많은 신경을 써야 하는 부분은 바로 cache coherency를 보장하는 방법에 관한 것이다. 캐시에 담긴 데이터가 실제 파일시스템에 담겨진 데이터와 내용이 다르다면 문제가 발생할 수 있기 때문이다. HadoopCache는 Hadoop MapReduce 프레임워크에서 사용되는 파일시스템의 특징을 이용하여 이 문제를 해결하였다. MapReduce 프레임워크에서 사용되는 데이터는 데이터의 중간 부분은 수정되지 않으며, 파일의 마지막 부분에 추가만 가능한 형태가 대부분이다. Hadoop File System (HDFS)나 Google File System (GFS) 모두 이와 같은 특징을 갖는다. HadoopCache는 이 특징을 이용하여 cache coherency를 보장한다.
일단 파일의 중간은 수정되지 않기 때문에 캐시가 원본 파일의 마지막 부분이 아니라면 더 이상 cache coherency를 신경 쓰지 않아도 된다. 관리가 필요한 부분은 마지막 부분인데, 데이터가 수정이 되었다면 파일의 사이즈가 늘어나게 된다. 이 경우 map task가 캐시에 저장된 데이터보다 더 많은 데이터를 요구하게 되고, cache-server는 현재 갖고 있는 데이터로는 map task가 요청한 데이터를 공급해줄 수 없기 때문에 데이터를 새로 읽어오게 된다. 그리고 새로 읽어온 데이터를 이용하여 새로운 캐시를 만들게 된다. 따라서 이 경우에도 수정된 데이터를 map task에게 공급이 가능하게 되고, cache coherency 문제는 발생하지 않게 된다.
하지만 파일이 지워진 후 똑같은 이름으로 파일이 새로 만들어진 경우에는 위와 같은 방법으로는 처리가 불가능하다. 이를 위하여 cache-server는 캐시를 만들 때 해당 파일의 생성날짜를 함께 저장하고 있다가 map task가 데이터를 요청할 때마다 원격 파일시스템에 저장된 파일의 생성날짜와 비교하여 값이 다른 경우에는 파일이 지워졌다고 판단하고 새로 캐시를 만들게 된다. 이 경우 파일의 생성날짜를 비교하는 추가적인 작업이 필요하게 되지만 파일의 생성날짜를 원격 파일시스템으로부터 읽어오는 것은 매우 가벼운 연산이기 때문에 전체 수행시간에 거의 영향을 미치지 않는다.
HadoopCache는 Hadoop MapReduce 프레임워크에서 사용되는 파일시스템에서 cache coherency에 영향을 줄 수 있는 모든 경우에 위와 같은 방법으로 대처하여 map task에게 항상 최신의 데이터를 공급할 수 있다.
캐시 사이즈 조정
디스크는 메모리보다 훨씬 큰 공간을 제공하지만 최근의 클라우드 컴퓨팅 서비스의 인스턴스들은 기본적으로 많은 디스크 공간을 제공하지 않는다. 또한 디스크 공간에 대해서도 요금을 받고 있기 때문에 디스크 공간도 효율적으로 사용하여야 할 필요성이 있다. 따라서 cache-server는 전체 캐시 사이즈를 조절할 수 있는 기능을 갖고 있다.
캐시 사이즈를 제한하게 되면 필연적으로 따라오는 것이 cache replacement algorithm인데 HadoopCache는 Least Recently Used (LRU)와 Least Frequently Used (LFU) 두 가지 알고리즘을 갖고 있으며 사용자가 선택할 수 있게 하였다. 하지만 LRU를 쓰던 LFU를 쓰던 실제 이득은 거의 차이가 없었다. 이는 노드에 존재하는 캐시에 맞는 작업을 할당하기 때문에 고전적인 의미의 파일시스템 캐시와는 동작 방식이 전혀 다르기 때문이다.
캐시 저장 공간의 선택
주변 cache-server로부터의 데이터 전송
*이 기능은 cache-aware task scheduler의 도움을 받아서 수행되게 되는데, cache-server는 원격 파일시스템으로부터 파일을 가져오기 전에 주변의 cache-server가 요청한 데이터를 갖고 있는지를 cache-aware task scheduler에게 물어본다. 이 질의는 cache-aware task scheduler에게 거의 부담을 주지 않고, 오고 가는 데이터의 양도 매우 적기 때문에 cache-server의 수행시간에도 큰 영향을 미치지 않는다. 이렇게 주변 cache-server에서 데이터를 읽어오는 경우에도 위에서 설명한 streaming interface와 cache coherency는 여전히 만족한다.
Cache-aware Task Scheduler 동작 설명
성능 평가를 위한 실험 구성환경
TPC-H를 Hadoop MapReduce 버전으로 작성하여 성능을 평가하였다. 성능평가에 사용된 데이터 사이즈는 10GB와 20GB이고 TPC에서 제공하는 dbgen을 이용하여 생성하였다.
실험 환경
HadoopCache를 사용하기 위해서는 설정 파일의 설정 및 사용자 코드의 약간의 수정이 필요하다. 설정 및 수정사항은 다음과 같다.
설정 파일
hadoop-site.xml 에 아래와 같은 내용을 설정가능하다.
mapred.jobtracker.taskScheduler
사용할 스케쥴러를 선택하는 부분으로서 아래와 같이 설정하여야만 HadoopCache를 사용할 수 있다.
<property>
<name>mapred.jobtracker.taskScheduler</name>
<value>org.apache.hadoop.mapred.HadoopCacheAwareTaskScheduler</value>
</property>
mapred.hadoopcache.port
Cache-server인 HadoopCache.class가 수행될 포트로써 필수로 지정하여야 한다.
<property>
<name>mapred.hadoopcache.port</name>
<value>59550</value>
</property>
mapred.hadoopcache.size.mb
캐시 저장에 할당된 디스크의 크기로서 지정하지 않으면 무한대의 크기를 갖게 된다. 단위는 MB이다.
<property>
<name>mapred.hadoopcache.size.mb</name>
<value>4096</value>
</property>
mapred.hadoopcache.dir
캐시가 저장될 디렉토리를 지정하는 환경 변수로서 필수로 지정하여야 한다.
<property>
<name>mapred.hadoopcache.dir</name>
<value>/home/mrgrid/hcfs</value>
</property>
mapred.hadoopcache.policy
캐시 replacement algorithm을 선택하는 환경변수로서 LRU가 기본이다.
<property>
<name>mapred.hadoopcache.policy</name>
<value>LFU</value>
</property>
사용자 코드에서의 설정
사용자 코드에서는 개별 작업 단위로 설정이 이루어진다. 해당 작업에서 사용되는 파일이 다른 작업에서도 자주 사용되는 파일인 경우에만 설정할 수 있도록 함으로서 사용자가 프로그램 실행의 최적화를 할 수 있도록 하였다.
JobConf.setInputFormat(HadoopCacheInputFormat.class);
위와 같이 코드에 추가하면 HadoopCache를 사용할 수 있게 된다. 아래는 HadoopCache를 사용하는 예제이다. 굵은 글씨로 써놓은 부분처럼 HadoopCache의 사용을 원하는 작업에 대해서만 설정을 해주는 것만으로 모든 설정이 완료된다.
```
public static void main(String args[]) throws IOException
{
Conf.parse(args);
String input_dir = Conf.prefix_input+"/lineitem.tbl";
String output_dir = "tpch_1";
Path input_path = new Path(input_dir);
Path output_path = new Path(output_dir);
JobConf conf = new JobConf(TPCH_1.class);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(output_path))fs.delete(output_path);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setInputFormat(HadoopCacheInputFormat.class);
conf.setMapperClass(MAP_TPCH_1.class);
conf.setReducerClass(RED_TPCH_1.class);
FileInputFormat.setInputPaths(conf, input_path);
FileOutputFormat.setOutputPath(conf, output_path);
if(Conf.conf_map) conf.setNumMapTasks(Conf.mapper);
conf.setNumReduceTasks(1);
long starttime = System.currentTimeMillis();
JobClient.runJob(conf);
long endtime = System.currentTimeMillis();
System.out.println("Runtime is "+((endtime-starttime)/1000)+"sec");
return;
}
```