From: <sta...@us...> - 2007-04-10 00:03:30
|
Revision: 1711 http://archive-access.svn.sourceforge.net/archive-access/?rev=1711&view=rev Author: stack-sf Date: 2007-04-09 17:03:29 -0700 (Mon, 09 Apr 2007) Log Message: ----------- * nutchwax-core/src/main/java/org/archive/access/nutch/Nutchwax.java Add in new 'multiple' parameter for running multiple concurrent non-mapreduce tasks such as merge and sort. * nutchwax-core/src/main/java/org/archive/access/nutch/Multiple.java Job to run multiple concurrent non-mapreduce tasks. * .classpath Update classpath so it points to new locations for src. Modified Paths: -------------- trunk/archive-access/projects/nutchwax/.classpath trunk/archive-access/projects/nutchwax/nutchwax-core/src/main/java/org/archive/access/nutch/Nutchwax.java Added Paths: ----------- trunk/archive-access/projects/nutchwax/nutchwax-core/src/main/java/org/archive/access/nutch/Multiple.java Modified: trunk/archive-access/projects/nutchwax/.classpath =================================================================== --- trunk/archive-access/projects/nutchwax/.classpath 2007-04-09 16:45:35 UTC (rev 1710) +++ trunk/archive-access/projects/nutchwax/.classpath 2007-04-10 00:03:29 UTC (rev 1711) @@ -1,14 +1,6 @@ <?xml version="1.0" encoding="UTF-8"?> <classpath> - <classpathentry kind="src" path="src/java"/> - <classpathentry kind="src" path="src/plugin/index-wax/src/java"/> - <classpathentry kind="src" path="src/plugin/parse-default/src/java"/> - <classpathentry kind="src" path="src/plugin/parse-waxext/src/java"/> - <classpathentry kind="src" path="src/plugin/query-anchor/src/java"/> - <classpathentry kind="src" path="src/plugin/query-content/src/java"/> - <classpathentry kind="src" path="src/plugin/query-host/src/java"/> - <classpathentry kind="src" path="src/plugin/query-title/src/java"/> - <classpathentry kind="src" path="src/plugin/query-wax/src/java"/> + <classpathentry kind="src" path="nutchwax-core/src/main/java"/> <classpathentry kind="src" path="nutchwax-thirdparty/nutch/src/java"/> 
<classpathentry kind="src" path="nutchwax-thirdparty/nutch/src/plugin/index-basic/src/java"/> <classpathentry kind="src" path="nutchwax-thirdparty/nutch/src/plugin/index-more/src/java"/> @@ -45,7 +37,6 @@ <classpathentry kind="src" path="nutchwax-thirdparty/nutch/src/plugin/urlnormalizer-pass/src/test"/> <classpathentry kind="src" path="nutchwax-thirdparty/nutch/src/plugin/urlnormalizer-regex/src/java"/> <classpathentry kind="src" path="nutchwax-thirdparty/nutch/src/plugin/urlnormalizer-regex/src/test"/> - <classpathentry kind="src" path="nutchwax-thirdparty/nutch/src/test"/> <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/build/clustering-carrot2/clustering-carrot2.jar"/> <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/build/creativecommons/creativecommons.jar"/> <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/build/index-basic/index-basic.jar"/> @@ -65,7 +56,6 @@ <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/build/lib-xml/saxpath.jar"/> <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/build/lib-xml/xercesImpl.jar"/> <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/build/microformats-reltag/microformats-reltag.jar"/> - <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/build/nutch-0.9-dev.jar"/> <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/build/nutch-extensionpoints/nutch-extensionpoints.jar"/> <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/build/ontology/ontology.jar"/> <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/build/parse-ext/parse-ext.jar"/> @@ -179,7 +169,7 @@ <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/lib/commons-lang-2.1.jar"/> <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/lib/commons-logging-1.0.4.jar"/> <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/lib/commons-logging-api-1.0.4.jar"/> - <classpathentry sourcepath="/home/stack/checkouts/hadoop/src/java" kind="lib" 
path="nutchwax-thirdparty/nutch/lib/hadoop-0.12.2-core.jar"/> + <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/lib/hadoop-0.12.2-core.jar" sourcepath="/home/stack/checkouts/hadoop/src/java"/> <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/lib/jakarta-oro-2.0.7.jar"/> <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/lib/jets3t-0.5.0.jar"/> <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/lib/jetty-5.1.4.jar"/> Added: trunk/archive-access/projects/nutchwax/nutchwax-core/src/main/java/org/archive/access/nutch/Multiple.java =================================================================== --- trunk/archive-access/projects/nutchwax/nutchwax-core/src/main/java/org/archive/access/nutch/Multiple.java (rev 0) +++ trunk/archive-access/projects/nutchwax/nutchwax-core/src/main/java/org/archive/access/nutch/Multiple.java 2007-04-10 00:03:29 UTC (rev 1711) @@ -0,0 +1,219 @@ +package org.archive.access.nutch; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.util.ArrayList; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.ToolBase; +import org.apache.nutch.util.NutchConfiguration; + +/** + * Run multiple concurrent tasks. 
+ * Takes input that has per line the name of the class to run and the arguments + * to pass. Use this mapreduce job to run multiple concurrent merges or + * multiple concurrent sorts, etc. Will run as many tasks as there are input + * lines. + * @author stack + */ +public class Multiple extends ToolBase implements Mapper { + public void map(WritableComparable key, Writable value, + OutputCollector output, final Reporter reporter) + throws IOException { + final String [] words = ("PADDING_FOR_DOCLASS_BELOW " + + value.toString()).split("\\s"); + if (words.length <= 1) { + return; + } + // Set a timer running that will update reporter on a period. + Timer t = new Timer(false); + t.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + try { + reporter.setStatus("Running " + words[1]); + } catch (IOException e) { + e.printStackTrace(); + } + }}, 0, 3000); + try { + Nutchwax.doClass(words); + } finally { + t.cancel(); + } + } + + public void configure(JobConf job) { + // Nothing to configure. + } + + public void close() throws IOException { + // TODO Auto-generated method stub + } + + public static class MultipleInputFormat implements InputFormat { + + public RecordReader getRecordReader(final InputSplit split, + final JobConf job, final Reporter reporter) + throws IOException { + // Only one record/line to read. 
+ return new RecordReader() { + private final String line = ((LineInputSplit)split).line; + private boolean read = false; + + public void close() throws IOException { + // TODO Auto-generated method stub + } + + public WritableComparable createKey() { + return new Text(""); + } + + public Writable createValue() { + return new Text(""); + } + + public long getPos() throws IOException { + return 0; + } + + public float getProgress() throws IOException { + return getPos(); + } + + public boolean next(Writable key, Writable value) + throws IOException { + if (read) { + return false; + } + read = true; + ((Text)value).set(this.line); + return true; + } + }; + } + + public InputSplit[] getSplits(JobConf job, int numSplits) + throws IOException { + Path[] inputs = job.getInputPaths(); + List<String> lines = new ArrayList<String>(); + for (int i = 0; i < inputs.length; i++) { + Path p = inputs[i]; + FileSystem fs = p.getFileSystem(job); + Path [] ps = fs.listPaths(p); + for (int j = 0; j < ps.length; j++) { + if (fs.isDirectory(ps[j])) { + continue; + } + addFileLines(lines, fs, ps[j]); + } + } + List<LineInputSplit> splits = + new ArrayList<LineInputSplit>(lines.size()); + for (String line: lines) { + splits.add(new LineInputSplit(line)); + } + return splits.toArray(new LineInputSplit [0]); + } + + private void addFileLines(final List<String> lines, final FileSystem fs, + final Path p) + throws IOException { + InputStream is = (InputStream)fs.open(p); + LineNumberReader lnr = null; + try { + lnr = new LineNumberReader(new InputStreamReader(is)); + for (String l = null; (l = lnr.readLine()) != null;) { + if (l.length() > 0 && !l.trim().startsWith("#")) { + lines.add(l); + } + } + } finally { + if (lnr != null) { + lnr.close(); + } + is.close(); + } + } + + public void validateInput(JobConf job) throws IOException { + // Nothing to validate. 
+ } + } + + public static class LineInputSplit implements InputSplit { + private String line; + + protected LineInputSplit() { + super(); + } + + public LineInputSplit(final String l) { + line = l; + } + + public long getLength() throws IOException { + return line.length(); + } + + public String[] getLocations() throws IOException { + return new String[0]; + } + + public void readFields(DataInput in) throws IOException { + this.line = in.readLine(); + } + + public void write(DataOutput out) throws IOException { + out.writeBytes(this.line); + } + } + + public static void usage() { + System.out.println("Usage: multiple <input> <output>"); + System.out.println("Arguments:"); + System.out.println(" <input> Directory of input files with " + + "each line describing task to run"); + System.out.println(" <output> Output directory."); + } + + public int run(String[] args) throws Exception { + if (args.length != 2 || + (args.length == 1 && + (args[0].equals("-h") || args[0].equals("--help")))) { + usage(); + return -1; + } + JobConf job = new JobConf(MultipleInputFormat.class); + job.setInputFormat(MultipleInputFormat.class); + job.setInputPath(new Path(args[0])); + job.setMapperClass(Multiple.class); + job.setOutputPath(new Path(args[1])); + JobClient.runJob(job); + return 0; + } + + public static void main(String[] args) throws Exception { + int res = new Multiple().doMain(NutchConfiguration.create(), args); + System.exit(res); + } +} \ No newline at end of file Modified: trunk/archive-access/projects/nutchwax/nutchwax-core/src/main/java/org/archive/access/nutch/Nutchwax.java =================================================================== --- trunk/archive-access/projects/nutchwax/nutchwax-core/src/main/java/org/archive/access/nutch/Nutchwax.java 2007-04-09 16:45:35 UTC (rev 1710) +++ trunk/archive-access/projects/nutchwax/nutchwax-core/src/main/java/org/archive/access/nutch/Nutchwax.java 2007-04-10 00:03:29 UTC (rev 1711) @@ -65,7 +65,7 @@ private final static List 
JOBS = Arrays.asList(new String[] { "import", "update", "invert", "index", "dedup", "merge", "all", - "class", "search"}); + "class", "search", "multiple"}); // Lazy initialize these two variables to delay complaint about hadoop not @@ -269,7 +269,7 @@ od.getIndex(), od.getTmpDir()); } - protected String [] rewriteArgs(final String [] args, final int offset) { + static String [] rewriteArgs(final String [] args, final int offset) { final String [] newArgs = new String[args.length - offset]; for (int i = 0; i < args.length; i++) { if (i < offset) { @@ -280,7 +280,7 @@ return newArgs; } - protected void doClass(final String [] args) { + static void doClass(final String [] args) { // Redo args so absent our nutchwax 'class' command. final String className = args[1]; String [] newArgs = rewriteArgs(args, 2); @@ -331,6 +331,10 @@ } } + protected void doMultiple(final String [] args) throws Exception { + (new Multiple()).run(rewriteArgs(args, 1)); + } + protected void doJob(final String jobName, final String [] args) throws Exception { if (jobName.equals("import")) { @@ -431,6 +435,8 @@ doClassUsage("ERROR: Wrong number of arguments passed.", 2); } doSearch(args); + } else if (jobName.equals("multiple")) { + doMultiple(args); } else { usage("ERROR: No handler for job name " + jobName, 4); System.exit(0); @@ -534,17 +540,17 @@ System.out.println("Jobs (usually) must be run in the order " + "listed below."); System.out.println("Available jobs:"); - System.out.println(" import Import ARCs."); - System.out.println(" update Update dbs with recent imports."); - System.out.println(" invert Invert links."); - System.out.println(" index Index segments."); - System.out.println(" dedup Deduplicate by URL or content MD5."); - System.out.println(" merge Merge segment indices into one."); - System.out.println(" all Runs all above jobs in order."); - System.out.println(" class Run the passed class's main."); - System.out.println(" search Run a query against index under " + + 
System.out.println(" import Import ARCs."); + System.out.println(" update Update dbs with recent imports."); + System.out.println(" invert Invert links."); + System.out.println(" index Index segments."); + System.out.println(" dedup Deduplicate by URL or content MD5."); + System.out.println(" merge Merge segment indices into one."); + System.out.println(" all Runs all above jobs in order."); + System.out.println(" class Run the passed class's main."); + System.out.println(" search Run a query against index under " + "property 'searcher.dir'"); - + System.out.println(" multiple Run multiple concurrent tasks."); System.exit(exitCode); } @@ -621,6 +627,15 @@ " to merge reside."); System.exit(exitCode); } + + public static void doMultipleUsage(final String message, + final int exitCode) { + if (message != null && message.length() > 0) { + System.out.println(message); + } + Multiple.usage(); + System.exit(exitCode); + } public static void doSearchUsage(final String message, final int exitCode) { @@ -704,6 +719,8 @@ doAllUsage(null, 1); } else if (jobName.equals("search")) { doSearchUsage(null, 1); + } else if (jobName.equals("multiple")) { + doMultipleUsage(null, 1); } else if (jobName.equals("class")) { doClassUsage(null, 1); } else { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |