Menu

Description

Minyoung Sung
  1. 아파치 하둡 플랫폼 성능 향상 개요

    • 이 프로젝트는 아파치 하둡 클러스터에서 데이터 읽기 중복에 막는 것이 목표이다. 이 프로젝트의 구현인 HadoopCache의 사용시 더 효율적인 데이터 전송이 이루어짐으로써 주어진 일의 전체적인 수행시간이 개선된다. HadoopCache는 요약하자면, 캐쉬 서버와 캐쉬정보기반 태스크 스케쥴러로 구성되어 있다. 데이터 전송 중복을 줄이기 위하여 캐쉬 서버 모듈이 사용되며, 캐쉬정보 기반 태스크 스케쥴러는 이를 사용하여 응용프로그램의 성능을 높인다.
      HadoopCache는 map worker와 HDFS의 Datanode사이에 위치하여 중복된 데이터의 요청이 발생한 경우 HadoopCache가 갖고 있던 데이터를 map worker에게 전달해 줌으로써 불필요한 데이터의 전송을 막아서 네트워크 트래픽을 줄이고, map worker에게 더욱 효율적으로 데이터를 공급할 수 있게 한다.
  2. 기존 아파치 하둡 hadoop-0.19.1 에서 추가된 점

    • HadoopCache는 Hadoop-0.19.1에서의 Map, Reduce작업 시 데이터 캐싱 모듈 및 해당 캐시를 활용하는 스케줄러 동작을 통한 성능 향상을 목표로 하며, 기본적인 하둡 기능 이외에 다음과 같은 기능을 추가하여 동작한다.

      1. Map worker가 요청한 데이터를 원격에 위치한 파일시스템으로부터 읽어와서 map worker에게 제공하는 기능
      2. 원격에 위치한 파일 시스템으로부터 읽어온 데이터를 로컬 디스크에 저장하여 관리하는 기능
      3. Map worker가 기존에 요청한 데이터를 다시 요청했을 때 해당 데이터가 로컬 디스크에 존재하는 경우 원격에 위치한 파일시스템에 접속하지 않고 map worker에게 데이터를 전송하는 기능
      4. 로컬 디스크에 저장된 데이터의 사이즈가 무한정 커지지 않도록 지정된 사이즈 이하로 제한하는 기능
      5. 로컬 디스크에 저장된 캐시를 인식하여 해당 데이터가 필요한 map task를 해당 노드에 배치시키는 기능
    • alternate text

    • 다음 그림은 HadoopCache의 전체 구조 및 동작 순서를 나타낸다. 회색으로 칠해진 부분은 데이터를 나타내고, 하늘색으로 칠해진 부분은 HadoopCache의 구성요소를 나타낸다. 또한 점선은 제어 메시지와 관련된 흐름이고, 실선은 데이터의 전송과 관련된 흐름을 나타낸다. 전체 동작 순서는 HadoopCache가 map worker가 요청한 데이터를 갖고 있는 경우와 그렇지 않은 경우로 나뉘게 되며, 먼저 요청한 데이터가 없는 경우의 동작 순서부터 설명하도록 한다. 각 동작 별 설명은 아래와 같다.

      1. Assign map

        • Cache-aware task scheduler는 먼저 map task의 수행에 필요한 데이터를 가진 노드의 map worker에게 작업을 할당하여 원격에 위치한 파일시스템으로부터 데이터를 읽지 않도록 한다. 만약 해당 데이터를 로컬 디스크에 가진 노드가 없는 경우에는 원래의 정책에 따라서 작업을 할당한다.
      2. Request input split

        • Map worker는 원격에 위치한 파일시스템에 데이터를 요청하지 않고, cache-server에 파일을 요청한다.
      3. Get local cache path

        • Cache-server는 원격 파일시스템으로부터 데이터를 읽어오기 전에 데이터를 요청한 map worker에게 로컬 디스크에 저장된 파일의 경로를 반환해준다. 이는 streaming interface를 지원함으로써 최대의 성능을 끌어내기 위함이다. 이와 관련된 내용은 다음 절에서 설명한다. 현재 시점에서 파일은 존재하지만 내용은 텅 빈 상태이다.
      4. Remote read

        • Cache-server는 원격 파일시스템으로부터 map worker가 요청한 데이터를 읽어오기 시작한다.
      5. Write splits on local storage

        • Cache-server는 원격 파일시스템으로부터 읽어온 데이터를 (3)에서 map worker에게 알려준 로컬 디스크 내의 파일에 저장한다.
      6. Local read

        • Map worker는 로컬 디스크에 저장된 데이터를 직접 읽어서 작업을 수행한다. 이 경우 데이터가 모두 쓰여지지 않았어도 원격 파일시스템으로부터 읽어온 만큼에 대해서는 작업을 한다. 이것은 HadoopCache가 streaming interface를 지원하기 때문이며 다음 절에서 설명한다.
      7. Register cache

        • 원격 파일시스템으로부터 데이터를 다 읽어온 후 Cache-server는 Cache-aware task scheduler에게 cache가 생성되었음을 알려주고, 이 정보는 다음 번 map task의 할당에 이용된다.
      8. Local write

        • Map task가 종료되면 결과가 로컬 디스크에 써지며 이 후의 Reduce phase는 기존의 Hadoop의 처리과정을 따른다.
    • 여기서 4, 5, 6은 동시에 수행이 가능한 순서이다.

    • 이번에는 Cache-server가 map task가 필요로 하는 데이터를 가지고 있는 경우의 동작 순서에 대해서 설명하도록 하겠다. 이 경우 1, 2, 3의 순서는 변함이 없고, 4, 5가 생략되고 바로 6으로 넘어가게 된다. 그리고 7의 과정도 생략된 후 8로 바로 넘어간다. 이렇게 같은 데이터에 대해서 요청이 자주 발생하게 되는 경우 네트워크 트래픽을 발생시키지 않고 Cache-server로부터 데이터를 받아서 처리할 수 있기 때문에 전체적인 시스템의 성능이 올라가게 된다.

    • Cache-server 동작 설명

      • Cache-server는 map task가 실행되는 노드마다 상주하며 실행되는 프로그램이며, map task에서 사용되는 데이터를 캐싱하여 관리하는 역할을 한다.

      • Streaming Interface

        • Cache-server는 할당된 데이터를 모두 다 읽지 않은 상황에서도 map worker에 데이터를 공급해줄 수 있도록 streaming interface를 지원한다. 이는 Hadoop에서도 지원되는 사항으로서 데이터를 읽으면서 map task를 수행할 수 있도록 함으로서 전체 수행시간을 줄이는데 매우 큰 기여를 하게 된다.

        • Map task가 데이터를 읽을 때 Cache-server와 map task는 서로 통신을 하지 않도록 설계되었는데 이는 추가적인 스레드의 생성을 피함으로써 노드의 자원을 최대한 아끼기 위함이다. 따라서 현재 Cache-server는 얼마만큼의 데이터를 읽었는지를 캐시 파일을 통해 map task에게 알려주게 된다. 이를 지원하기 위한 캐시 파일의 헤더 구조는 다음과 같다.

        • alternate text

        • 가장 앞의 Header Size는 Data부분을 제외한 헤더의 크기를 나타내고, Offset은 원래 파일에서 현재 캐시가 담고 있는 데이터의 위치가 어디인지를 나타낸다. Size는 현재 캐시 파일의 Data 부분의 사이즈를 나타내며, 전체 캐시 파일의 크기는 “Header Size + Size”가 된다. Valid Size는 Data 부분에 쓰여진 실제 데이터의 크기를 나타내고, File Path는 원격 파일시스템에서의 파일의 경로를 의미한다.

        • Map task는 매번 Valid Size를 체크하여 필요로 하는 데이터가 Valid Size 이전에 존재하면 데이터를 읽고, 그렇지 않은 경우에는 Valid Size가 변경될 때까지 기다리게 된다. 하지만 이 기다림은 거의 발생하지 않는데, 그 이유는 map task가 필요로 할 때만 데이터를 채워 넣는 방식이 아니라 할당된 데이터가 모두 채워질 때까지 최대한 빠른 속도로 데이터를 원격 파일시스템으로부터 읽어오기 때문이다. 이로 인해 기존의 Hadoop이 갖고 있지 않던 data prefetching 효과를 얻을 수 있게 되었다.

        • Valid Size 부분은 cache-server에 의해서 매우 자주 업데이트되는 부분이고, map task도 현재 읽을 수 있는 데이터의 크기를 알아보기 위하여 매우 자주 읽혀지는 부분이기 때문에 헤더는 memory-mapped buffer를 이용하여 접근함으로써 빠른 업데이트와 읽기를 가능하게 하였다. Memory-mapped buffer는 파일의 특정부분은 메모리에 올려서 해당 영역에 대한 파일 I/O 요청을 메모리에서 일어나게 하여 disk I/O의 발생을 최소화하는 기법이다. 따라서 로컬 디스크에 파일을 작성하며 발생하는 추가적인 disk I/O는 앞서 설명한 prefetching 효과와 memory-mapped buffer에 의하여 충분히 상쇄되며, 추가적인 부담 없이 캐시 데이터를 생성 및 읽을 수 있게 된다.

      • Cache Coherency

        • 파일시스템에 캐시를 설계할 때 가장 많은 신경을 써야 하는 부분은 바로 cache coherency를 보장하는 방법에 관한 것이다. 캐시에 담긴 데이터가 실제 파일시스템에 담겨진 데이터와 내용이 다르다면 문제가 발생할 수 있기 때문이다. HadoopCache는 Hadoop MapReduce 프레임워크에서 사용되는 파일시스템의 특징을 이용하여 이 문제를 해결하였다. MapReduce 프레임워크에서 사용되는 데이터는 데이터의 중간 부분은 수정되지 않으며, 파일의 마지막 부분에 추가만 가능한 형태가 대부분이다. Hadoop File System (HDFS)나 Google File System (GFS) 모두 이와 같은 특징을 갖는다. HadoopCache는 이 특징을 이용하여 cache coherency를 보장한다.

        • 일단 파일의 중간은 수정되지 않기 때문에 캐시가 원본 파일의 마지막 부분이 아니라면 더 이상 cache coherency를 신경 쓰지 않아도 된다. 관리가 필요한 부분은 마지막 부분인데, 데이터가 수정이 되었다면 파일의 사이즈가 늘어나게 된다. 이 경우 map task가 캐시에 저장된 데이터보다 더 많은 데이터를 요구하게 되고, cache-server는 현재 갖고 있는 데이터로는 map task가 요청한 데이터를 공급해줄 수 없기 때문에 데이터를 새로 읽어오게 된다. 그리고 새로 읽어온 데이터를 이용하여 새로운 캐시를 만들게 된다. 따라서 이 경우에도 수정된 데이터를 map task에게 공급이 가능하게 되고, cache coherency 문제는 발생하지 않게 된다.

        • 하지만 파일이 지워진 후 똑같은 이름으로 파일이 새로 만들어진 경우에는 위와 같은 방법으로는 처리가 불가능하다. 이를 위하여 cache-server는 캐시를 만들 때 해당 파일의 생성날짜를 함께 저장하고 있다가 map task가 데이터를 요청할 때마다 원격 파일시스템에 저장된 파일의 생성날짜와 비교하여 값이 다른 경우에는 파일이 지워졌다고 판단하고 새로 캐시를 만들게 된다. 이 경우 파일의 생성날짜를 비교하는 추가적인 작업이 필요하게 되지만 파일의 생성날짜를 원격 파일시스템으로부터 읽어오는 것은 매우 가벼운 연산이기 때문에 전체 수행시간에 거의 영향을 미치지 않는다.

        • HadoopCache는 Hadoop MapReduce 프레임워크에서 사용되는 파일시스템에서 cache coherency에 영향을 줄 수 있는 모든 경우에 위와 같은 방법으로 대처하여 map task에게 항상 최신의 데이터를 공급할 수 있다.

      • 캐시 사이즈 조정

        • 디스크는 메모리보다 훨씬 큰 공간을 제공하지만 최근의 클라우드 컴퓨팅 서비스의 인스턴스들은 기본적으로 많은 디스크 공간을 제공하지 않는다. 또한 디스크 공간에 대해서도 요금을 받고 있기 때문에 디스크 공간도 효율적으로 사용하여야 할 필요성이 있다. 따라서 cache-server는 전체 캐시 사이즈를 조절할 수 있는 기능을 갖고 있다.

        • 캐시 사이즈를 제한하게 되면 필연적으로 따라오는 것이 cache replacement algorithm인데 HadoopCache는 Least Recently Used (LRU)와 Least Frequently Used (LFU) 두 가지 알고리즘을 갖고 있으며 사용자가 선택할 수 있게 하였다. 하지만 LRU를 쓰던 LFU를 쓰던 실제 이득은 거의 차이가 없었다. 이는 노드에 존재하는 캐시에 맞는 작업을 할당하기 때문에 고전적인 의미의 파일시스템 캐시와는 동작 방식이 전혀 다르기 때문이다.

      • 캐시 저장 공간의 선택

        • Cache-server는 캐시를 메모리가 아닌 디스크에 저장을 한다. 디스크에 캐시를 저장하게 되면 추가적인 disk I/O를 발생시키기 때문에 시스템에 부하를 일으키게 된다. 메모리를 이와 같은 부하가 훨씬 적기 때문에 성능만을 따진다면 디스크가 아닌 메모리를 선택하여나 하나 메모리는 공간의 제약이 심하기 때문에 갖고 있을 수 있는 캐시 사이즈가 매우 작아지게 된다. 이는 캐시 활용도 (cache utilization)을 떨어뜨려 HadoopCache의 이득을 제한하게 된다. 또한 Java에서는 메모리를 공유할 수 있는 방법이 없기 때문에 메모리에 있는 데이터를 map task에게 보내주려면 데이터를 보내기 위한 추가적인 프로그램이 필요하게 되고 이는 추가적인 리소스의 소비를 의미하게 된다. 따라서 HadoopCache는 메모리 대신 디스크에 데이터를 저장하도록 설계되었다. 실제 메모리에 구현을 해본 결과 디스크에 저장할 때보다 낮은 성능을 보여줌을 확인하였다.
      • 주변 cache-server로부터의 데이터 전송

        • Map task가 요청한 데이터가 주변 노드의 cache-server가 관리하는 데이터인 경우, cache-server는 원격 파일시스템에 접속하여 데이터를 가져오지 않고, 주변 cache-server에서 읽어오게 된다. 이를 통해 네트워크 트래픽과 원격 파일시스템의 부하를 최소한으로 유지하여 전체 시스템의 성능을 높일 수 있게 된다. 주변에 위치한다는 의미는 사용자가 공급한 네트워크 토폴로지에 의하여 판단될 수도 있고, ip가 같은 sub network에 속해있다는 것을 의미할 수도 있다.

        *이 기능은 cache-aware task scheduler의 도움을 받아서 수행되게 되는데, cache-server는 원격 파일시스템으로부터 파일을 가져오기 전에 주변의 cache-server가 요청한 데이터를 갖고 있는지를 cache-aware task scheduler에게 물어본다. 이 질의는 cache-aware task scheduler에게 거의 부담을 주지 않고, 오고 가는 데이터의 양도 매우 적기 때문에 cache-server의 수행시간에도 큰 영향을 미치지 않는다. 이렇게 주변 cache-server에서 데이터를 읽어오는 경우에도 위에서 설명한 streaming interface와 cache coherency는 여전히 만족한다.

    • Cache-aware Task Scheduler 동작 설명

      • Cache-aware task scheduler는 HadoopCache를 일반적인 파일시스템 캐시와 구분 짓게 하는 핵심 요소이다. 기존의 고전적인 캐시는 프로그램이 실행되는 곳에 캐시가 우연히 있으면 사용하고 그렇지 않으면 사용하지 못하는 구조지만, HadoopCache는 캐시가 있는 곳으로 프로그램이 옮겨가서 실행되는 구조이기 때문에 고전적인 캐시보다 캐시 활용도가 월등히 높고, 전체적인 성능도 뛰어나게 된다.
      • Cache-aware task scheduler에서 가장 까다로운 부분은 요청한 데이터를 담고 있는 노드를 판별하는 일이다. 이는 단순히 파일명만 비교해서는 될 수 없고 데이터의 위치도 함께 비교해야 한다. 이를 위하여 cache-aware task scheduler는 interval tree를 이용하여 이를 처리하고 있다.
  3. 성능 평가를 위한 실험 구성환경

    • TPC-H를 Hadoop MapReduce 버전으로 작성하여 성능을 평가하였다. 성능평가에 사용된 데이터 사이즈는 10GB와 20GB이고 TPC에서 제공하는 dbgen을 이용하여 생성하였다.

    • 실험 환경

      1. 실험에 이용된 노드는 총 9개 노드이고, 8대는 Intel Core2Quad Q9550 2.83Ghz 쿼드 코어 CPU와 8GB 메모리를 탑재하고 있고, 한 대는 AMD Opteron 265 2.0Ghz 듀얼 코어 CPU 두 개와 8GB메모리를 탑재하고 있다. 네트웍은 1Gigabit 스위치 한 대에 모든 노드가 연결되어 있다.
      2. Hadoop 세팅은 Intel 쿼드코어 노드에서 map task 3개, reduce task 1개를 수행하도록 세팅하였으며, Opteron 노드에는 map, reduce task없이 HDFS만 수행되었다.
    • HadoopCache를 사용하기 위해서는 설정 파일의 설정 및 사용자 코드의 약간의 수정이 필요하다. 설정 및 수정사항은 다음과 같다.

    • 설정 파일

      • hadoop-site.xml 에 아래와 같은 내용을 설정가능하다.

        1. mapred.jobtracker.taskScheduler

          • 사용할 스케쥴러를 선택하는 부분으로서 아래와 같이 설정하여야만 HadoopCache를 사용할 수 있다.

          • <property> <name>mapred.jobtracker.taskScheduler</name> <value>org.apache.hadoop.mapred.HadoopCacheAwareTaskScheduler</value> </property>

        2. mapred.hadoopcache.port

          • Cache-server인 HadoopCache.class가 수행될 포트로써 필수로 지정하여야 한다.

          • <property> <name>mapred.hadoopcache.port</name> <value>59550</value> </property>

        3. mapred.hadoopcache.size.mb

          • 캐시 저장에 할당된 디스크의 크기로서 지정하지 않으면 무한대의 크기를 갖게 된다. 단위는 MB이다.

          • <property> <name>mapred.hadoopcache.size.mb</name> <value>4096</value> </property>

        4. mapred.hadoopcache.dir

          • 캐시가 저장될 디렉토리를 지정하는 환경 변수로서 필수로 지정하여야 한다.

          • <property> <name>mapred.hadoopcache.dir</name> <value>/home/mrgrid/hcfs</value> </property>

        5. mapred.hadoopcache.policy

          • 캐시 replacement algorithm을 선택하는 환경변수로서 LRU가 기본이다.

          • <property> <name>mapred.hadoopcache.policy</name> <value>LFU</value> </property>

    • 사용자 코드에서의 설정

      • 사용자 코드에서는 개별 작업 단위로 설정이 이루어진다. 해당 작업에서 사용되는 파일이 다른 작업에서도 자주 사용되는 파일인 경우에만 설정할 수 있도록 함으로서 사용자가 프로그램 실행의 최적화를 할 수 있도록 하였다.

      • JobConf.setInputFormat(HadoopCacheInputFormat.class);

      • 위와 같이 코드에 추가하면 HadoopCache를 사용할 수 있게 된다. 아래는 HadoopCache를 사용하는 예제이다. 굵은 글씨로 써놓은 부분처럼 HadoopCache의 사용을 원하는 작업에 대해서만 설정을 해주는 것만으로 모든 설정이 완료된다.

      • ```
        public static void main(String args[]) throws IOException
        {
        Conf.parse(args);
        String input_dir = Conf.prefix_input+"/lineitem.tbl";
        String output_dir = "tpch_1";

Path input_path = new Path(input_dir);
Path output_path = new Path(output_dir);
JobConf conf = new JobConf(TPCH_1.class);

FileSystem fs = FileSystem.get(conf);
if(fs.exists(output_path))fs.delete(output_path);

conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setInputFormat(HadoopCacheInputFormat.class);
conf.setMapperClass(MAP_TPCH_1.class);
conf.setReducerClass(RED_TPCH_1.class);

FileInputFormat.setInputPaths(conf, input_path);
FileOutputFormat.setOutputPath(conf, output_path);

if(Conf.conf_map) conf.setNumMapTasks(Conf.mapper);
conf.setNumReduceTasks(1);

long starttime = System.currentTimeMillis();
JobClient.runJob(conf);
long endtime = System.currentTimeMillis();
System.out.println("Runtime is "+((endtime-starttime)/1000)+"sec");

return;
}

    ```

MongoDB Logo MongoDB