Hadoop on Cygwin

FKorning

This tutorial shows how to get the latest Apache Hadoop to run on Windows over Cygwin.

It involves convoluted yak-shaving, including customizing your cygwin environment,
fixing shell scripts, patching contributed libraries, modifying core hadoop java code,
and even adding a java.io.File wrapper to the code.

I've managed to get this working to the point where jobs are dispatched, tasks executed, and results compiled.
Apart from the TaskTracker, we may still need to get some of the servlets to understand cygwin symlinks.

Hadoop 1.0.1 on Cygwin 1.7, JDK 1.7 x64, Windows 7

Screenshot

The following screenshot shows the WordCount MapReduce job completed on win-hadoop.


Main Issues

One major problem is the confusion between the hadoop shell scripts,
which expect unix paths like "/tmp", and the hadoop java binaries, which
interpret such a path as "C:\tmp" rather than "C:\cygwin\tmp".

The scripts also assume standard unix/posix binaries that are not on the
default Windows path, since under cygwin they live in "C:\cygwin\bin".

A major issue is that java.io.File does not understand Cygwin symlinks.
Unfortunately, MapRed Task Attempt Logs are now created with symlinks.

Required Software

First you need to install cygwin in "c:\cygwin" with ssh/ssl and tcp_wrappers.

You also need a working java jdk, ant, and ivy 2, preferably with maven as well.

All of these should be installed in a logical public working directory (c:\work).

Next install hadoop-1.0.1 from source in a logical public working directory.

Installing Cygwin

Install cygwin in the default location, including the ssh/ssl packages.

cyg_server SSHD

Under windows run the cygwin bash shell as NT Administrator.

Then use "ssh-host-config" to configure your Open SSH daemon (sshd).
It should run as a Windows service as the "cyg_server" privileged user.

From the cygwin shell, you then have to edit the /etc/passwd file,
give "cyg_server" a valid shell and home directory, and change the
password for the user.

/etc/passwd:


Admin@Fenris ~ $

grep cyg_server /etc/passwd
cyg_server:unused:1012:513:cyg_server,U-FENRIS\cyg_server,S-1-5-21-3016684299-1332526942-2884232613-1012:/home/cyg_server:/bin/bash
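
To actually set the password for the technical user (a one-command sketch):


Admin@Fenris ~ $

passwd cyg_server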

Create the "/home/cyg_server" user home dir and its ".ssh" control dir.
Use "chown" and "chmod" to set the appropriate permissions of dirs.

In that dir use "ssh-genkeys" to generate the "cyg_server" user's keys
and copy the id_rsa.pub public public key into authorized_keys.

Use "chown" and "chmod" to set the appropriate permissions of files.

If done right you should be able to ssh cyg_server@localhost.
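
A minimal sketch of that key-setup sequence (assuming the default rsa key name):


Admin@Fenris ~ $

mkdir -p /home/cyg_server/.ssh
ssh-keygen -t rsa -N "" -f /home/cyg_server/.ssh/id_rsa
cp /home/cyg_server/.ssh/id_rsa.pub /home/cyg_server/.ssh/authorized_keys
chown -R cyg_server /home/cyg_server
chmod 700 /home/cyg_server /home/cyg_server/.ssh
chmod 600 /home/cyg_server/.ssh/id_rsa /home/cyg_server/.ssh/authorized_keys
ssh cyg_server@localhost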

root Group

After setting up the "cyg_server" technical user, the first cygwin patch
is to fake a "root" group that is a copy of the NT Administrators group.

cygwin groups actually defer to NT authentication, which cygwin resolves
by matching the NT group SID stored in the /etc/group password field.

Edit the /etc/group file to add the fake "root" group, with the
same passwd as "Administrators", but with gid "0" instead of "544".

Note below how "root" has the same passwd as "Administrators" ("S-1-5-32-544"):

/etc/group:


Admin@Fenris ~ $

grep 544 /etc/group

root:S-1-5-32-544:0:
Administrators:S-1-5-32-544:544:

The unix "groups" command shows that "cyg_server" is now a member of the "root" group.


Admin@Fenris ~ $

groups cyg_server

cyg_server : None root Users

/cygwin Path

The first path patch is to make cygwin paths identical to windows paths.

I get around this by creating a circular symlink "/cygwin" -> "/".

To avoid confusion with "C:" drive mappings, my paths are drive-logical.

This means that windows "\cygwin\tmp" equals cygwin's "/cygwin/tmp".


Admin@Fenris ~ $

ln -s / /cygwin
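
A quick check that the link is in place (ls should show /cygwin pointing back to /):


Admin@Fenris ~ $

ls -ld /cygwin /cygwin/tmp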

Installing Software

The spaces in the default windows program installation paths ("C:\Program Files") cause headaches,
which are best avoided by using the same strategy as the "/cygwin" logical directory for our work.

Create a logical public working directory in "c:\work", make sure all developers have read-write
access to it, and create the Cygwin symlink "/work" -> "/cygdrive/c/work" -> "c:\work".
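
A minimal sketch of those steps from an Administrator cygwin shell
(the chmod is only a rough stand-in for the proper NTFS read-write ACLs):


Admin@Fenris ~ $

mkdir -p /cygdrive/c/work
chmod a+rwx /cygdrive/c/work
ln -s /cygdrive/c/work /work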

Installing Java

Install the latest jdk and set the appropriate JAVA_HOME system environment variable:


<?properties type="windows.system.variables" ?>

JAVA_HOME=c:\work\java\jdk1.7.0_03_x64
CLASSPATH=c:\work\java\jdk1.7.0_03_x64\lib\tools.jar
PATH=c:\work\java\jdk1.7.0_03_x64;%PATH%
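
As a quick sanity check (a sketch, assuming the variables above), a fresh cygwin shell
should inherit the Windows value, and cygpath should convert it to a usable posix path:


Admin@Fenris ~ $

cygpath -u "$JAVA_HOME"
/cygdrive/c/work/java/jdk1.7.0_03_x64

"$(cygpath -u "$JAVA_HOME")/bin/java" -version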

Installing Ant

Install the latest ant and set the appropriate ANT_HOME system environment variables:


<?properties type="windows.system.variables" ?>

ANT_HOME=\work\ant\ant-1.8.2
PATH=\work\ant\ant-1.8.2\bin;%PATH%

Installing Maven

Install the latest maven and set the appropriate MAVEN_HOME system environment variables:


<?properties type="windows.system.variables" ?>

M2_HOME=\work\maven\maven-3.0.4
MVN_HOME=\work\maven\maven-3.0.4
PATH=\work\maven\maven-3.0.4\bin;%PATH%

Configure your local maven repository in \work\maven\repository:


<?properties type="windows.system.variables" ?>

MVN_REPO=\work\maven\repository

Configure your %USERPROFILE%\.m2\settings.xml to access the public depots and the local repo:

%USERPROFILE%\.m2\settings.xml:


<?xml version="1.0"?>

<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0
                              http://maven.apache.org/xsd/settings-1.0.0.xsd">
    <localRepository>/work/maven/repository</localRepository>

    <interactiveMode>true</interactiveMode>
    <offline>false</offline>

    <usePluginRegistry>false</usePluginRegistry>
    <!--    
    <pluginGroups>
        <pluginGroup>org.codehaus.mojo</pluginGroup>
        <pluginGroup>org.apache.maven</pluginGroup>
        <pluginGroup>org.apache.maven.plugins</pluginGroup> 
    </pluginGroups>

    <mirrors>
        <mirror>
            <id>uk</id>
            <name>uk</name>
            <url>http://uk.maven.org/maven2</url>
            <mirrorOf>central</mirrorOf>
        </mirror>
    </mirrors>
    -->

    <profiles>
        <profile>
            <!-- give this profile an id and activate it by default, otherwise its repositories are never used -->
            <id>default</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>

            <repositories>
                <repository>
                    <id>central</id>
                    <name>central</name>
                    <url>http://repo1.maven.apache.org/maven2/</url>
                </repository>               
                <repository>
                    <id>codehaus-repository</id>
                    <name>Codehaus-repository</name>
                    <url>http://repository.codehaus.org</url>
                </repository>
                <repository>
                    <id>codehaus-release</id>
                    <name>codehaus-release</name>
                    <url>http://dist.codehaus.org/</url>
                </repository>
                <repository>
                    <id>codehaus-snapshot</id>
                    <name>Codehaus Snapshot Repo</name>
                    <url>http://snapshots.repository.codehaus.org</url>
                    <snapshots><enabled>true</enabled></snapshots>
                </repository>
                <repository>
                    <id>uk</id>
                    <name>uk</name>
                    <url>http://uk.maven.org/maven2</url>
                </repository>
                <repository>
                    <id>ibiblio</id>
                    <name>ibiblio</name>
                    <url>http://mirrors.ibiblio.org/pub/mirrors/maven2/</url>
                </repository>
                <repository>
                    <id>mvnrepository</id>
                    <name>mvnrepository</name>
                    <url>http://mvnrepository.com/artifact/</url>
                </repository>
                <repository>
                    <id>java.net</id>
                    <name>Maven Java Net Snapshots and Releases</name>
                    <url>https://maven.java.net/content/groups/public/</url>
                </repository>
                <repository>
                    <id>hippo-repository</id>
                    <name>hippo-repository</name>
                    <url>http://maven.onehippo.com/maven2/</url>
                </repository>
                <repository>
                    <id>hippo-snapshot</id>
                    <name>hippo-snapshot</name>
                    <url>http://maven.onehippo.com/maven2-snapshots/</url>
                    <snapshots><enabled>true</enabled></snapshots>
                </repository>
            </repositories>

            <pluginRepositories>
                <pluginRepository>
                    <id>central</id>
                    <name>central</name>
                    <url>http://repo1.maven.apache.org/maven2/</url>
                </pluginRepository>
                <pluginRepository>
                    <id>codehaus-repository</id>
                    <name>Codehaus-repository</name>
                    <url>http://repository.codehaus.org</url>
                </pluginRepository>
                <pluginRepository>
                    <id>codehaus-release</id>
                    <name>codehaus-release</name>
                    <url>http://dist.codehaus.org/</url>
                </pluginRepository>
                <pluginRepository>
                    <id>codehaus-snapshot</id>
                    <name>Codehaus Snapshot Repo</name>
                    <url>http://snapshots.repository.codehaus.org</url>
                    <snapshots><enabled>true</enabled></snapshots>
                </pluginRepository>
            </pluginRepositories>

        </profile>
    </profiles>

</settings>

Installing Ivy

One of my pet peeves is that Ivy and Maven don't cache their jars in quite the same way.

I normally override the default Ivy repository and cache settings to mirror maven's.

This makes it easier to cache, compile, install, and publish jars between them.

Install the latest ivy and set the appropriate IVY_HOME system environment variables:


<?properties type="windows.system.variables" ?>

IVY_HOME=\work\ivy\ivy-2.2.0

Configure your local ivy repository and cache in \work\ivy\repository:


<?properties type="windows.system.variables" ?>

IVY_REPO=\work\ivy\repository
IVY_REPO_CACHE=\work\ivy\repository\cache
IVY_REPO_LOCAL=\work\ivy\repository\local

Configure your %USERPROFILE%\.ivy2\ivysettings.xml to mirror the maven depots and patterns:

%USERPROFILE%\.ivy2\ivysettings.xml:


<?xml version="1.0"?>

 <ivysettings>

 <!-- ivysettings.xml for use with maven2

  see    http://draconianoverlord.com/2010/07/18/publishing-to-maven-repos-with-ivy.html  
  see    http://www.jayasoft.org/ivy/doc/configuration

  -->

  <!-- 
   common maven repositories:
      http://repo1.maven.org/maven2/
      http://mirrors.ibiblio.org/pub/mirrors/maven2/
      http://repository.codehaus.org/
      http://dist.codehaus.org/
      http://snapshots.repository.codehaus.org/
      http://people.apache.org/repo/m2-snapshot-repository/
      http://people.apache.org/repo/m2-incubating-repository/
      https://oss.sonatype.org/content/groups/public/
  -->

  <property name="ivy.default.ivy.user.dir" value="/work/ivy"/>

  <!--
   This property is used later in the ivy.xml file to set
   the project's revision. Unless overridden, it defaults
   to the Maven SNAPSHOT convention, as that it works well
   for publishing local builds to the local m2 repository.
  -->
  <property name="revision" value="SNAPSHOT" override="false"/>

  <property name="pattern.ivy2"
    value="[organization]/[module]"/>

  <property name="pattern.ivy2.ivy-revision"
    value="${pattern.ivy2}/ivy-[revision].xml"/>

  <property name="pattern.ivy2.module"
    value="${pattern.ivy2}/[revision]"/>

  <property name="pattern.ivy2.artifact-ivy"
    value="${pattern.ivy2.module}/ivy.xml"/>

  <property name="pattern.ivy2.artifact"
    value="${pattern.ivy2.module}/[artifact]-[revision](-[classifier]).[ext]"/>

  <property name="pattern.maven2"
    value="[organisation]/[module]"/>

  <property name="pattern.maven2.module"
    value="${pattern.maven2}/[revision]"/>

  <property name="pattern.maven2.artifact-pom"
    value="${pattern.maven2.module}/[artifact]-[revision](-[classifier]).pom"/>

  <property name="pattern.maven2.artifact"
    value="${pattern.maven2.module}/[artifact]-[revision](-[classifier]).[ext]"/>

  <property name="repo.central.maven.org"
    value="http://repo1.maven.org/maven2/"/>

  <property name="repo.mirror.ibiblio.org"
    value="http://mirrors.ibiblio.org/pub/mirrors/maven2/"/>

  <property name="repo.plugin.codehaus.org"
    value="http://repository.codehaus.org/"/>

  <property name="repo.snapshot.apache.org"
    value="http://people.apache.org/repo/m2-snapshot-repository/"/>

  <property name="repo.oss.sonatype.org"
    value="https://oss.sonatype.org/content/groups/public/"/>

  <property name="ivy.local.default.root"
    value="/work/ivy/repository/local"/>

  <property name="ivy.local.default.ivy.pattern"
    value="${pattern.ivy2.artifact-ivy}"/>

  <property name="ivy.local.default.artifact.pattern"
    value="${pattern.ivy2.artifact}"/>

  <property name="ivy.shared.default.root"
    value="/work/ivy/repository/shared"/>

  <property name="ivy.shared.default.ivy.pattern"
    value="${pattern.ivy2.artifact-ivy}"/>

  <property name="ivy.shared.default.artifact.pattern"
    value="${pattern.ivy2.artifact}"/>

  <!-- pull in the local repository -->
  <!--  <include url="${ivy.default.conf.dir}/ivyconf-local.xml"/> -->

  <settings defaultResolver="default"/>
  <resolvers>

    <ibiblio
      name="maven-central"
      root="${repo.central.maven.org}"
      m2compatible="true"
      pattern="${pattern.maven2.artifact}"
      changingPattern=".*SNAPSHOT"
      />
    <ibiblio
      name="ibiblio-mirror"
      root="${repo.mirror.ibiblio.org}"
      m2compatible="true"
      pattern="${pattern.maven2.artifact}"
      changingPattern=".*SNAPSHOT"
      />
    <ibiblio
      name="codehaus-plugin"
      root="${repo.plugin.codehaus.org}"
      m2compatible="true"
      pattern="${pattern.maven2.artifact}"
      changingPattern=".*SNAPSHOT"
      />
    <ibiblio
      name="apache-snapshot"
      root="${repo.snapshot.apache.org}"
      m2compatible="true"
      pattern="${pattern.maven2.artifact}"
      changingPattern=".*SNAPSHOT"
      />
    <ibiblio
      name="sonatype-oss"
      root="${repo.oss.sonatype.org}"
      m2compatible="true"
      pattern="${pattern.maven2.artifact}"
      changingPattern=".*SNAPSHOT"
      />

    <!--
      for *retrieving* artifacts from local m2 repository.
    -->
    <ibiblio
      name="local-m2"
      root="file:///work/maven/repository"      
      m2compatible="true"
      pattern="${pattern.maven2.artifact}"
      changingPattern=".*SNAPSHOT"
      />

    <!--
     for *publishing* artifacts to local m2 repository
    -->
    <filesystem name="local-m2-publish" m2compatible="true">
     <artifact pattern="/work/maven/repository/${pattern.maven2.artifact}"/>
    </filesystem>

    <!-- ivy local -->
    <filesystem name="local" m2compatible="true">
      <ivy pattern="/work/ivy/repository/local/[organization]/[module]/[revision]/ivy.xml"/>
      <artifact pattern="/work/ivy/repository/local/${pattern.ivy2.artifact}"/>
    </filesystem>

    <chain name="default" dual="true">
      <resolver ref="local"/>
      <resolver ref="maven-central"/>
      <resolver ref="ibiblio-mirror"/>
      <resolver ref="sonatype-oss"/>      
      <resolver ref="local-m2"/>
    </chain>
    <chain name="internal">
      <resolver ref="local"/>
      <resolver ref="local-m2"/>
    </chain>
    <chain name="external">
      <resolver ref="maven-central"/>
      <resolver ref="ibiblio-mirror"/>
    </chain>
    <chain name="external-and-snapshots">
      <resolver ref="maven-central"/>
      <resolver ref="ibiblio-mirror"/>
      <resolver ref="codehaus-plugin"/>
      <resolver ref="apache-snapshot"/>
      <resolver ref="sonatype-oss"/>
    </chain>

  </resolvers>

  <caches
    defaultCacheDir="${ivy.default.ivy.user.dir}/repository/cache"
    ivyPattern="[organization]/[module]/ivy-[revision].xml"
    artifactPattern="${pattern.ivy2.artifact}"
    />

</ivysettings>

Finally, in a new cygwin bash shell, copy the ivy-2.2.0.jar library into the ant lib directory:


Admin@Fenris ~ $

cp /work/ivy/ivy-2.2.0/ivy-2.2.0.jar /work/ant/ant-1.8.2/lib

cyg_server Shell

From here on, all shell scripts, hadoop commands, and starting and stopping
of service daemons are done within an ssh shell as the user "cyg_server".


Admin@Fenris ~ $

ssh cyg_server@localhost

Building Hadoop

Installing Hadoop source

Unzip the win-hadoop source in the local public working directory \work\hadoop\hadoop-1.0.1.


<?properties type="windows.system.variables" ?>

HADOOP_HOME=\work\hadoop\hadoop-1.0.1
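
For convenience (an optional sketch, not part of the stock setup), you can also export the
matching cygwin-side variables in cyg_server's ~/.bashrc so the bin/hadoop wrapper is on the PATH:


cyg_server@Fenris ~ $

echo 'export HADOOP_HOME=/work/hadoop/hadoop-1.0.1' >> ~/.bashrc
echo 'export PATH=$HADOOP_HOME/bin:$PATH' >> ~/.bashrc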

Fixing Hadoop Scripts

hadoop tmp dir

We will create a hadoop unified tmp dir for the user "cyg_server".
We will later place all hadoop tmp dirs and files in this tmp dir.

hadoop tmp dir:


cyg_server@Fenris ~ $

mkdir -p /cygwin/tmp/hadoop-${USER}

This tmp dir will map to our shell env variables and java config properties.


<?text ?>

shell variable       java property           location
--------------       -------------           ---------
${HADOOP_TMP_DIR}    ${hadoop.tmp.dir}       /cygwin/tmp/hadoop-${USER}

For example, our configuration scripts will use the following paths:


<?text ?>

core pid files        /cygwin/tmp/
core tmp files        /cygwin/tmp/hadoop-${USER}/
core log files        /cygwin/tmp/hadoop-${USER}/logs/

hadoop-env.sh

Next, the hadoop environment shell script hadoop-env.sh needs to be modified to support
cygwin paths, and we need to ensure this script is sourced
by both "hadoop-config.sh" and the main "hadoop" sh wrapper script.

hadoop-env.sh:


#!/usr/bin/env bash

# -- cygwin patch --

# The java implementation to use.  Required.
#export JAVA_HOME=/usr/lib/j2sdk1.5-sun
export JAVA_HOME=C:/work/java/jdk1.7.0_03_x64

# Extra Java CLASSPATH elements.  Optional.
#export HADOOP_CLASSPATH=
export CLASSPATH=C:/work/java/jdk1.7.0_03_x64/lib/tools.jar

export TMP=/cygwin/tmp
export TEMP=/cygwin/tmp

# The directory where pid files are stored. /tmp by default.
export HADOOP_PID_DIR=/cygwin/tmp

# Where tmp files are stored.  /tmp by default.
export HADOOP_TMP_DIR=/cygwin/tmp/hadoop-${USER}

# Where log files are stored.  $HADOOP_HOME/logs by default.
export HADOOP_LOG_DIR=/cygwin/tmp/hadoop-${USER}/logs

# -- cygwin patch --

hadoop sh

The 'hadoop' main wrapper needs to be adapted to source the above paths.

It also needs fixing with 'cygpath -w'. I provide the whole script below.

hadoop:


#!/usr/bin/env bash

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# The Hadoop command script
#
# Environment Variables
#
#   JAVA_HOME        The java implementation to use.  Overrides JAVA_HOME.
#
#   HADOOP_CLASSPATH Extra Java CLASSPATH entries.
#
#   HADOOP_USER_CLASSPATH_FIRST      When defined, the HADOOP_CLASSPATH is 
#                                    added in the beginning of the global
#                                    classpath. Can be defined, for example,
#                                    by doing 
#                                    export HADOOP_USER_CLASSPATH_FIRST=true
#
#   HADOOP_HEAPSIZE  The maximum amount of heap to use, in MB. 
#                    Default is 1000.
#
#   HADOOP_OPTS      Extra Java runtime options.
#   
#   HADOOP_NAMENODE_OPTS       These options are added to HADOOP_OPTS 
#   HADOOP_CLIENT_OPTS         when the respective command is run.
#   HADOOP_{COMMAND}_OPTS etc  HADOOP_JT_OPTS applies to JobTracker 
#                              for e.g.  HADOOP_CLIENT_OPTS applies to 
#                              more than one command (fs, dfs, fsck, 
#                              dfsadmin etc)  
#
#   HADOOP_CONF_DIR  Alternate conf dir. Default is ${HADOOP_HOME}/conf.
#
#   HADOOP_ROOT_LOGGER The root appender. Default is INFO,console
#

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

cygwin=false
case "`uname`" in
CYGWIN*) cygwin=true;;
esac

if [ -e "$bin"/../libexec/hadoop-config.sh ]; then
  . "$bin"/../libexec/hadoop-config.sh
else
  . "$bin"/hadoop-config.sh
fi

# if no args specified, show usage
if [ $# = 0 ]; then
  echo "Usage: hadoop [--config confdir] COMMAND"
  echo "where COMMAND is one of:"
  echo "  namenode -format     format the DFS filesystem"
  echo "  secondarynamenode    run the DFS secondary namenode"
  echo "  namenode             run the DFS namenode"
  echo "  datanode             run a DFS datanode"
  echo "  dfsadmin             run a DFS admin client"
  echo "  mradmin              run a Map-Reduce admin client"
  echo "  fsck                 run a DFS filesystem checking utility"
  echo "  fs                   run a generic filesystem user client"
  echo "  balancer             run a cluster balancing utility"
  echo "  fetchdt              fetch a delegation token from the NameNode"
  echo "  jobtracker           run the MapReduce job Tracker node" 
  echo "  pipes                run a Pipes job"
  echo "  tasktracker          run a MapReduce task Tracker node" 
  echo "  historyserver        run job history servers as a standalone daemon"
  echo "  job                  manipulate MapReduce jobs"
  echo "  queue                get information regarding JobQueues" 
  echo "  version              print the version"
  echo "  jar <jar>            run a jar file"
  echo "  distcp <srcurl> <desturl> copy file or directories recursively"
  echo "  archive -archiveName NAME -p <parent path> <src>* <dest> create a hadoop archive"
  echo "  classpath            prints the class path needed to get the"
  echo "                       Hadoop jar and the required libraries"
  echo "  daemonlog            get/set the log level for each daemon"
  echo " or"
  echo "  CLASSNAME            run the class named CLASSNAME"
  echo "Most commands print help when invoked w/o parameters."
  exit 1
fi

# get arguments
COMMAND=$1
shift

# Determine if we're starting a secure datanode, and if so, redefine appropriate variables
if [ "$COMMAND" == "datanode" ] && [ "$EUID" -eq 0 ] && [ -n "$HADOOP_SECURE_DN_USER" ]; then
  HADOOP_PID_DIR=$HADOOP_SECURE_DN_PID_DIR
  HADOOP_LOG_DIR=$HADOOP_SECURE_DN_LOG_DIR
  HADOOP_IDENT_STRING=$HADOOP_SECURE_DN_USER
  starting_secure_dn="true"
fi

if [ "$JAVA_HOME" != "" ]; then
  #echo "JAVA_HOME: $JAVA_HOME"
  JAVA_HOME="$JAVA_HOME"
fi
# some Java parameters
if $cygwin; then
  JAVA_HOME=`cygpath -w "$JAVA_HOME"`
  #echo "cygwin JAVA_HOME: $JAVA_HOME"  
fi
  if [ "$JAVA_HOME" == "" ]; then
  echo "Error: JAVA_HOME is not set: $JAVA_HOME"
  exit 1
fi

JAVA=$JAVA_HOME/bin/java
JAVA_HEAP_MAX=-Xmx1000m

# check envvars which might override default args
if [ "$HADOOP_HEAPSIZE" != "" ]; then
  #echo "run with heapsize $HADOOP_HEAPSIZE"
  JAVA_HEAP_MAX="-Xmx""$HADOOP_HEAPSIZE""m"
  #echo $JAVA_HEAP_MAX
fi

# CLASSPATH initially contains $HADOOP_CONF_DIR
CLASSPATH="${HADOOP_CONF_DIR}"
if [ "$HADOOP_USER_CLASSPATH_FIRST" != "" ] && [ "$HADOOP_CLASSPATH" != "" ] ; then
  CLASSPATH=${CLASSPATH}:${HADOOP_CLASSPATH}
fi
CLASSPATH=${CLASSPATH}:$JAVA_HOME/lib/tools.jar

# for developers, add Hadoop classes to CLASSPATH
if [ -d "$HADOOP_HOME/build/classes" ]; then
  CLASSPATH=${CLASSPATH}:$HADOOP_HOME/build/classes
fi
if [ -d "$HADOOP_HOME/build/webapps" ]; then
  CLASSPATH=${CLASSPATH}:$HADOOP_HOME/build
fi
if [ -d "$HADOOP_HOME/build/test/classes" ]; then
  CLASSPATH=${CLASSPATH}:$HADOOP_HOME/build/test/classes
fi
if [ -d "$HADOOP_HOME/build/tools" ]; then
  CLASSPATH=${CLASSPATH}:$HADOOP_HOME/build/tools
fi

# so that filenames w/ spaces are handled correctly in loops below
IFS=

# for releases, add core hadoop jar & webapps to CLASSPATH
if [ -e $HADOOP_PREFIX/share/hadoop/hadoop-core-* ]; then
  # binary layout
  if [ -d "$HADOOP_PREFIX/share/hadoop/webapps" ]; then
    CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/share/hadoop
  fi
  for f in $HADOOP_PREFIX/share/hadoop/hadoop-core-*.jar; do
    CLASSPATH=${CLASSPATH}:$f;
  done

  # add libs to CLASSPATH
  for f in $HADOOP_PREFIX/share/hadoop/lib/*.jar; do
    CLASSPATH=${CLASSPATH}:$f;
  done

  for f in $HADOOP_PREFIX/share/hadoop/lib/jsp-2.1/*.jar; do
    CLASSPATH=${CLASSPATH}:$f;
  done

  for f in $HADOOP_PREFIX/share/hadoop/hadoop-tools-*.jar; do
    TOOL_PATH=${TOOL_PATH}:$f;
  done
else
  # tarball layout
  if [ -d "$HADOOP_HOME/webapps" ]; then
    CLASSPATH=${CLASSPATH}:$HADOOP_HOME
  fi
  for f in $HADOOP_HOME/hadoop-core-*.jar; do
    CLASSPATH=${CLASSPATH}:$f;
  done

  # add libs to CLASSPATH
  for f in $HADOOP_HOME/lib/*.jar; do
    CLASSPATH=${CLASSPATH}:$f;
  done

  if [ -d "$HADOOP_HOME/build/ivy/lib/Hadoop/common" ]; then
    for f in $HADOOP_HOME/build/ivy/lib/Hadoop/common/*.jar; do
      CLASSPATH=${CLASSPATH}:$f;
    done
  fi

  for f in $HADOOP_HOME/lib/jsp-2.1/*.jar; do
    CLASSPATH=${CLASSPATH}:$f;
  done

  for f in $HADOOP_HOME/hadoop-tools-*.jar; do
    TOOL_PATH=${TOOL_PATH}:$f;
  done
  for f in $HADOOP_HOME/build/hadoop-tools-*.jar; do
    TOOL_PATH=${TOOL_PATH}:$f;
  done
fi

# add user-specified CLASSPATH last
if [ "$HADOOP_USER_CLASSPATH_FIRST" = "" ] && [ "$HADOOP_CLASSPATH" != "" ]; then
  CLASSPATH=${CLASSPATH}:${HADOOP_CLASSPATH}
fi

# default log directory & file
if [ "$HADOOP_LOG_DIR" = "" ]; then
  HADOOP_LOG_DIR="$HADOOP_HOME/logs"
fi
if [ "$HADOOP_LOGFILE" = "" ]; then
  HADOOP_LOGFILE='hadoop.log'
fi

# default policy file for service-level authorization
if [ "$HADOOP_POLICYFILE" = "" ]; then
  HADOOP_POLICYFILE="hadoop-policy.xml"
fi

# restore ordinary behaviour
unset IFS

# figure out which class to run
if [ "$COMMAND" = "classpath" ] ; then
  if $cygwin; then
    CLASSPATH=`cygpath -wp "$CLASSPATH"`
  fi
  echo $CLASSPATH
  exit
elif [ "$COMMAND" = "namenode" ] ; then
  CLASS='org.apache.hadoop.hdfs.server.namenode.NameNode'
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_NAMENODE_OPTS"
elif [ "$COMMAND" = "secondarynamenode" ] ; then
  CLASS='org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode'
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_SECONDARYNAMENODE_OPTS"
elif [ "$COMMAND" = "datanode" ] ; then
  CLASS='org.apache.hadoop.hdfs.server.datanode.DataNode'
  if [ "$starting_secure_dn" = "true" ]; then
    HADOOP_OPTS="$HADOOP_OPTS -jvm server $HADOOP_DATANODE_OPTS"
  else
    HADOOP_OPTS="$HADOOP_OPTS -server $HADOOP_DATANODE_OPTS"
  fi
elif [ "$COMMAND" = "fs" ] ; then
  CLASS=org.apache.hadoop.fs.FsShell
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "dfs" ] ; then
  CLASS=org.apache.hadoop.fs.FsShell
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "dfsadmin" ] ; then
  CLASS=org.apache.hadoop.hdfs.tools.DFSAdmin
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "mradmin" ] ; then
  CLASS=org.apache.hadoop.mapred.tools.MRAdmin
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "fsck" ] ; then
  CLASS=org.apache.hadoop.hdfs.tools.DFSck
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "balancer" ] ; then
  CLASS=org.apache.hadoop.hdfs.server.balancer.Balancer
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_BALANCER_OPTS"
elif [ "$COMMAND" = "fetchdt" ] ; then
  CLASS=org.apache.hadoop.hdfs.tools.DelegationTokenFetcher
elif [ "$COMMAND" = "jobtracker" ] ; then
  CLASS=org.apache.hadoop.mapred.JobTracker
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_JOBTRACKER_OPTS"
elif [ "$COMMAND" = "historyserver" ] ; then
  CLASS=org.apache.hadoop.mapred.JobHistoryServer
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_JOB_HISTORYSERVER_OPTS"
elif [ "$COMMAND" = "tasktracker" ] ; then
  CLASS=org.apache.hadoop.mapred.TaskTracker
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_TASKTRACKER_OPTS"
elif [ "$COMMAND" = "job" ] ; then
  CLASS=org.apache.hadoop.mapred.JobClient
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "queue" ] ; then
  CLASS=org.apache.hadoop.mapred.JobQueueClient
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "pipes" ] ; then
  CLASS=org.apache.hadoop.mapred.pipes.Submitter
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "version" ] ; then
  CLASS=org.apache.hadoop.util.VersionInfo
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "jar" ] ; then
  CLASS=org.apache.hadoop.util.RunJar
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "distcp" ] ; then
  CLASS=org.apache.hadoop.tools.DistCp
  CLASSPATH=${CLASSPATH}:${TOOL_PATH}
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "daemonlog" ] ; then
  CLASS=org.apache.hadoop.log.LogLevel
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "archive" ] ; then
  CLASS=org.apache.hadoop.tools.HadoopArchives
  CLASSPATH=${CLASSPATH}:${TOOL_PATH}
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "sampler" ] ; then
  CLASS=org.apache.hadoop.mapred.lib.InputSampler
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
else
  CLASS=$COMMAND
fi

# cygwin path translation
if $cygwin; then
  JAVA_HOME=`cygpath -w "$JAVA_HOME"`
  CLASSPATH=`cygpath -wp "$CLASSPATH"`
  HADOOP_HOME=`cygpath -w "$HADOOP_HOME"`
  HADOOP_LOG_DIR=`cygpath -w "$HADOOP_LOG_DIR"`
  TOOL_PATH=`cygpath -wp "$TOOL_PATH"`
fi

# setup 'java.library.path' for native-hadoop code if necessary
JAVA_LIBRARY_PATH=''

if [ -d "${HADOOP_HOME}/build/native" -o -d "${HADOOP_HOME}/lib/native" -o -e "${HADOOP_PREFIX}/lib/libhadoop.a" ]; then
  JAVA_PLATFORM=`${JAVA} -classpath ${CLASSPATH} -Xmx32m ${HADOOP_JAVA_PLATFORM_OPTS} org.apache.hadoop.util.PlatformName | sed -e "s/ /_/g"`
  #echo "JAVA_PLATFORM: $JAVA_PLATFORM"

  if [ "$JAVA_PLATFORM" = "Windows_7-amd64-64" ]; then
    JSVC_ARCH="amd64"
  elif [ "$JAVA_PLATFORM" = "Linux-amd64-64" ]; then
    JSVC_ARCH="amd64"
  else
    JSVC_ARCH="i386"
  fi

  if [ -d "$HADOOP_HOME/build/native" ]; then
    JAVA_LIBRARY_PATH=${HADOOP_HOME}/build/native/${JAVA_PLATFORM}/lib
  fi

  if [ -d "${HADOOP_HOME}/lib/native" ]; then
    if [ "x$JAVA_LIBRARY_PATH" != "x" ]; then
      JAVA_LIBRARY_PATH=${JAVA_LIBRARY_PATH}:${HADOOP_HOME}/lib/native/${JAVA_PLATFORM}
    else
      JAVA_LIBRARY_PATH=${HADOOP_HOME}/lib/native/${JAVA_PLATFORM}
    fi
  fi

  if [ -e "${HADOOP_PREFIX}/lib/libhadoop.a" ]; then
    JAVA_LIBRARY_PATH=${HADOOP_PREFIX}/lib
  fi
fi

# cygwin path translation
if $cygwin; then
  JAVA_LIBRARY_PATH=`cygpath -wp "$JAVA_LIBRARY_PATH"`
  PATH="/cygwin/bin:/cygwin/usr/bin:`cygpath -p ${PATH}`"
fi

HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.tmp.dir=$HADOOP_TMP_DIR"
HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.log.dir=$HADOOP_LOG_DIR"
HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.log.file=$HADOOP_LOGFILE"
HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.home.dir=$HADOOP_HOME"
HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.id.str=$HADOOP_IDENT_STRING"
HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.root.logger=${HADOOP_ROOT_LOGGER:-INFO,console}"

#turn security logger on the namenode and jobtracker only
if [ $COMMAND = "namenode" ] || [ $COMMAND = "jobtracker" ]; then
  HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,DRFAS}"
else
  HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,NullAppender}"
fi

if [ "x$JAVA_LIBRARY_PATH" != "x" ]; then
  HADOOP_OPTS="$HADOOP_OPTS -Djava.library.path=$JAVA_LIBRARY_PATH"
fi  
HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.policy.file=$HADOOP_POLICYFILE"

# Check to see if we should start a secure datanode
if [ "$starting_secure_dn" = "true" ]; then
  if [ "$HADOOP_PID_DIR" = "" ]; then
    HADOOP_SECURE_DN_PID="/tmp/hadoop_secure_dn.pid"
  else
    HADOOP_SECURE_DN_PID="$HADOOP_PID_DIR/hadoop_secure_dn.pid"
  fi

  exec "$HADOOP_HOME/libexec/jsvc.${JSVC_ARCH}" -Dproc_$COMMAND -outfile "$HADOOP_LOG_DIR/jsvc.out" \
                                                -errfile "$HADOOP_LOG_DIR/jsvc.err" \
                                                -pidfile "$HADOOP_SECURE_DN_PID" \
                                                -nodetach \
                                                -user "$HADOOP_SECURE_DN_USER" \
                                                -cp "$CLASSPATH" \
                                                $JAVA_HEAP_MAX $HADOOP_OPTS \
                                                org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter "$@"
else
  # run it
  exec "$JAVA" -Dproc_$COMMAND $JAVA_HEAP_MAX $HADOOP_OPTS -classpath "$CLASSPATH" $CLASS "$@"
fi

slaves.sh

The SSH "slaves" invocation wrapper is broken because it fails to
provide the ssh user login, which isn't defaulted to in my openssh.

Add '-l ${USER}' to the "slaves" wrapper ssh invocation:

slaves.sh:


#!/usr/bin/env bash

for slave in `cat "$HOSTLIST"|sed "s/#.*$//;/^$/d"`; do
  ssh -l $USER $HADOOP_SSH_OPTS $slave $"${@// /\\ }" \
  2>&1 | sed "s/^/$slave: /" &
  if [ "$HADOOP_SLAVE_SLEEP" != "" ]; then
    sleep $HADOOP_SLAVE_SLEEP
  fi
done

start-all.sh

Next the Hadoop start-*.sh startup scripts need to be adapted for cygwin.

Specifically, we will track .pid files instead of relying on cygwin's ps -w,
which never returns more than one argument. This is a bit kludgy and subject to pid collisions;
feel free to adapt the script, for example to test that the pid really belongs to a java process.

We also want to erase the main logs and system.out dumps, as well as wipe
and recreate the mapred log directories, upon startup.

start-all.sh:


#!/usr/bin/env bash

# -- cygwin patch --

#pid=`ps aux | grep hadoop | head -1 | awk '{print $1}'`
pid=`cat ${HADOOP_PID_DIR}/hadoop-*.pid | head -1 | awk '{print $1}'`
if  [ "$pid" != "" ]; then
  echo "hadoop already running ($pid). run stop-all.sh to terminate."
  exit 1
fi

# log dir
rm -fr ${HADOOP_LOG_DIR}/hadoop-*

rm -fr ${HADOOP_LOG_DIR}/history
rm -fr ${HADOOP_LOG_DIR}/userlogs
mkdir -p ${HADOOP_LOG_DIR}/history
mkdir -p ${HADOOP_LOG_DIR}/userlogs
chmod a+s ${HADOOP_LOG_DIR}/history
chmod a+s ${HADOOP_LOG_DIR}/userlogs

# tmp dir
rm -fr ${HADOOP_TMP_DIR}/mapred/local/taskTracker
mkdir -p ${HADOOP_TMP_DIR}/mapred/local/taskTracker
chmod a+s  ${HADOOP_TMP_DIR}/mapred/local/taskTracker

# -- cygwin patch --

stop-all.sh

Also adapt the stop-*.sh shutdown scripts to clear the pid files.

stop-all.sh:


#!/usr/bin/env bash

# -- cygwin patch --

rm -f ${HADOOP_PID_DIR}/hadoop-*-datanode.pid
rm -f ${HADOOP_PID_DIR}/hadoop-*-namenode.pid
rm -f ${HADOOP_PID_DIR}/hadoop-*-secondarynamenode.pid

rm -f ${HADOOP_PID_DIR}/hadoop-*-jobtracker.pid
rm -f ${HADOOP_PID_DIR}/hadoop-*-tasktracker.pid

# -- cygwin patch --

Patching Hadoop Code

Next, the hadoop FS and utility classes are broken, as they expect shells
with POSIX /bin executables on their path (bash, chmod, chown, chgrp).

For various reasons it's a really bad idea to add "/cygwin/bin" to your
windows path, so we're going to have to fix the utility classes to
be cygwin-aware and use the "/cygwin/bin" binaries instead.

We also want to modify the mapred task code to include our custom
LinkedFile.java java.io.File wrapper which understands cygwin symlinks
(see below).

This is why you need the source, because we're going to have to fix
the java code and recompile the hadoop core libraries (and why you
need ant/ivy).
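
Once the patches below are in place, rebuilding the core jar is a plain ant build from the
source root (a sketch; "clean" and "jar" are the stock 1.0.x targets):


cyg_server@Fenris /work/hadoop/hadoop-1.0.1 $

ant clean jar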

Hadoop Eclipse Plugin

Building the eclipse plugin requires a valid Eclipse IDE installation and
the corresponding $ECLIPSE_HOME environment variable to be defined.

Make sure Eclipse uses the same architecture as the default jdk compiler
(64-bit in my case).
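
Following the system-variables convention used above (the eclipse path here is only an assumed
example; point it at your own installation):


<?properties type="windows.system.variables" ?>

ECLIPSE_HOME=\work\eclipse\eclipse-3.7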

Ivy Build Scripts

Copy your ivysettings.xml, overwriting the one in your hadoop ivy dir.

Next we modify the ivy build.xml and build-contrib.xml scripts in order
to set the correct compiler target (javac.version=1.7) for the compile targets.

build files to modify:


build.xml
src/contrib/build.xml
src/contrib/build-contrib.xml

Modify all the javac compile targets to include the target property.

build-contrib.xml:


<property name="javac.debug" value="on"/>
<property name="javac.version" value="1.7"/>

<!-- ====================================================== -->
<!-- Compile a Hadoop contrib's files -->
<!-- ====================================================== -->
<target name="compile" depends="init, ivy-retrieve-common" unless="skip.contrib">
    <echo message="contrib: ${name}"/>
    <javac
        encoding="${build.encoding}"
        srcdir="${src.dir}"
        includes="*/.java"
        destdir="${build.classes}"
        target="${javac.version}"
        source="${javac.version}"
        optimize="${javac.optimize}"
        debug="${javac.debug}"
        deprecation="${javac.deprecation}">
        <classpath refid="contrib-classpath"/>
    </javac>
</target>

GridMix.java

Gridmix is currently buggy and breaks compilation, as it uses
generic Enum code that just craps out in jdk/jre 1.7 and above.

The fix is to dumb it down and use untyped Enums.

Gridmix.java:


// -- cygwin patch --

private String getEnumValues(Enum[] e) {
    StringBuilder sb = new StringBuilder();
    String sep = "";
    for (Enum v : e) {
      sb.append(sep);
      sb.append(v.name());
      sep = "|";
    }
    return sb.toString();
}

JvmManager.java

Another bug is that JvmManager.JvmRunner.kill() triggers a NullPointerException,
as it doesn't check for empty strings, which apparently we
get under cygwin when parsing the tasks. Here is the fix:

JvmManager.java:


// -- cygwin patch --

synchronized void kill() throws IOException, InterruptedException {
    if (!killed) {
        TaskController controller = tracker.getTaskController();
        // Check initial context before issuing a kill to prevent situations
        // where kill is issued before task is launched.
        String pidStr = jvmIdToPid.get(jvmId);

        // -- cygwin patch -- ignore empty pidStr
        if ((pidStr != null) && !(pidStr.isEmpty())) {
            String user = env.conf.getUser();
            int pid = Integer.parseInt(pidStr);
            // start a thread that will kill the process dead
            if (sleeptimeBeforeSigkill > 0) {
                new DelayedProcessKiller(user, pid, sleeptimeBeforeSigkill, Signal.KILL).start();
                controller.signalTask(user, pid, Signal.TERM);
            } else {
                controller.signalTask(user, pid, Signal.KILL);
            }
        } else {
            LOG.info(String.format("JVM Not killed %s but just removed", jvmId.toString()));
        }
        killed = true;
    }
}

Shell.java

The first real java code fix is to make the hadoop Shell utilities use cygwin paths:

Shell.java:


// -- cygwin patch --

/** Set to true on Windows platforms */
public static final boolean WINDOWS =
    System.getProperty("os.name").startsWith("Windows");

/** a Unix command to get the current user's name */
public final static String USER_NAME_COMMAND = (WINDOWS ? "/cygwin/bin/whoami" : "whoami");

/** a Unix command to get the current user's groups list */
public static String[] getGroupsCommand() {
    return new String[]{ (WINDOWS ? "/cygwin/bin/bash" : "bash"), "-c", "groups"};
}

/** a Unix command to get a given user's groups list */
public static String[] getGroupsForUserCommand(final String user) {
    //'groups username' command return is non-consistent across different unixes
    return new String [] {(WINDOWS ? "/cygwin/bin/bash" : "bash"), "-c", "id -Gn " + user};
}

/** a Unix command to get a given netgroup's user list */
public static String[] getUsersForNetgroupCommand(final String netgroup) {
    //'groups username' command return is non-consistent across different unixes
    return new String [] {(WINDOWS ? "/cygwin/bin/bash" : "bash"), "-c", "getent netgroup " + netgroup};
}

/** Return a Unix command to get permission information. */
public static String[] getGET_PERMISSION_COMMAND() {
    //force /bin/ls, except on windows.
    return new String[] {(WINDOWS ? "/cygwin/bin/ls" : "/bin/ls"), "-ld"};
}

/** a Unix command to set permission */
public static final String SET_PERMISSION_COMMAND = (WINDOWS ? "/cygwin/bin/chmod" : "chmod");

/** a Unix command to set owner */
public static final String SET_OWNER_COMMAND = (WINDOWS ? "/cygwin/bin/chown" : "chown");

/** a Unix command to set group */
public static final String SET_GROUP_COMMAND = (WINDOWS ? "/cygwin/bin/chgrp" : "chgrp");

/** a Unix command to get ulimit of a process. */
public static final String ULIMIT_COMMAND = "ulimit";

Path.java

We also modify Path.java to expose as public static the methods
to handle cygwin paths. This includes the old normalizePath()
and hasWindowsDrive() methods, to which we add new localizePath()
and hasCygwinDrive() methods.

Path.java:


// -- cygwin patch --

public static final String CYGDRIVE = "cygdrive";

private URI uri;                   // a hierarchical uri

// utilities

// -- cygwin patch --
public static String localizePath (String path) {
    if (!WINDOWS) return path;

    if (hasCygwinDrive(path)) {
        char drive = path.charAt(CYGDRIVE.length() + 2);
        path = drive + ":" + path.substring(CYGDRIVE.length() + 3); 
    }

    if (path.contains("/")) {
        path = path.replace('/', File.separatorChar);
    }

    return path;
}

// -- cygwin patch --
public static String normalizePath(String path) {
    // remove double slashes & backslashes
    if (path.indexOf("//") != -1) {
      path = path.replace("//", "/");
    }
    if (path.indexOf("\\") != -1) { 
      path = path.replace("\\", "/");
    }

    // trim trailing slash from non-root path (ignoring windows drive)
    int minLength = hasWindowsDrive(path, true) ? 4 : 1;
    if (path.length() > minLength && path.endsWith("/")) {
      path = path.substring(0, path.length()-1);
    }

    return path;
}

// -- cygwin patch --  
public static boolean hasWindowsDrive(String path, boolean slashed) {
    if (!WINDOWS) return false;
    int start = slashed ? 1 : 0;
    return
      path.length() >= start+2 &&
      (slashed ? path.charAt(0) == '/' : true) &&
      path.charAt(start+1) == ':' &&
      ((path.charAt(start) >= 'A' && path.charAt(start) <= 'Z') ||
       (path.charAt(start) >= 'a' && path.charAt(start) <= 'z'));
}

// -- cygwin patch --
public static boolean hasCygwinDrive(String path) {
    if (!WINDOWS) return false;
    return path.startsWith("/" + CYGDRIVE + "/") ||
        path.startsWith ("\\" + CYGDRIVE + "\\");
}

FileUtil.java

Next we need to revert hadoop filesystem's FileUtil.setPermission()
to circumvent RawLocalFileSystem and use the old shell exec invocation.

The patch is as follows:

(from Dave Latham - 19/Apr/12 18:40)

Some time ago, RawLocalFileSystem.setPermission used to use a shell
exec command to fork a process to alter permissions of a file.

Somewhere along the 0.20 branch (I believe in the 0.20.security branch)
it was decided that this was too slow, and instead java.io.File.set{Readable|Writable|Executable}
should be used (see HADOOP-6304 for one such issue. It has a patch with the logic
that wound up in FileUtil though perhaps committed from a different patch).
However the java.io.File methods don't allow one to directly set all the
permissions so the code first has to clear permissions, then build them up again.

This resulted in two problems.

First, there is a race condition when the file briefly has no permissions
even for the owner (see MAPREDUCE-2238 for more detail).
Second, Windows doesn't support clearing all permissions on the file (this JIRA).

The first problem was worked around in HADOOP-7110 (HADOOP-7432 backported it to this branch)
by using JNI native code instead. However, if the native code is not available, then it falls
back to the java.io.File methods. So, the second problem still remains, that FileUtil.setPermission
(and thus the RawLocalFileSystem setPermission) does not work on Windows because Windows does
not have the native code implementation and also fails the java.io.File fallback.

The issues FKorning ran into in a comment above appear to be wider than this particular JIRA,
though I may have misunderstood what led to his shorn yak.

Windows is listed as a supported platform for Hadoop, and some of our developers use Windows
as a development environment, so it's important for us that hadoop at least functions on Windows,
even if it's not as performant as our production clusters on Linux.
I noted that this is currently the highest voted open JIRA for hadoop.

In order for it to function on Windows, I added a final fallback in FileUtil.setPermission
to revert to the older behavior of using a shell exec fork for setPermission
when the other methods fail. Perhaps it will be helpful for others.

FileUtil.java:


// -- cygwin patch --

public static void setPermission(File f, FsPermission permission) throws IOException {
    FsAction user = permission.getUserAction();
    FsAction group = permission.getGroupAction();
    FsAction other = permission.getOtherAction();

    // use the native/fork if the group/other permissions are different
    // or if the native is available    
    if (group != other || NativeIO.isAvailable()) {
      execSetPermission(f, permission);
      return;
    }

    try
    {
        boolean rv = true;

        // read perms
        rv = f.setReadable(group.implies(FsAction.READ), false);
        checkReturnValue(rv, f, permission);
        if (group.implies(FsAction.READ) != user.implies(FsAction.READ)) {
          f.setReadable(user.implies(FsAction.READ), true);
          checkReturnValue(rv, f, permission);
        }

        // write perms
        rv = f.setWritable(group.implies(FsAction.WRITE), false);
        checkReturnValue(rv, f, permission);
        if (group.implies(FsAction.WRITE) != user.implies(FsAction.WRITE)) {
          f.setWritable(user.implies(FsAction.WRITE), true);
          checkReturnValue(rv, f, permission);
        }

        // exec perms
        rv = f.setExecutable(group.implies(FsAction.EXECUTE), false);
        checkReturnValue(rv, f, permission);
        if (group.implies(FsAction.EXECUTE) != user.implies(FsAction.EXECUTE)) {
          f.setExecutable(user.implies(FsAction.EXECUTE), true);
          checkReturnValue(rv, f, permission);
        }
    }
    catch (IOException ioe)
    {
        LOG.warn("Java file permissions failed to set " + f + " to " + permission + " falling back to fork");
        execSetPermission(f, permission);
    }

}

File.separatorChar

Next, native windows paths built with the File.separatorChar of "\" must be avoided.
We should be using posix paths and let java.io translate paths to the native fs.
There are only six or so occurrences in the code, which you must replace with "/"
(the grep sketch after the file list below is one way to locate them).

These are the files to be patched:


<?text type="fileset"?>

DefaultTaskController.java
TaskLog.java
UserLogCleaner.java
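
A quick way to locate the offending occurrences (a sketch run from the source root, over the
stock 1.0.x src layout):


cyg_server@Fenris /work/hadoop/hadoop-1.0.1 $

grep -rl "File.separatorChar" src/core src/mapred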

LinkedFile.java

Now the underlying issue is that Java doesn't understand Cygwin Symlinks.

We solve that problem by wrapping key instances of java.io.File with our
new org.apache.hadoop.fs.LinkedFile wrapper class, which at creation will
resolve cygwin symlinks to their targets.

LinkedFile.java:


// -- cygwin patch --

package org.apache.hadoop.fs;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;

/**
 * 
 * @author      Francis Korning   fkorning@yahoo.ca
 * @date        2012-04-30    
 * 
 * @project     muxbus            muxbus.net
 * @source      sourceforge       https://sourceforge.net/p/muxbus/ 
 *     
 * @copyright   2012              F.Korning
 * @license     LGPL              http://www.gnu.org/copyleft/lesser.html
 * 
 * @credit      Jesse Hager       jessehager@iname.com    (.lnk shortcut file format).
 * 
 * 
 * LinkedFile is a java.io.File wrapper that understands both Cygwin Symbolic Links
 * and Windows Explorer Shortcut Links (.lnk).  If a given File is a Link, it tries
 * to resolve the File by following link paths recursively until a final canonical
 * target is found, and if the file exists will act as a wrapper for the link
 * target. A LinkedFile always tracks its source and provides link reflection.
 *
 * A common frustration for POSIX platform developers is that Java on windows does
 * not understand symbolic Links which aren't handled by the windows io native libs.
 * This means that Cygwin or Interix (aka Winterix or Windows Service for Unix)
 * integrated java applications that depend on POSIX tools may break when it comes
 * to resolution of paths with linked files and directories.
 * 
 * The first workaround strategy, which works where links are predictably created,
 * is to ensure that cygwin paths and windows paths are equivalent and identical.
 * For example, you create a circular "/cygwin" link that maps back to the root "/": 
 * 
 *     /cygwin      ->  /      ->  \cygwin\      ->  C:\cygwin\ 
 *     /cygwin/tmp  ->  /tmp/  ->  \cygwin\tmp\  ->  C:\cygwin\tmp\ 
 * 
 * But you often have no control over much of the integrated codebase and tools.
 * If some of the components create symlinks dynamically where java expects real
 * files and directories this workaround will fail.
 * 
 * Now the only OS-native IO alternative would be to use Windows NTFS Junctions.
 * These are completely transparent and work in shells, in Explorer, and for java.
 * The best implementation would be the junction provided in the SysInternals suite.
 * You could wrap the cygwin 'ln' command and force it to use a junction instead,
 * but this breaks some POSIX compatibility and requires Administrator superuser
 * rights which you may not want to assign to all your running service daemons.
 * 
 * Next Interix symbolic links and the mklink command are built-in since Windows Vista.
 * These fall in between as they act like native symlinks in the shell and in java,
 * look like cygwin links inside the shell, but again they can only be created by the
 * Administrator, and also break POSIX. Unlike cygwin they do appear as shortcuts in
 * a Windows Explorer, with the caveat that these are broken because of permissions.
 * 
 * Note that both strategies only work if the tools create links via the 'ln' command.
 * But what if the offending integrated tools are in perl or python, or even better yet in
 * compiled c/c++ code making use of libc calls for which we have no source nor license?
 * This means that we will need to adopt a strategy to adapt java.io.File to Cygwin libc.
 * Also, most multi-platform java developers favour Cygwin instead of Microsoft's Interix.
 * All of the POSIX daemons we're looking at expect cygwin shells, and this compatibility
 * is our main focus here; we ideally want a model that works without having to wrap 'ln'.
 * 
 * Now Both Cygwin Symlinks and Explorer Shortcuts are just plain binary files underneath
 * with some information pointing to their link targets.  They have the advantage that
 * we can parse these and thus this allows us to add some link reflection to java.io.
 * (This also creates a strange case where windows would have more functionality than
 * unix, unless one would write a unix java native libc wrapper for link functions).
 * They also require no special Administrator rights for creation and maintenance.
 * 
 * A side-effect is that shortcuts created under Windows Explorer have the feature that 
 * a (.lnk) extension is appended, which is obscured from the explorer.  As the names
 * differ, both a Cygwin Symlink and a Windows Shortcut can co-exist in the same place
 * and make for a consistent interface both in the cygwin shell and Windows Explorer.
 *
 * This means that one could well mirror every cygwin symlink with a windows shortcut,
 * and thus have symbolic links that work even under non-Administrator user shells and
 * are mirrored by corresponding Windows Shortcuts that work in the Windows Explorer.
 * Cygwin also comes with its own programmatic 'mkshortcut' command to create Shortcuts
 * from a shell (which you can't do under windows by default - doesn't Windows suck?).
 * 
 *     dir       ->         /dir/            symlink
 *     dir.lnk   ->         /dir/            shortcut
 *     file      ->         /dir/file        symlink
 *     file.lnk  ->         /dir/file        shortcut
 * 
 * Note that synchronizing of Cygwin symlinks and Windows Shortcuts is left out-of-scope.
 * Our primary concern is to get our POSIX daemons running, not to add sugar to Explorer.
 * However LinkedFile will support and resolve links for both link formats, with first
 * preference given to Cygwin Symlinks if found, and then to Windows Shortcuts of the
 * same name (without the .lnk extension), which you may extend for such a facility.
 * 
 * For those wishing to extend the functionality to support transparent mirroring between
 * Explorer and the shell, note that you will have to map links to their (.lnk) shortcuts.
 * Also note that Windows Shortcuts are never interpreted as links by the Cygwin shell.
 * Most critically note that Windows shortcuts are unpredictable in the shell if moved,
 * as they seem to only recalculate relative paths after a link is accessed in Explorer. 
 *
 * 
 * @see cygwin:             http://www.cygwin.com
 * @see cygwin  symlinks:   http://cygwin.com/ml/cygwin/2010-11/msg00394.html 
 * @see cygwin  mklink:     http://cygwin.com/cygwin-ug-net/using-effectively.html
 * @see interix links:      http://technet.microsoft.com/en-us/library/bb463204.aspx
 * @see windows shortcuts:  http://ithreats.files.wordpress.com/2009/05/lnk_the_windows_shortcut_file_format.pdf
 * @see windows junctions:  http://technet.microsoft.com/en-us/sysinternals
 * 
 */
public class LinkedFile
    extends java.io.File
{

    public static final char TOK_NULL_CHAR = '\u0000';

    public static final String TOK_SYMLINK = "!<symlink>\u00FF\u00FE";

    public static final String TOK_SHORTCUT = "\u004C\u0000\u0000\u0000\u0001\u0014\u0002";

    public static final int    SHORTCUT_MAX = 1024;    // shortcuts should probably never be that big
    public static final int    SHORTCUT_OFFSET = 76;   // first shortcut section length offset at 0x4C

    // utilities

    /**
     * read a link header.
     * 
     * both cygwin symlinks and windows shortcuts are plain binary files.
     * cygwin is fairly simple, consisting of a magic header and the target.
     * shortcuts have variable length structures but will not exceed 1kb.
     */
    public static String header (File file)
        throws IOException
    {
        if (file == null)
            return null;

        String header = null;

        if (file.exists() && file.canRead())
        {
            if (file.isFile() || (!file.isDirectory()))
            {                
                char[] buf = new char [SHORTCUT_MAX];
                int length = new BufferedReader(new FileReader(file)).read(buf, 0, SHORTCUT_MAX);
                header = new String(buf);

                //System.err.println ("length: " + length);
                //System.err.println ("header: " + header);

                if (isSymlinkHeader(header))
                {
                    //System.err.println("isSymlink: true");                    
                }
                else if (isShortcutHeader(header))
                {
                    //System.err.println("isShortcut: true");
                }
                else
                {
                    header = null;
                }

            }
        }

        return header;
    }

    public static boolean isLinkHeader (String header)
        throws IOException
    {
        return ( isSymlinkHeader(header) || isShortcutHeader(header) );
    }

    public static boolean isSymlinkHeader (String header)
        throws IOException
    {
        if (header == null)
            return false;

        if (header.startsWith(TOK_SYMLINK))
        {
            return true;
        }

        return false;
    }

    public static boolean isShortcutHeader (String header)
        throws IOException
    {
        if (header == null)
            return false;

        if (header.startsWith(TOK_SHORTCUT))
        {
            return true;
        }

        return false;
    }

    /**
     * parse a 2-char short length offset from a position in a windows shortcut file.
     */
    public static int shortcutHeaderOffset (String header, int pos)
    {
        if (header == null)
            return 0;

        char small = header.charAt(pos);
        char large = header.charAt(pos +1);
        int offset = small + (large * 16 * 16);

        //System.err.println ("offset [" + pos + "] '" + (short)small + " " + (short)large + "' : " + offset);        
        return offset;
    }

    /**
     * parse the final link target from a cygwin symlink or windows shortcut.
     * 
     * cygwin symlink targets follow immediately after the "!<symlink>" 0xFF 0xFE header.
     * 
     * windows shortcuts start with the header "L 0x00 0x00 0x00 0x01 0x14 0x02".
     * the windows shortcut files are organized into sections of variable lengths,
     * starting after the header at 0x4c (76). the lengths are 2-char short offsets.
     * windows shortcut final name is at the end of the second "file location" section.
     */
    public static String headerTarget (String header)
    {
        if (header == null)
            return null;

        String target = null;

        try
        {
            if (isSymlinkHeader(header))
            {
                StringBuilder buf = new StringBuilder("");
                for (int i = TOK_SYMLINK.length();  i < header.length(); i+=2)
                {
                    if (header.charAt(i) == TOK_NULL_CHAR) {
                        break;
                    }
                    buf.append (header.charAt(i));
                }

                target = buf.toString();                
            }
            else if (isShortcutHeader(header))
            {
                // first get the length of the next "item id" section 
                int a = shortcutHeaderOffset(header, SHORTCUT_OFFSET);
                // now get the length of the next "file location" section
                int b = shortcutHeaderOffset(header, SHORTCUT_OFFSET + a  + 2);

                // the final name is the last field of the "file location" section 
                int j = SHORTCUT_OFFSET + a + b;                
                int i = header.lastIndexOf(TOK_NULL_CHAR, j - 2) + 1;

                target = header.substring (i,j);                
            }
        }
        catch (IOException e) {}

        //System.err.println ("target: " + target);
        return target;
    }

    public static boolean isLink (File file)
    {
        if (file == null)
            return false;

        try
        {
            if (file.exists() && file.canRead())
            {
                if (file.isFile() || (!file.isDirectory()))
                {
                    String header = header(file);
                    return isLinkHeader(header);
                }
            }
        }
        catch (IOException e) {}

        return false;
    }

    public static boolean isSymlink (File file)
    {
        if (file == null)
            return false;

        try
        {
            if (file.exists() && file.canRead())
            {
                if (file.isFile() || (!file.isDirectory()))
                {
                    String header = header(file);
                    return isSymlinkHeader(header);
                }
            }
        }
        catch (IOException e) {}

        return false;
    }

    public static boolean isShortcut (File file)
    {
        if (file == null)
            return false;

        try {
            if (file.exists() && file.canRead())
            {
                if (file.isFile() || (!file.isDirectory()))
                {
                    String header = header(file);
                    return isShortcutHeader(header);
                }
            }
        }
        catch (IOException e) {}

        return false;
    }

    /**
     * Resolve a target by recursively traversing links until the canonical target.
     * If the file is not a link, return the file's canonical AbsolutePath.
     */
    public static String resolveTarget (File file)
    {
        String target = file.getAbsolutePath();
        //System.err.println("\path: " + target);

        try
        {
            if (file.exists() && file.canRead())
            {
                if (file.isFile() || (!file.isDirectory()))
                {
                    String header = header(file);
                    if (isLinkHeader(header))
                    {
                        target = headerTarget(header);
                        boolean absolute = (target.substring(1)).startsWith(":");
                        boolean logical = (target.startsWith("/") || target.startsWith("\\"));
                        boolean parent = (target.startsWith("../") || target.startsWith("..\\"));                        
                        boolean current = (target.startsWith("./") || target.startsWith(".\\"));

                        File link = null;
                        if (absolute)
                            link = new File (target.substring(0,2), target.substring(2));
                        else if (logical)
                            link = new File(target.substring(0,1), target.substring(1));
                        else if (parent)
                            link = new File(target.substring(0,2), target.substring(2));                        
                        else if (current)
                            link = new File(target.substring(0,1), target.substring(1));
                        else
                            link = new File (target);

                        if (! file.equals(link))
                        {
                            target = resolveTarget(link);
                        }
                    }
                }
            }
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }

        //System.err.println("resolve: " + target);
        return target;
    }

    /**
     * Resolve a file by recursively traversing links until the canonical target.
     * If the file is not a link, return the file itself.
     *
     * @param file
     * @return
     */
    public static File resolveFile (File file)
    {
        //System.err.println("file: " + file.getAbsolutePath());

        try
        {
            if (file.exists() && file.canRead())
            {
                if (file.isFile() || (!file.isDirectory()))
                {
                    String header = header(file);
                    if (isLinkHeader(header))
                    {
                        String target = headerTarget(header);
                        boolean absolute = (target.substring(1)).startsWith(":");
                        boolean logical = (target.startsWith("/") || target.startsWith("\\"));
                        boolean parent = (target.startsWith("../") || target.startsWith("..\\"));                        
                        boolean current = (target.startsWith("./") || target.startsWith(".\\"));

                        File link = null;
                        if (absolute)
                            link = new File (target.substring(0,2), target.substring(2));
                        else if (logical)
                            link = new File(target.substring(0,1), target.substring(1));
                        else if (parent)
                            link = new File(target.substring(0,2), target.substring(2));                        
                        else if (current)
                            link = new File(target.substring(0,1), target.substring(1));
                        else
                            link = new File (target);

                        if (! file.equals(link))
                        {    
                            file = resolveFile(link.getAbsoluteFile());
                        }
                    }
                }
            }
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }

        //System.err.println("resolve: " + file.getAbsolutePath());
        return file;
    }

    // commands

    /**
     * LinkedFile command resolves a link and prints its final link target name.
     * 
     * @param args
     * @throws IOException
     */
    public static void main (String[] args)
        throws IOException
    {
        try
        {
            if (args.length == 0)
            {
                System.err.println ("usage: LinkedFile <filepath>");
                System.exit (1);
            }

            LinkedFile file = new LinkedFile(args[0]);
            System.out.println(file.format().toString());
        }
        catch (Exception e)
        {
            e.printStackTrace();
            System.exit (1);
        }

    }

    // fields

    protected String source;        // the file source
    protected String target;        // the resolved target path

    protected String header;        // the link header, if source is a link
    protected boolean symlink;      // file is a cygwin symlink
    protected boolean shortcut;     // file is a windows shortcut

    protected File link;            // the original source file link

    // instantiators

    // constructors

    /**
     * Creates a new File instance by converting the given pathname string into
     * an abstract pathname.
     */
    public LinkedFile(String pathname)
    {
        super((resolveFile(new File(pathname))).getAbsolutePath());

        this.target = getAbsolutePath();
        this.source = "" + pathname;

        this.link = new File(pathname);
        this.symlink = isSymlink(link);
        this.shortcut = isShortcut(link);
    }

    /**
     * Creates a new File instance from a parent abstract pathname and a child
     * pathname string.
     */
    public LinkedFile(File parent, String child)
    {
        super((resolveFile(new File(parent, child))).getAbsolutePath());

        this.target = getAbsolutePath();
        this.source = "" + parent.getName() + "/" + child;

        this.link = new File(parent, child);
        this.symlink = isSymlink(link);
        this.shortcut = isShortcut(link);
    }

    /**
     * Creates a new File instance from a parent pathname string and a child
     * pathname string.
     */
    public LinkedFile(String parent, String child)
    {
        super((resolveFile(new File(parent, child))).getAbsolutePath());

        this.target = getAbsolutePath();
        this.source = "" + parent + "/" + child;

        this.link = new File(parent, child);
        this.symlink = isSymlink(link);
        this.shortcut = isShortcut(link);
    }

    /**
     * Creates a new File instance by converting the given file: URI into an
     * abstract pathname.
     */
    public LinkedFile(URI uri)
    {
        super((resolveFile(new File(uri))).getAbsolutePath());

        this.target = getAbsolutePath();
        this.source = "" + uri;

        this.link = new File(uri);
        this.symlink = isSymlink(link);
        this.shortcut = isShortcut(link);
    }

    // accessors

    public boolean isLink()
    {
        return (symlink || shortcut);
    }

    public boolean isSymlink()
    {
        return (symlink);
    }

    public boolean isShortcut()
    {
        return (shortcut);
    }

    public String getSource()
    {
        return source;
    }

    public String getTarget()
    {
        //return target.replace('\\', '/');     
        return target;
    }

    // overrides

    public String toString() {
        return super.toString();
    }

    public StringBuilder format()
    {
        StringBuilder s = new StringBuilder("");
        format (s);
        return s;
    }

    public StringBuilder format (StringBuilder s) {

        if (isLink()) {
            s.append(getSource() + " => ");
        }

        s.append(getTarget());

        if (isDirectory()) {
            //s.append("/");
            s.append(File.separatorChar);
        }

        return s;
    }

}
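
As a quick sanity check of the wrapper, you can exercise its main() from the shell.
The symlink below is a throwaway example, and the classpath assumes the ant build
dropped the patched classes under build/classes; adjust both to your own setup.


cyg_server@Fenris /work/hadoop/hadoop-1.0.1 $

# create a disposable cygwin symlink to resolve
mkdir -p /tmp/demo-target
ln -s /tmp/demo-target /tmp/demo-link

# print the resolved "source => target" form produced by LinkedFile.format()
java -cp build/classes org.apache.hadoop.fs.LinkedFile /tmp/demo-link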

java.io.File

We then want to modify key MapRed Task handling classes to replace java.io.File
instances with our new symlink-enabled org.apache.hadoop.fs.LinkedFile wrapper.

replace all instances of java.io.File:


// -- cygwin patch --
import java.io.File;

// -- cygwin patch --
File foo = new File(bar);

with org.apache.hadoop.fs.LinkedFile:


// -- cygwin patch --
import org.apache.hadoop.fs.LinkedFile;

// -- cygwin patch --    
File foo = new LinkedFile(bar);

in the following sources:


<?text type="fileset"?>

TaskLogAppender.java
TaskLogTruncater.java
TaskLogServlet.java

TaskLog.java

Most of the big changes are in TaskLog.java, which I include in its entirety.

Specifically, TaskLog makes judicious use of Path.normalizePath() to ensure
that we are dealing with posix-style paths where we need to, and uses LinkedFile
to create the task attempt log dirs and their contents.

TaskLog.java:


// -- cygwin patch --

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LinkedFile;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.util.ProcessTree;
import org.apache.hadoop.util.Shell;
import org.apache.log4j.Appender;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/**
 * A simple logger to handle the task-specific user logs.
 * This class uses the system property <code>hadoop.log.dir</code>.
 * 
 * This class is for Map/Reduce internal use only.
 * 
 */
public class TaskLog {
  private static final Log LOG =
    LogFactory.getLog(TaskLog.class);

  static final String USERLOGS_DIR_NAME = "userlogs";

  private static final File LOG_DIR = 
    new LinkedFile(getBaseLogDir(), USERLOGS_DIR_NAME).getAbsoluteFile();

  // localFS is set in (and used by) writeToIndexFile()
  static LocalFileSystem localFS = null;
  static {
    if (!LOG_DIR.exists()) {
      LOG_DIR.mkdirs();
    }
  }

  static AtomicInteger rotor = new AtomicInteger(0);

  /**
   * Create log directory for the given attempt. This involves creating the
   * following and setting proper permissions for the new directories
   * <br>{hadoop.log.dir}/userlogs/<jobid>
   * <br>{hadoop.log.dir}/userlogs/<jobid>/<attempt-id-as-symlink>
   * <br>{one of the mapred-local-dirs}/userlogs/<jobid>
   * <br>{one of the mapred-local-dirs}/userlogs/<jobid>/<attempt-id>
   *
   * @param taskID attempt-id for which log dir is to be created
   * @param isCleanup Is this attempt a cleanup attempt ?
   * @param localDirs mapred local directories
   * @throws IOException
   */
  public static void createTaskAttemptLogDir(TaskAttemptID taskID,
      boolean isCleanup, String[] localDirs) throws IOException{
    String cleanupSuffix = isCleanup ? ".cleanup" : "";
    String strAttemptLogDir = getTaskAttemptLogDir(taskID, cleanupSuffix, localDirs);
    File attemptLogDir = new LinkedFile(strAttemptLogDir);

    // -- cygwin patch -- give a chance for logs to be created
    LOG.warn(String.format("Checking attemptLogDir: %s", strAttemptLogDir));
    if (! attemptLogDir.exists()) {
        LOG.warn(String.format("Creating attemptLogDir: %s", strAttemptLogDir));
        if (! (attemptLogDir.mkdirs() || attemptLogDir.exists()) ) {
            throw new IOException(String.format("Failed to create attemptLogDir: %s",
                    attemptLogDir));
        }
    }

    // -- cygwin patch -- normalize path
    String strJobDir =
        Path.normalizePath(getJobDir(taskID.getJobID()).getAbsolutePath());
    String strLinkAttemptLogDir =
        strJobDir + "/" + taskID.toString() + cleanupSuffix;
    File linkAttemptLogDir = new LinkedFile(strLinkAttemptLogDir);

    LOG.warn(String.format("Checking symlink: %s <- %s", strAttemptLogDir, strLinkAttemptLogDir));
    if (! linkAttemptLogDir.exists()) {
        LOG.warn(String.format("Creating symlink: %s <- %s", strAttemptLogDir, strLinkAttemptLogDir));
        if (FileUtil.symLink(strAttemptLogDir, strLinkAttemptLogDir) != 0) {
            throw new IOException(String.format("Failed to create symlink: %s <- %s",
                    strAttemptLogDir, strLinkAttemptLogDir));
        }
    }

    //Set permissions for target attempt log dir 
    FsPermission userOnly = new FsPermission((short) 0700);
    FileUtil.setPermission(attemptLogDir, userOnly);
  }

  /**
   * Get one of the mapred local directory in a round-robin-way.
   * @param localDirs mapred local directories
   * @return the next chosen mapred local directory
   * @throws IOException
   */
  private static String getNextLocalDir(String[] localDirs) throws IOException{
    if(localDirs.length == 0) {
      throw new IOException ("Not enough mapred.local.dirs ("
                             + localDirs.length + ")");
    }
    return localDirs[Math.abs(rotor.getAndIncrement()) % localDirs.length];  
  }

  /**
   * Get attempt log directory path for the given attempt-id under randomly
   * selected mapred local directory.
   * @param taskID attempt-id for which log dir path is needed
   * @param cleanupSuffix ".cleanup" if this attempt is a cleanup attempt 
   * @param localDirs mapred local directories
   * @return target task attempt log directory
   * @throws IOException
   */
  public static String getTaskAttemptLogDir(TaskAttemptID taskID, 
      String cleanupSuffix, String[] localDirs) throws IOException {

    // -- cygwin patch -- normalize path

    StringBuilder taskLogDirLocation = new StringBuilder();
    taskLogDirLocation.append(getNextLocalDir(localDirs));
    taskLogDirLocation.append("/");
    taskLogDirLocation.append(USERLOGS_DIR_NAME);
    taskLogDirLocation.append("/");
    taskLogDirLocation.append(taskID.getJobID().toString());
    taskLogDirLocation.append("/");
    taskLogDirLocation.append(taskID.toString()+cleanupSuffix);

    return Path.normalizePath(taskLogDirLocation.toString());
  }

  public static File getTaskLogFile(TaskAttemptID taskid, boolean isCleanup,
      LogName filter) {
    return new LinkedFile(getAttemptDir(taskid, isCleanup), filter.toString());
  }

  /**
   * Get the real task-log file-path
   * 
   * @param location Location of the log-file. This should point to an
   *          attempt-directory.
   * @param filter
   * @return
   * @throws IOException
   */
  static String getRealTaskLogFilePath(String location, LogName filter)
      throws IOException {
    return FileUtil.makeShellPath(new LinkedFile(location, filter.toString()));
  }

  static class LogFileDetail {
    final static String LOCATION = "LOG_DIR:";
    String location;
    long start;
    long length;
  }

  static Map<LogName, LogFileDetail> getAllLogsFileDetails(
      TaskAttemptID taskid, boolean isCleanup) throws IOException {

    Map<LogName, LogFileDetail> allLogsFileDetails =
        new HashMap<LogName, LogFileDetail>();

    File indexFile = getIndexFile(taskid, isCleanup);
    LOG.warn (String.format("IndexFile: %s", indexFile));

    BufferedReader fis = new BufferedReader(new InputStreamReader(
      SecureIOUtils.openForRead(indexFile, obtainLogDirOwner(taskid))));

    //the format of the index file is
    //LOG_DIR: <the dir where the task logs are really stored>
    //stdout:<start-offset in the stdout file> <length>
    //stderr:<start-offset in the stderr file> <length>
    //syslog:<start-offset in the syslog file> <length>
    String str = fis.readLine();
    if (str == null) { //Index file not found
      throw new IOException ("Index file not found for task " + taskid);
    }
    String loc = str.substring(str.indexOf(LogFileDetail.LOCATION)+
        LogFileDetail.LOCATION.length());

    LOG.warn (String.format("'LogFile loc: %s'", loc));

    //special cases are the debugout and profile.out files. They are guaranteed
    //to be associated with each task attempt since jvm reuse is disabled
    //when profiling/debugging is enabled
    for (LogName filter : new LogName[] { LogName.DEBUGOUT, LogName.PROFILE }) {
      LogFileDetail l = new LogFileDetail();
      l.location = loc;
      l.length = new LinkedFile(l.location, filter.toString()).length();
      l.start = 0;
      allLogsFileDetails.put(filter, l);
    }
    str = fis.readLine();
    while (str != null) {
      LogFileDetail l = new LogFileDetail();
      l.location = loc;
      int idx = str.indexOf(':');
      LogName filter = LogName.valueOf(str.substring(0, idx).toUpperCase());
      str = str.substring(idx + 1);
      String[] startAndLen = str.split(" ");
      l.start = Long.parseLong(startAndLen[0]);

      l.length = Long.parseLong(startAndLen[1]);
      if (l.length == -1L) {
        l.length = new LinkedFile(l.location, filter.toString()).length();
      }

      allLogsFileDetails.put(filter, l);
      str = fis.readLine();
    }
    fis.close();
    return allLogsFileDetails;
  }

  static File getTmpIndexFile(TaskAttemptID taskid, boolean isCleanup) {
    return new LinkedFile(getAttemptDir(taskid, isCleanup), "log.tmp");
  }

  static File getIndexFile(TaskAttemptID taskid, boolean isCleanup) {
    return new LinkedFile(getAttemptDir(taskid, isCleanup), "log.index");
  }

  /**
   * Obtain the owner of the log dir. This is 
   * determined by checking the job log directory.
   */
  static String obtainLogDirOwner(TaskAttemptID taskid) throws IOException {
    if (localFS == null) {
      localFS = FileSystem.getLocal(new Configuration());
    }
    FileSystem raw = localFS.getRaw();
    Path jobLogDir = new Path(getJobDir(taskid.getJobID()).getAbsolutePath());
    FileStatus jobStat = raw.getFileStatus(jobLogDir);
    return jobStat.getOwner();
  }

  public static String getBaseLogDir() {
    return System.getProperty("hadoop.log.dir");
  }

  static File getAttemptDir(TaskAttemptID taskid, boolean isCleanup) {
      String cleanupSuffix = isCleanup ? ".cleanup" : "";
      return getAttemptDir(taskid.getJobID().toString(),
              taskid.toString() + cleanupSuffix);
  }

  static File getAttemptDir(String jobid, String taskid) {
    // taskid should be fully formed and it should have the optional 
    // .cleanup suffix
    return new LinkedFile(getJobDir(jobid), taskid);
  }

  static final List<LogName> LOGS_TRACKED_BY_INDEX_FILES =
      Arrays.asList(LogName.STDOUT, LogName.STDERR, LogName.SYSLOG);

  private static TaskAttemptID currentTaskid;

  /**
   * Map to store previous and current lengths.
   */
  private static Map<LogName, Long[]> logLengths =
      new HashMap<LogName, Long[]>();
  static {
    for (LogName logName : LOGS_TRACKED_BY_INDEX_FILES) {
      logLengths.put(logName, new Long[] { Long.valueOf(0L),
          Long.valueOf(0L) });
    }
  }

  static synchronized 
  void writeToIndexFile(String logLocation,
                        TaskAttemptID currentTaskid, 
                        boolean isCleanup,
                        Map<LogName, Long[]> lengths) throws IOException {
    // To ensure atomicity of updates to index file, write to temporary index
    // file first and then rename.
    File tmpIndexFile = getTmpIndexFile(currentTaskid, isCleanup);

    LOG.warn (String.format("TmpIndexFile: %s", tmpIndexFile));

    BufferedOutputStream bos = 
      new BufferedOutputStream(
        SecureIOUtils.createForWrite(tmpIndexFile, 0644));
    DataOutputStream dos = new DataOutputStream(bos);
    //the format of the index file is
    //LOG_DIR: <the dir where the task logs are really stored>
    //STDOUT: <start-offset in the stdout file> <length>
    //STDERR: <start-offset in the stderr file> <length>
    //SYSLOG: <start-offset in the syslog file> <length>    
    dos.writeBytes(LogFileDetail.LOCATION
        + logLocation
        + "\n");
    for (LogName logName : LOGS_TRACKED_BY_INDEX_FILES) {
      Long[] lens = lengths.get(logName);
      dos.writeBytes(logName.toString() + ":"
          + lens[0].toString() + " "
          + Long.toString(lens[1].longValue() - lens[0].longValue())
          + "\n");}
    dos.close();

    File indexFile = getIndexFile(currentTaskid, isCleanup);
    Path indexFilePath = new Path(indexFile.getAbsolutePath());
    Path tmpIndexFilePath = new Path(tmpIndexFile.getAbsolutePath());

    LOG.warn (String.format("IndexFile: %s", indexFile));

    if (localFS == null) {// set localFS once
      localFS = FileSystem.getLocal(new Configuration());
    }
    localFS.rename (tmpIndexFilePath, indexFilePath);
  }

  @SuppressWarnings("unchecked")
  public synchronized static void syncLogs(String logLocation, 
                                           TaskAttemptID taskid,
                                           boolean isCleanup,
                                           boolean segmented) 
  throws IOException {
    System.out.flush();
    System.err.flush();
    Enumeration<Logger> allLoggers = LogManager.getCurrentLoggers();
    while (allLoggers.hasMoreElements()) {
      Logger l = allLoggers.nextElement();
      Enumeration<Appender> allAppenders = l.getAllAppenders();
      while (allAppenders.hasMoreElements()) {
        Appender a = allAppenders.nextElement();
        if (a instanceof TaskLogAppender) {
          ((TaskLogAppender)a).flush();
        }
      }
    }
    if (currentTaskid == null) {
      currentTaskid = taskid;
    }
    // set start and end
    for (LogName logName : LOGS_TRACKED_BY_INDEX_FILES) {
      if (currentTaskid != taskid) {
        // Set start = current-end
        logLengths.get(logName)[0] = Long.valueOf(new LinkedFile(
            logLocation, logName.toString()).length());
      }
      // Set current end
      logLengths.get(logName)[1]
        = (segmented
           ? (Long.valueOf
              (new LinkedFile(logLocation, logName.toString()).length()))
           : -1);
    }
    if (currentTaskid != taskid) {
      if (currentTaskid != null) {
        LOG.info("Starting logging for a new task " + taskid
            + " in the same JVM as that of the first task " + logLocation);
      }
      currentTaskid = taskid;
    }
    writeToIndexFile(logLocation, taskid, isCleanup, logLengths);
  }

  /**
   * The filter for userlogs.
   */
  public static enum LogName {
    /** Log on the stdout of the task. */
    STDOUT ("stdout"),

    /** Log on the stderr of the task. */
    STDERR ("stderr"),

    /** Log on the map-reduce system logs of the task. */
    SYSLOG ("syslog"),

    /** The java profiler information. */
    PROFILE ("profile.out"),

    /** Log the debug script stdout  */
    DEBUGOUT ("debugout");

    private String prefix;

    private LogName(String prefix) {
      this.prefix = prefix;
    }

    @Override
    public String toString() {
      return prefix;
    }
  }

  static class Reader extends InputStream {
    private long bytesRemaining;
    private FileInputStream file;

    /**
     * Read a log file from start to end positions. The offsets may be negative,
     * in which case they are relative to the end of the file. For example,
     * Reader(taskid, kind, 0, -1) is the entire file and 
     * Reader(taskid, kind, -4197, -1) is the last 4196 bytes. 
     * @param taskid the id of the task to read the log file for
     * @param kind the kind of log to read
     * @param start the offset to read from (negative is relative to tail)
     * @param end the offset to read upto (negative is relative to tail)
     * @param isCleanup whether the attempt is cleanup attempt or not
     * @throws IOException
     */
    public Reader(TaskAttemptID taskid, LogName kind, 
                  long start, long end, boolean isCleanup) throws IOException {
      // find the right log file
      Map<LogName, LogFileDetail> allFilesDetails =
          getAllLogsFileDetails(taskid, isCleanup);
      LogFileDetail fileDetail = allFilesDetails.get(kind);
      // calculate the start and stop
      long size = fileDetail.length;
      if (start < 0) {
        start += size + 1;
      }
      if (end < 0) {
        end += size + 1;
      }
      start = Math.max(0, Math.min(start, size));
      end = Math.max(0, Math.min(end, size));
      start += fileDetail.start;
      end += fileDetail.start;
      bytesRemaining = end - start;
      String owner = obtainLogDirOwner(taskid);
      file = SecureIOUtils.openForRead(new LinkedFile(fileDetail.location, kind.toString()), 
          owner);
      // skip upto start
      long pos = 0;
      while (pos < start) {
        long result = file.skip(start - pos);
        if (result < 0) {
          bytesRemaining = 0;
          break;
        }
        pos += result;
      }
    }

    @Override
    public int read() throws IOException {
      int result = -1;
      if (bytesRemaining > 0) {
        bytesRemaining -= 1;
        result = file.read();
      }
      return result;
    }

    @Override
    public int read(byte[] buffer, int offset, int length) throws IOException {
      length = (int) Math.min(length, bytesRemaining);
      int bytes = file.read(buffer, offset, length);
      if (bytes > 0) {
        bytesRemaining -= bytes;
      }
      return bytes;
    }

    @Override
    public int available() throws IOException {
      return (int) Math.min(bytesRemaining, file.available());
    }

    @Override
    public void close() throws IOException {
      file.close();
    }
  }

  private static final String bashCommand = "bash";
  private static final String tailCommand = "tail";

  /**
   * Get the desired maximum length of task logs.
   * @param conf the job to look in
   * @return the number of bytes to cap the log files at
   */
  public static long getTaskLogLength(JobConf conf) {
    return conf.getLong("mapred.userlog.limit.kb", 100) * 1024;
  }

  /**
   * Wrap a command in a shell to capture stdout and stderr to files.
   * If the tailLength is 0, the entire output will be saved.
   * @param cmd The command and the arguments that should be run
   * @param stdoutFilename The filename that stdout should be saved to
   * @param stderrFilename The filename that stderr should be saved to
   * @param tailLength The length of the tail to be saved.
   * @return the modified command that should be run
   */
  public static List<String> captureOutAndError(List<String> cmd, 
                                                File stdoutFilename,
                                                File stderrFilename,
                                                long tailLength
                                               ) throws IOException {
    return captureOutAndError(null, cmd, stdoutFilename,
                              stderrFilename, tailLength, false);
  }

  /**
   * Wrap a command in a shell to capture stdout and stderr to files.
   * Setup commands such as setting memory limit can be passed which 
   * will be executed before exec.
   * If the tailLength is 0, the entire output will be saved.
   * @param setup The setup commands for the execed process.
   * @param cmd The command and the arguments that should be run
   * @param stdoutFilename The filename that stdout should be saved to
   * @param stderrFilename The filename that stderr should be saved to
   * @param tailLength The length of the tail to be saved.
   * @return the modified command that should be run
   */
  public static List<String> captureOutAndError(List<String> setup,
                                                List<String> cmd, 
                                                File stdoutFilename,
                                                File stderrFilename,
                                                long tailLength
                                               ) throws IOException {
    return captureOutAndError(setup, cmd, stdoutFilename, stderrFilename,
        tailLength, false);
  }

  /**
   * Wrap a command in a shell to capture stdout and stderr to files.
   * Setup commands such as setting memory limit can be passed which 
   * will be executed before exec.
   * If the tailLength is 0, the entire output will be saved.
   * @param setup The setup commands for the execed process.
   * @param cmd The command and the arguments that should be run
   * @param stdoutFilename The filename that stdout should be saved to
   * @param stderrFilename The filename that stderr should be saved to
   * @param tailLength The length of the tail to be saved.
   * @deprecated pidFiles are no more used. Instead pid is exported to
   *             env variable JVM_PID.
   * @return the modified command that should be run
   */
  @Deprecated
  public static List<String> captureOutAndError(List<String> setup,
                                                List<String> cmd, 
                                                File stdoutFilename,
                                                File stderrFilename,
                                                long tailLength,
                                                String pidFileName
                                               ) throws IOException {
    return captureOutAndError(setup, cmd, stdoutFilename, stderrFilename,
        tailLength, false, pidFileName);
  }

  /**
   * Wrap a command in a shell to capture stdout and stderr to files.
   * Setup commands such as setting memory limit can be passed which 
   * will be executed before exec.
   * If the tailLength is 0, the entire output will be saved.
   * @param setup The setup commands for the execed process.
   * @param cmd The command and the arguments that should be run
   * @param stdoutFilename The filename that stdout should be saved to
   * @param stderrFilename The filename that stderr should be saved to
   * @param tailLength The length of the tail to be saved.
   * @param useSetsid Should setsid be used in the command or not.
   * @deprecated pidFiles are no more used. Instead pid is exported to
   *             env variable JVM_PID.
   * @return the modified command that should be run
   * 
   */
  @Deprecated
  public static List<String> captureOutAndError(List<String> setup,
      List<String> cmd, 
      File stdoutFilename,
      File stderrFilename,
      long tailLength,
      boolean useSetsid,
      String pidFileName
     ) throws IOException {
    return captureOutAndError(setup,cmd, stdoutFilename, stderrFilename, tailLength,
        useSetsid);
  }

  /**
   * Wrap a command in a shell to capture stdout and stderr to files.
   * Setup commands such as setting memory limit can be passed which 
   * will be executed before exec.
   * If the tailLength is 0, the entire output will be saved.
   * @param setup The setup commands for the execed process.
   * @param cmd The command and the arguments that should be run
   * @param stdoutFilename The filename that stdout should be saved to
   * @param stderrFilename The filename that stderr should be saved to
   * @param tailLength The length of the tail to be saved.
   * @param useSetsid Should setsid be used in the command or not.
   * @return the modified command that should be run
   */
  public static List<String> captureOutAndError(List<String> setup,
      List<String> cmd, 
      File stdoutFilename,
      File stderrFilename,
      long tailLength,
      boolean useSetsid
     ) throws IOException {
    List<String> result = new ArrayList<String>(3);
    result.add(bashCommand);
    result.add("-c");
    String mergedCmd = buildCommandLine(setup,
        cmd,
        stdoutFilename,
        stderrFilename, tailLength,
        useSetsid);
    result.add(mergedCmd.toString());
    return result; 
  }

  static String buildCommandLine(List<String> setup,
      List<String> cmd, 
      File stdoutFilename,
      File stderrFilename,
      long tailLength,
      boolean useSetSid) throws IOException {

    String stdout = FileUtil.makeShellPath(stdoutFilename);
    String stderr = FileUtil.makeShellPath(stderrFilename);
    StringBuilder mergedCmd = new StringBuilder();

    if (!Shell.WINDOWS) {
      mergedCmd.append("export JVM_PID=`echo $$`\n");
    }

    if (setup != null) {
      for (String s : setup) {
        mergedCmd.append(s);
        mergedCmd.append("\n");
      }
    }
    if (tailLength > 0) {
      mergedCmd.append("(");
    } else if (ProcessTree.isSetsidAvailable && useSetSid 
        && !Shell.WINDOWS) {
      mergedCmd.append("exec setsid ");
    } else {
      mergedCmd.append("exec ");
    }
    mergedCmd.append(addCommand(cmd, true));
    mergedCmd.append(" < /dev/null ");
    if (tailLength > 0) {
      mergedCmd.append(" | ");
      mergedCmd.append(tailCommand);
      mergedCmd.append(" -c ");
      mergedCmd.append(tailLength);
      mergedCmd.append(" >> ");
      mergedCmd.append(stdout);
      mergedCmd.append(" ; exit $PIPESTATUS ) 2>&1 | ");
      mergedCmd.append(tailCommand);
      mergedCmd.append(" -c ");
      mergedCmd.append(tailLength);
      mergedCmd.append(" >> ");
      mergedCmd.append(stderr);
      mergedCmd.append(" ; exit $PIPESTATUS");
    } else {
      mergedCmd.append(" 1>> ");
      mergedCmd.append(stdout);
      mergedCmd.append(" 2>> ");
      mergedCmd.append(stderr);
    }
    return mergedCmd.toString();
  }

  /**
   * Add quotes to each of the command strings and
   * return as a single string 
   * @param cmd The command to be quoted
   * @param isExecutable makes shell path if the first 
   * argument is executable
   * @return returns The quoted string. 
   * @throws IOException
   */
  public static String addCommand(List<String> cmd, boolean isExecutable) 
  throws IOException {
    StringBuffer command = new StringBuffer();
    for(String s: cmd) {
        command.append('\'');
      if (isExecutable) {
        // the executable name needs to be expressed as a shell path for the  
        // shell to find it.
          command.append(FileUtil.makeShellPath(new LinkedFile(s)));
        isExecutable = false; 
      } else {
          command.append(s);
      }
      command.append('\'');
      command.append(" ");
    }
    return command.toString();
  }

  /**
   * Wrap a command in a shell to capture debug script 
   * stdout and stderr to debugout.
   * @param cmd The command and the arguments that should be run
   * @param debugoutFilename The filename that stdout and stderr
   *  should be saved to.
   * @return the modified command that should be run
   * @throws IOException
   */
  public static List<String> captureDebugOut(List<String> cmd, 
                                             File debugoutFilename
                                            ) throws IOException {
    String debugout = FileUtil.makeShellPath(debugoutFilename);
    List<String> result = new ArrayList<String>(3);
    result.add(bashCommand);
    result.add("-c");
    StringBuffer mergedCmd = new StringBuffer();
    mergedCmd.append("exec ");
    boolean isExecutable = true;
    for(String s: cmd) {
      if (isExecutable) {
        // the executable name needs to be expressed as a shell path for the  
        // shell to find it.
        mergedCmd.append(FileUtil.makeShellPath(new LinkedFile(s)));
        isExecutable = false; 
      } else {
        mergedCmd.append(s);
      }
      mergedCmd.append(" ");
    }
    mergedCmd.append(" < /dev/null ");
    mergedCmd.append(" >");
    mergedCmd.append(debugout);
    mergedCmd.append(" 2>&1 ");
    result.add(mergedCmd.toString());
    return result;
  }

  public static File getUserLogDir() {  
    return LOG_DIR;
  }

  /**
   * Get the user log directory for the job jobid.
   * 
   * @param jobid string representation of the jobid
   * @return user log directory for the job
   */
  public static File getJobDir(String jobid) {
    return new LinkedFile(getUserLogDir(), jobid);
  }

  /**
   * Get the user log directory for the job jobid.
   * 
   * @param jobid the jobid object
   * @return user log directory for the job
   */
  public static File getJobDir(JobID jobid) {
    return getJobDir(jobid.toString());
  }

} // TaskLog

Configuring Hadoop

core-site.xml

I'm not a big fan of services being assigned to seemingly random ports.
I try to regroup all the hadoop ports together in the 50000 range.

core-site.xml:


<?xml version="1.0"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>

  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:50000</value>    
  </property>

</configuration>

hdfs-site.xml

Next, the NameNode and DFS need their data and name directories mapped to our unified cygwin tmp dir.

Also note how the DFS supergroup is our "root" clone of the NT "Administrators" group.

In pseudo-cluster mode, hadoop expects a replication factor of dfs.replication=1,
which corresponds to dfs.replication.min=1.

hdfs-site.xml:


<?xml version="1.0"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>

  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
  <property>
    <name>dfs.umaskmode</name>
    <value>002</value>
  </property>
  <property>
    <name>dfs.permissions</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.permissions.supergroup</name>
    <value>root</value>
  </property>

  <property>
    <name>dfs.data.dir</name>
    <value>${hadoop.tmp.dir}/dfs/data</value>
  </property>
  <property>
    <name>dfs.name.dir</name>
    <value>${hadoop.tmp.dir}/dfs/name</value>
  </property>

</configuration>

mapred-site.xml

Again we regroup MapReduce service ports in the 50000 range:

mapred-site.xml:


<?xml version="1.0"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:50005</value>            
  </property>  
  <property>
    <name>mapred.map.log.level</name>
    <value>DEBUG</value>
  </property>  
  <property>
    <name>mapred.reduce.log.level</name>
    <value>DEBUG</value>
  </property>

</configuration>

taskcontroller.cfg

The TaskTracker must also ensure that its files are in the cygwin unified tmp dir.

taskcontroller.cfg:


<?properties ?>

#configured value of mapred.local.dir. It can be a list of comma separated paths.
#mapred.local.dir=
mapred.local.dir=/cygwin/tmp/hadoop-cyg_server/mapred/local

#configured value of hadoop.log.dir.
#hadoop.log.dir=
hadoop.log.dir=/cygwin/tmp/hadoop-cyg_server/mapred/local

#sleep time before sig kill is to be sent to process group after sigterm is sent. Should be in seconds
mapred.tasktracker.tasks.sleeptime-before-sigkill=

#configured value of mapreduce.tasktracker.group.
mapreduce.tasktracker.group=

#mapred spawned child tmp working directory: specify an absolute path for cygwin
mapred.child.tmp=/cygwin/tmp/hadoop-cyg_server/mapred/local

Deploying Hadoop

Finally, we build hadoop as well as the examples module.

Copy the jars from the build directory, overwriting the
existing jars in the hadoop home directory.


cyg_server@Fenris /work/hadoop/hadoop-1.0.1 $

ant -f build.xml compile examples

cp build/hadoop-client-1.0.2-SNAPSHOT.jar ./hadoop-client-1.0.1.jar
cp build/hadoop-core-1.0.2-SNAPSHOT.jar ./hadoop-core-1.0.1.jar
cp build/hadoop-minicluster-1.0.2-SNAPSHOT.jar ./hadoop-minicluster-1.0.1.jar
cp build/hadoop-tools-1.0.2-SNAPSHOT.jar ./hadoop-tools-1.0.1.jar

cp build/hadoop-examples-1.0.2-SNAPSHOT.jar ./hadoop-examples-1.0.1.jar

Running Hadoop

Managing Hadoop

Hadoop Daemons

Whenever you recompile and redeploy hadoop, you will have to
stop the servers (./bin/stop-all.sh), then erase the hadoop
dfs data and namenode control directories, and then finally
format the namenode anew.

For a clean run, erase previous hadoop logs and mapred local logs
and delete the hadoop dfs data and name control directories.


cyg_server@Fenris /work/hadoop/hadoop-1.0.1 $

rm -fr /cygwin/tmp/hadoop-cyg_server/logs/*
rm -fr /cygwin/tmp/hadoop-cyg_server/mapred/local/*

rm -fr /cygwin/tmp/hadoop-cyg_server/dfs/data
rm -fr /cygwin/tmp/hadoop-cyg_server/dfs/name
rm -fr /cygwin/tmp/hadoop-cyg_server/dfs/namesecondary

Reformat the namenode and start the core hadoop daemons.


cyg_server@Fenris /work/hadoop/hadoop-1.0.1 $

./bin/hadoop namenode -format

./bin/start-all.sh

You should see 4 java processes for the namenode, datanode,
jobtracker, and tasktracker.
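
A quick way to confirm this is jps, which ships with the JDK and lists running
JVMs by main class; the NameNode, DataNode, JobTracker, and TaskTracker should
each appear (a SecondaryNameNode may show up as well, depending on your masters file).


cyg_server@Fenris /work/hadoop/hadoop-1.0.1 $

# list running java processes by class name
jps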

That was a lot of yak shaving just to get this running.

DFS Directories

DFS manages a virtual filesystem containing only links to file
blocks. By default only the cygwin root is visible, as a read-only
pseudo-directory from which you can place (put) files into dfs.
You must populate your dfs user directory yourself and place mapred
input files and libraries into this dfs.


cyg_server@Fenris /work/hadoop/hadoop-1.0.1 $

./bin/hadoop dfs -ls /

Found 1 item
drwxr-xr-x   - cyg_server root          0 2012-05-03 10:59 /cygwin

DFS expects the following user directories:


<?text type="fileset"?>

/cygwin/
/user/
  cyg_server/
    input/
   *output/

note: * the /user/cyg_server/output path is managed by mapred tasks.
You will have to erase it (dfs -rmr) after each mapred run.

First create the user directories:


cyg_server@Fenris /work/hadoop/hadoop-1.0.1 $

./bin/hadoop dfs -mkdir /user/
./bin/hadoop dfs -mkdir /user/cyg_server
./bin/hadoop dfs -mkdir /user/cyg_server/input

./bin/hadoop dfs -chmod 775 /user/
./bin/hadoop dfs -chmod 775 /user/cyg_server
./bin/hadoop dfs -chmod 775 /user/cyg_server/input
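
To confirm the layout before loading any data, list the new directories
(this simply reuses the dfs -ls command shown earlier):


cyg_server@Fenris /work/hadoop/hadoop-1.0.1 $

./bin/hadoop dfs -ls /user/
./bin/hadoop dfs -ls /user/cyg_server/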

Testing MapRed

MapRed wordcount

The hadoop-examples jar contains the wordcount test application,
which we will configure for our first hadoop mapred trial run.

Recall that dfs can only populate data from the visible /cygwin
read-only drive, so we must first create some temporary data
directories from which we will load input into dfs.

MapRed wordcount execution

MapRed wordcount input

First create your wordcount input files in your tmp directory:


cyg_server@Fenris /work/hadoop/hadoop-1.0.1 $

mkdir -p /cygwin/tmp/hadoop-cyg_server/dfs/input
mkdir -p /cygwin/tmp/hadoop-cyg_server/dfs/output

chmod 775 /cygwin/tmp/hadoop-cyg_server/dfs/input
chmod 775 /cygwin/tmp/hadoop-cyg_server/dfs/output

echo "Hello World Bye World"           > /cygwin/tmp/hadoop-cyg_server/dfs/input/file01
echo "Hello Hadoop GoodbyeBye Hadoop"  > /cygwin/tmp/hadoop-cyg_server/dfs/input/file02

chmod 664 /cygwin/tmp/hadoop-cyg_server/dfs/input/file01
chmod 664 /cygwin/tmp/hadoop-cyg_server/dfs/input/file02

Now populate the input files in your dfs user input directory:


cyg_server@Fenris /work/hadoop/hadoop-1.0.1 $

./bin/hadoop dfs -put /cygwin/tmp/hadoop-cyg_server/dfs/input/file01 /user/cyg_server/input/
./bin/hadoop dfs -put /cygwin/tmp/hadoop-cyg_server/dfs/input/file02 /user/cyg_server/input/

You should be able to see both files with non-zero lengths:


cyg_server@Fenris /work/hadoop/hadoop-1.0.1 $

./bin/hadoop dfs -ls /user/cyg_server/input/

Found 2 items
-rw-rw-r--   1 cyg_server root         22 2012-04-05 16:51 /user/cyg_server/input/file01
-rw-rw-r--   1 cyg_server root         31 2012-04-05 16:51 /user/cyg_server/input/file02

Erase the user's previous DFS output directory if present, then
invoke the hadoop wordcount application from the examples jar,
specifying the user's dfs input dir and the output dir to be created.


cyg_server@Fenris /work/hadoop/hadoop-1.0.1 $

./bin/hadoop dfs -rmr /user/cyg_server/output

./bin/hadoop jar hadoop-examples-1.0.1.jar wordcount /user/cyg_server/input /user/cyg_server/output

MapRed should execute the tasks, somewhat slowly.


cyg_server@Fenris /work/hadoop/hadoop-1.0.1 $

./bin/hadoop jar hadoop-examples-1.0.1.jar wordcount /user/cyg_server/input /user/cyg_server/output

****hdfs://localhost:50000/user/cyg_server/input
12/05/05 14:24:21 INFO input.FileInputFormat: Total input paths to process : 2
12/05/05 14:24:21 INFO mapred.JobClient: Running job: job_201205051422_0001
12/05/05 14:24:22 INFO mapred.JobClient:  map 0% reduce 0%
12/05/05 14:24:52 INFO mapred.JobClient:  map 100% reduce 0%
12/05/05 14:25:10 INFO mapred.JobClient:  map 100% reduce 100%
12/05/05 14:25:21 INFO mapred.JobClient: Job complete: job_201205051422_0001
12/05/05 14:25:21 INFO mapred.JobClient: Counters: 26
12/05/05 14:25:21 INFO mapred.JobClient:   Job Counters
12/05/05 14:25:21 INFO mapred.JobClient:     Launched reduce tasks=1
12/05/05 14:25:21 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=44021
12/05/05 14:25:21 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
12/05/05 14:25:21 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
12/05/05 14:25:21 INFO mapred.JobClient:     Launched map tasks=2
12/05/05 14:25:21 INFO mapred.JobClient:     Data-local map tasks=2
12/05/05 14:25:21 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=16687
12/05/05 14:25:21 INFO mapred.JobClient:   File Output Format Counters
12/05/05 14:25:21 INFO mapred.JobClient:     Bytes Written=44
12/05/05 14:25:21 INFO mapred.JobClient:   FileSystemCounters
12/05/05 14:25:21 INFO mapred.JobClient:     FILE_BYTES_READ=231
12/05/05 14:25:21 INFO mapred.JobClient:     HDFS_BYTES_READ=285
12/05/05 14:25:21 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=66696
12/05/05 14:25:21 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=44
12/05/05 14:25:21 INFO mapred.JobClient:   File Input Format Counters
12/05/05 14:25:21 INFO mapred.JobClient:     Bytes Read=53
12/05/05 14:25:21 INFO mapred.JobClient:   Map-Reduce Framework
12/05/05 14:25:21 INFO mapred.JobClient:     Map output materialized bytes=88
12/05/05 14:25:21 INFO mapred.JobClient:     Map input records=2
12/05/05 14:25:21 INFO mapred.JobClient:     Reduce shuffle bytes=88
12/05/05 14:25:21 INFO mapred.JobClient:     Spilled Records=12
12/05/05 14:25:21 INFO mapred.JobClient:     Map output bytes=85
12/05/05 14:25:21 INFO mapred.JobClient:     Total committed heap usage (bytes)=483065856
12/05/05 14:25:21 INFO mapred.JobClient:     Combine input records=8
12/05/05 14:25:21 INFO mapred.JobClient:     SPLIT_RAW_BYTES=232
12/05/05 14:25:21 INFO mapred.JobClient:     Reduce input records=6
12/05/05 14:25:21 INFO mapred.JobClient:     Reduce input groups=5
12/05/05 14:25:21 INFO mapred.JobClient:     Combine output records=6
12/05/05 14:25:21 INFO mapred.JobClient:     Reduce output records=5
12/05/05 14:25:21 INFO mapred.JobClient:     Map output records=8

TaskTracker results

I've managed to get this working to the point where jobs are dispatched, tasks executed, and results compiled.

You should be able to see job completion status, log, and compiled result files in the DFS user output dir:


cyg_server@Fenris /work/hadoop/hadoop-1.0.1 $

./bin/hadoop dfs -ls /user/cyg_server/output

Found 3 items
-rw-rw-r--   1 cyg_server root          0 2012-05-05 20:39 /user/cyg_server/output/_SUCCESS
drwxrwxr-x   - cyg_server root          0 2012-05-05 20:38 /user/cyg_server/output/_logs
-rw-rw-r--   1 cyg_server root         44 2012-05-05 20:39 /user/cyg_server/output/part-r-00000

The part-r-xxxxx is the results file:


cyg_server@Fenris /work/hadoop/hadoop-1.0.1 $

./bin/hadoop dfs -cat /user/cyg_server/output/part-r-00000

Bye     1
GoodbyeBye      1
Hadoop  2
Hello   2
World   2

ZooKeeper Manager

Installing ZooKeeper

Unzip the zookeeper-3.4.3 source into the local public working directory \work\hadoop\zookeeper-3.4.3.


<?properties type="windows.system.variables" ?>

ZOOKEEPER_HOME=\work\hadoop\zookeeper-3.4.3

Configuring ZooKeeper

zoo.cfg

Edit the ZooKeeper configuration file "zoo.cfg" to move the client port into our 50000 range (50080),
and set the zkServer data directory to a dir under our unified tmp directory.

zoo.cfg:


<?properties ?>

# Hadoop - ZooKeeper configuration

# zkServer port to which the zkClient clients connect
clientPort=50080

# directory where the snapshot is stored
dataDir=/cygwin/tmp/hadoop-cyg_server/zookeeper/data

# number of milliseconds of each tick
tickTime=2000

# number of ticks for initial synchronization
initLimit=10

# number of ticks between request and its acknowledgement
syncLimit=5

Running ZooKeeper

Start the zkServer ZooKeeper daemon:


cyg_server@Fenris /work/hadoop/hadoop-1.0.1 $

../zookeeper-3.4.3/bin/zkServer.sh start

JMX enabled by default
Using config: C:\work\hadoop\zookeeper-3.4.3\conf\zoo.cfg
Starting zookeeper ... STARTED
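
Before attaching a client, you can also ask the daemon for its status
(zkServer.sh accepts a status argument; the exact output will vary):


cyg_server@Fenris /work/hadoop/hadoop-1.0.1 $

../zookeeper-3.4.3/bin/zkServer.sh status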

Test ZooKeeper by connecting a zkClient to the service.
Use {CTRL}+C or enter the "quit" command to exit.


cyg_server@Fenris /work/hadoop/hadoop-1.0.1 $

../zookeeper-3.4.3/bin/zkCli.sh -server localhost:50080

Connecting to localhost:50080
2012-05-17 12:40:40,495 [myid:] - INFO  [main:Environment@98] - Client environment:zookeeper.version=3.4.3-1240972, built on 02/06/2012 10:48 GMT
2012-05-17 12:40:40,502 [myid:] - INFO  [main:Environment@98] - Client environment:host.name=Fenris
2012-05-17 12:40:40,502 [myid:] - INFO  [main:Environment@98] - Client environment:java.version=1.7.0_03
2012-05-17 12:40:40,503 [myid:] - INFO  [main:Environment@98] - Client environment:java.vendor=Oracle Corporation
2012-05-17 12:40:40,503 [myid:] - INFO  [main:Environment@98] - Client environment:java.home=C:\work\java\jdk1.7.0_03_x64\jre
2012-05-17 12:40:40,504 [myid:] - INFO  [main:Environment@98] - Client environment:java.class.path=C:\work\hadoop\zookeeper-3.4.3\build\classes;C:\work\hadoop\zookeeper-3.4.3\build\lib\*.jar;C:\work\hadoop\zookeeper-3.4.3\lib\slf4j-log4j12-1.6.1.jar;C:\work\hadoop\zookeeper-3.4.3\lib\slf4j-api-1.6.1.jar;C:\work\hadoop\zookeeper-3.4.3\lib\netty-3.2.2.Final.jar;C:\work\hadoop\zookeeper-3.4.3\lib\log4j-1.2.15.jar;C:\work\hadoop\zookeeper-3.4.3\lib\jline-0.9.94.jar;C:\work\hadoop\zookeeper-3.4.3\zookeeper-3.4.3.jar;C:\work\hadoop\zookeeper-3.4.3\src\java\lib\*.jar;C:\work\hadoop\zookeeper-3.4.3\conf;.
2012-05-17 12:40:40,505 [myid:] - INFO  [main:Environment@98] - Client environment:java.library.path=C:\work\java\jdk1.7.0_03_x64\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;C:\cygwin\usr\local\bin;C:\cygwin\bin;C:\work\bin;C:\work\java\bin;C:\sysinternals;C:\work\java\jdk1.7.0_03_x64\bin;%CommonProgramFiles%\Microsoft Shared\Windows Live;C:\Windows\SYSTEM32;C:\Windows;C:\Windows\SYSTEM32\WBEM;C:\Windows\SYSTEM32\WINDOWSPOWERSHELL\V1.0;C:\PROGRAM FILES (X86)\ATI TECHNOLOGIES\ATI.ACE\CORE-STATIC;C:\PROGRAM FILES (X86)\COMMON FILES\ROXIO SHARED\10.0\DLLSHARED;C:\PROGRAM FILES (X86)\COMMON FILES\ROXIO SHARED\DLLSHARED;C:\PROGRAM FILES (X86)\MICROSOFT SQL SERVER\90\TOOLS\BINN;C:\Program Files\WIDCOMM\Bluetooth Software;C:\Program Files\WIDCOMM\Bluetooth Software\syswow64;C:\Program Files (x86)\QuickTime\QTSystem;C:\cygwin\bin;C:\cygwin\lib\lapack;.
2012-05-17 12:40:40,506 [myid:] - INFO  [main:Environment@98] - Client environment:java.io.tmpdir=C:\cygwin\tmp\
2012-05-17 12:40:40,506 [myid:] - INFO  [main:Environment@98] - Client environment:java.compiler=<NA>
2012-05-17 12:40:40,507 [myid:] - INFO  [main:Environment@98] - Client environment:os.name=Windows 7
2012-05-17 12:40:40,508 [myid:] - INFO  [main:Environment@98] - Client environment:os.arch=amd64
2012-05-17 12:40:40,508 [myid:] - INFO  [main:Environment@98] - Client environment:os.version=6.1
2012-05-17 12:40:40,509 [myid:] - INFO  [main:Environment@98] - Client environment:user.name=cyg_server
2012-05-17 12:40:40,509 [myid:] - INFO  [main:Environment@98] - Client environment:user.home=C:\Users\cyg_server
2012-05-17 12:40:40,510 [myid:] - INFO  [main:Environment@98] - Client environment:user.dir=C:\work\hadoop\hadoop-1.0.1
2012-05-17 12:40:40,512 [myid:] - INFO  [main:ZooKeeper@433] - Initiating client connection, connectString=localhost:50080 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@513864b2
Welcome to ZooKeeper!
2012-05-17 12:40:43,566 [myid:] - INFO  [main-SendThread():ClientCnxn$SendThread@933] - Opening socket connection to server /127.0.0.1:50080
2012-05-17 12:40:43,620 [myid:] - INFO  [main-SendThread(127.0.0.1:50080):ZooKeeperSaslClient@125] - Client will not SASL-authenticate because the default JAAS configuration section 'Client' could not be found. If you are not using SASL, you may ignore this. On the other hand, if you expected SASL to work, please fix your JAAS configuration.
JLine support is enabled
2012-05-17 12:40:43,624 [myid:] - INFO  [main-SendThread(127.0.0.1:50080):ClientCnxn$SendThread@846] - Socket connection established to 127.0.0.1/127.0.0.1:50080, initiating session
2012-05-17 12:40:43,707 [myid:] - INFO  [main-SendThread(127.0.0.1:50080):ClientCnxn$SendThread@1175] - Session establishment complete on server 127.0.0.1/127.0.0.1:50080, sessionid = 0x1375a620dbd0000, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:50080(CONNECTED) 0] quit
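
You can also query the server directly. A quick check, assuming zkServer.sh is run from the same
zookeeper-3.4.3 tree and the clientPort of 50080 configured in zoo.cfg:


# report whether this ZooKeeper is running standalone, or as a quorum leader/follower
../zookeeper-3.4.3/bin/zkServer.sh status

# the "ruok" four-letter command should answer "imok" (needs the netcat package in cygwin)
echo ruok | nc localhost 50080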

HBase Database

Installing HBase

Unzip the hbase-0.92.0 source into the local public working directory \work\hadoop\hbase-0.92.0,
then define the HBASE_HOME Windows system variable to point to it:


<?properties type="windows.system.variables" ?>

HBASE_HOME=\work\hadoop\hbase-0.92.0
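
Because HBASE_HOME is defined as a Windows-style path, it is worth confirming that the cygwin
shell resolves it before patching the scripts. A small sanity check using cygpath, run from a
fresh shell so the new system variable is picked up:


# convert the Windows-style HBASE_HOME to a unix path and list the scripts it should contain
ls "$(cygpath -u "$HBASE_HOME")/bin"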

Fixing HBase scripts

hbase-env.sh

Start by editing the hbase-env.sh configuration script to set the correct JDK.
Also change the pid and log directories to point to locations under the unified tmp directory.

Finally, disable HBase's internal ZooKeeper so that HBase talks to the external ZooKeeper instead.

hbase-env.sh:


#!/usr/bin/env bash

# -- cygwin patch --

# The java implementation to use.  Required.
#export JAVA_HOME=/usr/local/java
export JAVA_HOME=/work/java/jdk1.7.0_03_x64

# String representing this instance of hbase. $USER by default.
# export HBASE_IDENT_STRING=$USER
export HBASE_IDENT_STRING=$HOSTNAME

# Whether HBase manages its own internal Zookeeper.  True by default.
# export HBASE_MANAGES_ZK=true
export HBASE_MANAGES_ZK=false

# File naming hosts on which HRegionServers will run.  $HBASE_HOME/conf/regionservers by default.
# export HBASE_REGIONSERVERS=${HBASE_HOME}/conf/regionservers

# Where log files are stored.  $HBASE_HOME/logs by default.
# export HBASE_LOG_DIR=${HBASE_HOME}/logs
export HBASE_LOG_DIR=/tmp/hadoop-cyg_server/hbase/logs

# The directory where pid files are stored. /tmp by default.
# export HBASE_PID_DIR=/tmp

# -- cygwin patch --
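
HBase normally creates its log directory on first start, but since the start-hbase.sh patch below
wipes stale files under HBASE_LOG_DIR, it does no harm to create the unified directories up front
and confirm the patched settings are picked up. An optional step, assuming the
/tmp/hadoop-cyg_server layout used above:


# pre-create the unified hbase log directory
mkdir -p /tmp/hadoop-cyg_server/hbase/logs

# source the patched hbase-env.sh in a subshell and echo the values it exports
( . ../hbase-0.92.0/conf/hbase-env.sh && echo "manages zk: $HBASE_MANAGES_ZK  log dir: $HBASE_LOG_DIR" )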

start-hbase.sh

Patch start-hbase.sh by adding the following cygwin block near the top of the script. It refuses to
start when an HBase process is already running, clears stale log and pid files, and prepares the
shared taskTracker tmp directory.


#!/usr/bin/env bash

# -- cygwin patch --

# refuse to start if an hbase process is already running
# (grep -v grep keeps the grep process itself out of the match)
pid=`ps aux | grep -v grep | grep hbase | head -1 | awk '{print $1}'`
if [ "$pid" != "" ]; then
  echo "hbase already running ($pid). run stop-hbase.sh to terminate."
  exit 1
fi

# log dir
rm -fr ${HBASE_LOG_DIR}/hbase-*

# tmp dir
mkdir -p ${HADOOP_TMP_DIR}/mapred/local/taskTracker
chmod a+s  ${HADOOP_TMP_DIR}/mapred/local/taskTracker

# pid dir
rm -fr ${HBASE_PID_DIR}/hbase-*.pid

# -- cygwin patch --
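
The guard at the top of this patch relies on ps output to detect a running HBase JVM. It is worth
running the same pipeline by hand on your cygwin install to see what it actually matches, since the
column layout differs between cygwin's own ps and a procps-style ps:


# list candidate hbase processes exactly as the guard sees them (empty output means none running)
ps aux | grep -v grep | grep hbase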

Configuring HBase

hbase-site.xml

hbase-site.xml:


<?xml version="1.0"?>
<configuration>

  <property>
    <name>hbase.rootdir</name>
    <value>file:///C:/cygwin/tmp/hadoop-${USER}/hbase/data</value>
    <description>distributed directory url shared by region servers.</description>
  </property>

  <property>
    <name>hbase.tmp.dir</name>
    <value>C:/cygwin/tmp/hadoop-${USER}/hbase/tmp</value>
    <description>local filesystem tmp dir.</description>
  </property>

</configuration>
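
Since hbase-env.sh disables the managed ZooKeeper, HBase also has to be told where the external
quorum lives. A minimal sketch of the extra hbase-site.xml properties, assuming the standalone
ZooKeeper configured above listens on localhost with clientPort 50080; adjust them to match your
zoo.cfg:


  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>localhost</value>
    <description>comma-separated hosts of the external ZooKeeper quorum.</description>
  </property>

  <property>
    <name>hbase.zookeeper.property.clientPort</name>
    <value>50080</value>
    <description>client port of the external ZooKeeper (the clientPort from zoo.cfg).</description>
  </property>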

Running HBase

Start the HBase master and region server with the patched start-hbase.sh script:


cyg_server@Fenris /work/hadoop/hadoop-1.0.1 $

../hbase-0.92.0/bin/start-hbase.sh

starting master, logging to /tmp/hadoop-cyg_server/hbase/logs/hbase-cyg_server-master-Fenris.out
Enter passphrase for key '/home/cyg_server/.ssh/id_rsa':
localhost: starting regionserver, logging to /tmp/hadoop-cyg_server/hbase/logs/hbase-cyg_server-regionserver-Fenris.out
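
The passphrase prompt comes from the ssh connection start-hbase.sh makes to launch the regionserver;
loading the key into ssh-agent beforehand avoids the interactive prompt. Once both daemons are up,
the HBase shell gives a quick smoke test. A short sketch, assuming the id_rsa key generated earlier
for cyg_server:


# load the cyg_server key so the start/stop scripts do not prompt for a passphrase
eval $(ssh-agent)
ssh-add /home/cyg_server/.ssh/id_rsa

# then open the HBase shell and run "status"; it should report one live region server
../hbase-0.92.0/bin/hbase shell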

Links

hadoop consoles

Credits


generated from MediaWiki by net.muxBus.ant.MarkdownTranslator: (https://sourceforge.net/p/muxbus/)