From: <sta...@us...> - 2007-04-03 17:23:41
Revision: 1686
          http://archive-access.svn.sourceforge.net/archive-access/?rev=1686&view=rev
Author:   stack-sf
Date:     2007-04-03 10:23:23 -0700 (Tue, 03 Apr 2007)

Log Message:
-----------
Added new module, wayback-mapreduce-prereq. It builds the mapreduce code.
It's not possible to do the code build and the assembly in a single module
(the mailing list hints it might be possible when the assembly plugin moves
to 2.2).

* wayback-mapreduce/src/main/java/org/archive/wayback/resourceindex/indexer/hadoop/AlphaPartitioner.java
* wayback-mapreduce/src/main/java/org/archive/wayback/resourceindex/indexer/hadoop/Driver.java
* wayback-mapreduce-prereq/src/main/java/org/archive/wayback/resourceindex/indexer/hadoop/AlphaPartitioner.java
* wayback-mapreduce-prereq/src/main/java/org/archive/wayback/resourceindex/indexer/hadoop/Driver.java
  Added the driver and partitioner here from wayback-mapreduce.

* wayback-mapreduce/src/main/assembly/mapreduce-job.xml
  Exclude unwanted dependencies (hadoop, etc.).

* wayback-mapreduce-prereq/pom.xml
  Added.

* wayback-mapreduce/pom.xml
  Make this module just be about assembly.

* pom.xml
  Add mention of the new module, wayback-mapreduce-prereq.

Modified Paths:
--------------
    trunk/archive-access/projects/wayback/pom.xml
    trunk/archive-access/projects/wayback/wayback-mapreduce/pom.xml
    trunk/archive-access/projects/wayback/wayback-mapreduce/src/main/assembly/mapreduce-job.xml

Added Paths:
-----------
    trunk/archive-access/projects/wayback/wayback-mapreduce-prereq/
    trunk/archive-access/projects/wayback/wayback-mapreduce-prereq/pom.xml
    trunk/archive-access/projects/wayback/wayback-mapreduce-prereq/src/
    trunk/archive-access/projects/wayback/wayback-mapreduce-prereq/src/main/
    trunk/archive-access/projects/wayback/wayback-mapreduce-prereq/src/main/assembly/
    trunk/archive-access/projects/wayback/wayback-mapreduce-prereq/src/main/java/
    trunk/archive-access/projects/wayback/wayback-mapreduce-prereq/src/main/java/org/
    trunk/archive-access/projects/wayback/wayback-mapreduce-prereq/src/main/java/org/archive/
    trunk/archive-access/projects/wayback/wayback-mapreduce-prereq/src/main/java/org/archive/wayback/
    trunk/archive-access/projects/wayback/wayback-mapreduce-prereq/src/main/java/org/archive/wayback/resourceindex/
    trunk/archive-access/projects/wayback/wayback-mapreduce-prereq/src/main/java/org/archive/wayback/resourceindex/indexer/
    trunk/archive-access/projects/wayback/wayback-mapreduce-prereq/src/main/java/org/archive/wayback/resourceindex/indexer/hadoop/
    trunk/archive-access/projects/wayback/wayback-mapreduce-prereq/src/main/java/org/archive/wayback/resourceindex/indexer/hadoop/AlphaPartitioner.java
    trunk/archive-access/projects/wayback/wayback-mapreduce-prereq/src/main/java/org/archive/wayback/resourceindex/indexer/hadoop/Driver.java

Removed Paths:
-------------
    trunk/archive-access/projects/wayback/wayback-mapreduce/src/main/java/

Modified: trunk/archive-access/projects/wayback/pom.xml
===================================================================
--- trunk/archive-access/projects/wayback/pom.xml	2007-04-03 02:44:39 UTC (rev 1685)
+++ trunk/archive-access/projects/wayback/pom.xml	2007-04-03 17:23:23 UTC (rev 1686)
@@ -116,6 +116,11 @@
     </dependency>
     <dependency>
       <groupId>org.archive.wayback</groupId>
+      <artifactId>wayback-mapreduce-prereq</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.archive.wayback</groupId>
       <artifactId>wayback-webapp</artifactId>
       <version>${project.version}</version>
     </dependency>
@@ -181,7 +186,6 @@
     </plugin>
   </plugins>
 </reporting>
-
 <repositories>
   <repository>
     <releases>
@@ -218,8 +222,9 @@
   <modules>
     <module>wayback-core</module>
+    <module>wayback-mapreduce-prereq</module>
+    <module>wayback-mapreduce</module>
     <module>wayback-webapp</module>
-    <module>wayback-mapreduce</module>
   </modules>
 </project>

Modified: trunk/archive-access/projects/wayback/wayback-mapreduce/pom.xml
===================================================================
--- trunk/archive-access/projects/wayback/wayback-mapreduce/pom.xml	2007-04-03 02:44:39 UTC (rev 1685)
+++ trunk/archive-access/projects/wayback/wayback-mapreduce/pom.xml	2007-04-03 17:23:23 UTC (rev 1686)
@@ -1,16 +1,11 @@
 <?xml version="1.0"?>
 <!--
-  POM reference: http://maven.apache.org/pom.html
-
-  List of the better articles on maven:
-
-  http://www.javaworld.com/javaworld/jw-05-2006/jw-0529-maven.html
-  http://www.javaworld.com/javaworld/jw-02-2006/jw-0227-maven_p.html
-
-  URLs on converting from 1.0 to 2.0 maven (not much good generally):
-
-  http://wiki.osafoundation.org/bin/view/Journal/Maven2Upgrade
-  http://maven.apache.org/guides/mini/guide-m1-m2.html
+  Assemble the mapreduce job jar.
+  The assembly is done as an attachment to the packaging
+  step using the maven-assembly-plugin.  The
+  packaging for this module is pom, not jar
+  (you cannot have the packaging and the
+  assembly product be the same).
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
@@ -19,13 +14,10 @@
     <artifactId>wayback</artifactId>
     <version>0.9.0-SNAPSHOT</version>
   </parent>
-  <!--You cannot tell it not build the jar so name it 'placeholder'.
-    Actual job jar is made using below assembly spec. at src/main/assembly
-    -->
   <groupId>org.archive.wayback</groupId>
   <artifactId>wayback-mapreduce</artifactId>
   <packaging>pom</packaging>
-  <name>Wayback MapReduce Indexing Job Jar</name>
+  <name>Wayback MapReduce Job Jar Assembly</name>

   <dependencies>
     <dependency>
@@ -33,10 +25,8 @@
       <artifactId>wayback-core</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.apache</groupId>
-      <artifactId>hadoop</artifactId>
-      <version>0.12.2-core</version>
-      <scope>compile</scope>
+      <groupId>org.archive.wayback</groupId>
+      <artifactId>wayback-mapreduce-prereq</artifactId>
     </dependency>
     <dependency>
       <groupId>org.archive</groupId>
@@ -69,8 +59,7 @@
             </appendAssemblyId>
             <archive>
              <manifest>
-                <!--Pointer to driver for wayback indexing-->
-                <mainClass>org.archive.wayback.Wayback</mainClass>
+                <mainClass>org.archive.wayback.resourceindex.indexer.hadoop.Driver</mainClass>
              </manifest>
            </archive>
          </configuration>
@@ -83,7 +72,6 @@
        </execution>
      </executions>
     </plugin>
-
   </plugins>
 </build>
 </project>

Modified: trunk/archive-access/projects/wayback/wayback-mapreduce/src/main/assembly/mapreduce-job.xml
===================================================================
--- trunk/archive-access/projects/wayback/wayback-mapreduce/src/main/assembly/mapreduce-job.xml	2007-04-03 02:44:39 UTC (rev 1685)
+++ trunk/archive-access/projects/wayback/wayback-mapreduce/src/main/assembly/mapreduce-job.xml	2007-04-03 17:23:23 UTC (rev 1686)
@@ -6,14 +6,28 @@
   <includeBaseDirectory>false</includeBaseDirectory>
   <fileSets>
     <fileSet>
-      <directory>target/classes</directory>
+      <directory>../wayback-mapreduce-prereq/target/classes</directory>
       <outputDirectory>/</outputDirectory>
     </fileSet>
   </fileSets>
   <dependencySets>
     <dependencySet>
       <outputDirectory>/lib</outputDirectory>
-      <scope>runtime</scope>
+      <excludes>
+        <exclude>org.apache:hadoop</exclude>
+        <exclude>junit:junit</exclude>
+        <exclude>javax.servlet:servlet-api</exclude>
+        <!--Do not include. Its content has already been added above in the
+          fileset as classes at the top level of the jar.-->
+        <exclude>org.archive.wayback:wayback-mapreduce-prereq</exclude>
+
+        <!--These are probably not needed either-->
+        <exclude>commons-cli:commons-cli</exclude>
+        <exclude>commons-collections:commons-collections</exclude>
+        <exclude>commons-pool:commons-pool</exclude>
+        <exclude>commons-logging:commons-logging</exclude>
+
+      </excludes>
     </dependencySet>
   </dependencySets>
 </assembly>

Added: trunk/archive-access/projects/wayback/wayback-mapreduce-prereq/pom.xml
===================================================================
--- trunk/archive-access/projects/wayback/wayback-mapreduce-prereq/pom.xml	(rev 0)
+++ trunk/archive-access/projects/wayback/wayback-mapreduce-prereq/pom.xml	2007-04-03 17:23:23 UTC (rev 1686)
@@ -0,0 +1,50 @@
+<?xml version="1.0"?>
+<!--
+  Build the mapreduce code in this module and then assemble the job jar
+  in the subsequent module, wayback-mapreduce.  I cannot do
+  them both in the same module.  It might be possible when
+  maven-assembly-plugin 2.2 is released.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.archive</groupId>
+    <artifactId>wayback</artifactId>
+    <version>0.9.0-SNAPSHOT</version>
+  </parent>
+  <groupId>org.archive.wayback</groupId>
+  <artifactId>wayback-mapreduce-prereq</artifactId>
+  <packaging>jar</packaging>
+  <name>Wayback MapReduce Indexing</name>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.5</source>
+          <target>1.5</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.archive.wayback</groupId>
+      <artifactId>wayback-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache</groupId>
+      <artifactId>hadoop</artifactId>
+      <version>0.12.2-core</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.archive</groupId>
+      <artifactId>archive-mapred</artifactId>
+      <version>0.2.0-SNAPSHOT</version>
+    </dependency>
+  </dependencies>
+</project>

Added: trunk/archive-access/projects/wayback/wayback-mapreduce-prereq/src/main/java/org/archive/wayback/resourceindex/indexer/hadoop/AlphaPartitioner.java
===================================================================
--- trunk/archive-access/projects/wayback/wayback-mapreduce-prereq/src/main/java/org/archive/wayback/resourceindex/indexer/hadoop/AlphaPartitioner.java	(rev 0)
+++ trunk/archive-access/projects/wayback/wayback-mapreduce-prereq/src/main/java/org/archive/wayback/resourceindex/indexer/hadoop/AlphaPartitioner.java	2007-04-03 17:23:23 UTC (rev 1686)
@@ -0,0 +1,108 @@
+/* AlphaPartitioner
+ *
+ * $Id$
+ *
+ * Created on 6:08:33 PM Mar 29, 2007.
+ *
+ * Copyright (C) 2007 Internet Archive.
+ *
+ * This file is part of wayback.
+ *
+ * wayback is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or
+ * any later version.
+ *
+ * wayback is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser Public License
+ * along with wayback-svn; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+package org.archive.wayback.resourceindex.indexer.hadoop;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+
+/**
+ * Partitions keys alphabetically by comparing them against a sorted list of
+ * boundary strings loaded from a split file.
+ *
+ * @author brad
+ * @version $Date$, $Revision$
+ */
+public class AlphaPartitioner implements Partitioner {
+    String boundaries[] = new String[0];
+
+    public void configure(JobConf job) {
+        String pathStr = job.get("alpha.splitfile", "/tmp/split.txt");
+        System.err.println("Using split " + pathStr);
+        Path p = new Path(pathStr);
+        FileSystem fs;
+        try {
+            fs = FileSystem.get(new URI(pathStr), job);
+            FSDataInputStream in = fs.open(p);
+            InputStreamReader is = new InputStreamReader(in);
+            BufferedReader bis = new BufferedReader(is);
+            ArrayList<String> l = new ArrayList<String>();
+            while (true) {
+                String line = bis.readLine();
+                if (line == null) {
+                    break;
+                }
+                l.add(line);
+            }
+            boundaries = l.toArray(boundaries);
+            Arrays.sort(boundaries);
+            System.err.println("Loaded and Sorted split " + pathStr);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * @return the number of partitions in the configuration file. This is also
+     * the number of reduce tasks in the job.
+     */
+    public int getNumPartitions() {
+        return boundaries.length;
+    }
+
+    /** Binary-search the sorted boundary list to pick a partition for the key.
+     * @param key
+     * @param value
+     * @param numReduceTasks
+     * @return int partition index for key
+     */
+    public int getPartition(WritableComparable key, Writable value,
+            int numReduceTasks) {
+        Text t = (Text) key;
+        String keyS = t.toString();
+        int loc = Arrays.binarySearch(boundaries, keyS);
+        if (loc < 0) {
+            // binarySearch returned -(insertionPoint) - 1: map the miss to
+            // the index of the last boundary <= key, clamped to partition 0.
+            loc = (loc * -1) - 2;
+            if (loc < 0) {
+                loc = 0;
+            }
+        }
+        return loc;
+    }
+}

Added: trunk/archive-access/projects/wayback/wayback-mapreduce-prereq/src/main/java/org/archive/wayback/resourceindex/indexer/hadoop/Driver.java
===================================================================
--- trunk/archive-access/projects/wayback/wayback-mapreduce-prereq/src/main/java/org/archive/wayback/resourceindex/indexer/hadoop/Driver.java	(rev 0)
+++ trunk/archive-access/projects/wayback/wayback-mapreduce-prereq/src/main/java/org/archive/wayback/resourceindex/indexer/hadoop/Driver.java	2007-04-03 17:23:23 UTC (rev 1686)
@@ -0,0 +1,154 @@
+package org.archive.wayback.resourceindex.indexer.hadoop;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.archive.io.arc.ARCRecord;
+import org.archive.mapred.ARCMapRunner;
+import org.archive.wayback.resourceindex.indexer.ArcIndexer;
+
+/**
+ * Hadoop Driver for generation of alphabetically partitioned Wayback CDX
+ * files using the Hadoop framework.
+ *
+ * @author brad
+ * @version $Date$, $Revision$
+ */
+public class Driver {
+
+    /**
+     * Mapper which converts an ARCRecord into a CDX line.
+     *
+     * @author brad
+     * @version $Date$, $Revision$
+     */
+    public static class MapClass extends MapReduceBase implements Mapper {
+
+        private Text outKey = new Text();
+        private Text outValue = new Text("");
+
+        public void map(WritableComparable key, Writable value,
+                OutputCollector output, Reporter reporter) throws IOException {
+            ObjectWritable ow = (ObjectWritable) value;
+            ARCRecord rec = (ARCRecord) ow.get();
+            String line;
+            try {
+                line = ArcIndexer.arcRecordToCDXLine(rec);
+                outKey.set(line);
+                output.collect(outKey, outValue);
+            } catch (ParseException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    static void printUsage() {
+        System.out.println("[-m <maps>] <input> <output>");
+        System.exit(1);
+    }
+
+    /**
+     * The main driver for the CDX indexing job.
+     * Invoke this method to submit the map/reduce job.
+     * @param args
+     * @throws IOException When there are communication problems with the
+     *         job tracker.
+     */
+    public static void main(String[] args) throws IOException {
+        Configuration defaults = new Configuration();
+
+        JobConf jobConf = new JobConf(defaults, Driver.class);
+        jobConf.setJobName("cdx1");
+
+        jobConf.setMapRunnerClass(ARCMapRunner.class);
+
+//      jobConf.setInputFormat(SequenceFileInputFormat.class);
+        jobConf.setOutputFormat(TextOutputFormat.class);
+
+        jobConf.setOutputKeyClass(Text.class);
+        jobConf.setOutputValueClass(Text.class);
+        jobConf.set("mapred.partitioner.class",
+            "org.archive.wayback.resourceindex.indexer.hadoop.AlphaPartitioner");
+
+        jobConf.setMapperClass(MapClass.class);
+//      jobConf.setMapperClass(IdentityMapper.class);
+        jobConf.setReducerClass(IdentityReducer.class);
+
+        AlphaPartitioner part = (AlphaPartitioner) ReflectionUtils.newInstance(
+            jobConf.getPartitionerClass(), jobConf);
+        int num_reduces = part.getNumPartitions();
+
+        JobClient client = new JobClient(jobConf);
+        ClusterStatus cluster = client.getClusterStatus();
+        int num_maps = cluster.getTaskTrackers()
+            * jobConf.getInt("test.sort.maps_per_host", 10);
+        List<String> otherArgs = new ArrayList<String>();
+        for (int i = 0; i < args.length; ++i) {
+            try {
+                if ("-m".equals(args[i])) {
+                    num_maps = Integer.parseInt(args[++i]);
+                } else {
+                    otherArgs.add(args[i]);
+                }
+            } catch (NumberFormatException except) {
+                System.out.println("ERROR: Integer expected instead of "
+                    + args[i]);
+                printUsage();
+            } catch (ArrayIndexOutOfBoundsException except) {
+                System.out.println("ERROR: Required parameter missing from "
+                    + args[i - 1]);
+                printUsage(); // exits
+            }
+        }
+
+        jobConf.setNumMapTasks(num_maps);
+        jobConf.setNumReduceTasks(num_reduces);
+
+        // Make sure there are exactly 2 parameters left.
+        if (otherArgs.size() != 2) {
+            System.out.println("ERROR: Wrong number of parameters: "
+                + otherArgs.size() + " instead of 2.");
+            printUsage();
+        }
+        jobConf.setInputPath(new Path((String) otherArgs.get(0)));
+        jobConf.setOutputPath(new Path((String) otherArgs.get(1)));
+
+        // Uncomment to run locally in a single process
+//      jobConf.set("mapred.job.tracker", "local");
+
+        System.out.println("Running on " + cluster.getTaskTrackers()
+            + " nodes to sort from " + jobConf.getInputPaths()[0]
+            + " into " + jobConf.getOutputPath() + " with " + num_reduces
+            + " reduces.");
+        Date startTime = new Date();
+        System.out.println("Job started: " + startTime);
+        JobClient.runJob(jobConf);
+        Date end_time = new Date();
+        System.out.println("Job ended: " + end_time);
+        System.out.println("The job took "
+            + (end_time.getTime() - startTime.getTime()) / 1000
+            + " seconds.");
+    }
+}
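
A note on the AlphaPartitioner.getPartition() arithmetic above: Arrays.binarySearch() returns (-(insertionPoint) - 1) on a miss, so (loc * -1) - 2 resolves to the index of the greatest boundary that sorts at or below the key, clamped to partition 0. The following self-contained sketch is illustration only, not part of this commit; the class name and sample boundaries are invented and the Hadoop types are left out, but the lookup is the same:

import java.util.Arrays;

/** Illustration only: mirrors AlphaPartitioner's boundary lookup without Hadoop. */
public class AlphaPartitionSketch {

    // Hypothetical boundaries, as if read and sorted from a four-line split file.
    static final String[] BOUNDARIES = { "a", "g", "m", "t" };

    static int partitionFor(String key) {
        int loc = Arrays.binarySearch(BOUNDARIES, key);
        if (loc < 0) {
            // Miss: loc == -(insertionPoint) - 1, so this picks the last
            // boundary <= key, clamped to partition 0.
            loc = (loc * -1) - 2;
            if (loc < 0) {
                loc = 0;
            }
        }
        return loc;
    }

    public static void main(String[] args) {
        // Prints: apple -> 0, horse -> 1, zebra -> 3, 0 -> 0
        for (String key : new String[] { "apple", "horse", "zebra", "0" }) {
            System.out.println(key + " -> " + partitionFor(key));
        }
    }
}

Because getNumPartitions() is simply boundaries.length, a four-line split file like the one assumed here would lead the Driver to request four reduce tasks, one per alphabetic range.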
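
Relatedly, AlphaPartitioner.configure() reads its boundaries from the path named by the "alpha.splitfile" property (default /tmp/split.txt), one boundary per line. Below is a hedged sketch of how a caller might write that file and point a job at it; the boundary values and the split-file path are assumptions for illustration, not something this commit provides:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

/** Illustration only: prepares a split file for AlphaPartitioner to read. */
public class SplitFileSketch {
    public static void main(String[] args) throws IOException {
        JobConf jobConf = new JobConf(SplitFileSketch.class);

        // Hypothetical location; AlphaPartitioner falls back to /tmp/split.txt.
        Path splitFile = new Path("/tmp/split.txt");

        // One boundary string per line; the partitioner sorts them and maps
        // each alphabetic range to one reduce task / one output CDX partition.
        FileSystem fs = FileSystem.get(jobConf);
        FSDataOutputStream out = fs.create(splitFile);
        for (String boundary : new String[] { "a", "g", "m", "t" }) {
            out.writeBytes(boundary + "\n");
        }
        out.close();

        // Property name as read by AlphaPartitioner.configure().
        jobConf.set("alpha.splitfile", splitFile.toString());
    }
}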