From: <sta...@us...> - 2007-04-10 17:08:00
|
Revision: 1712 http://archive-access.svn.sourceforge.net/archive-access/?rev=1712&view=rev Author: stack-sf Date: 2007-04-10 10:07:52 -0700 (Tue, 10 Apr 2007) Log Message: ----------- Implement '[ 1697808 ] [nutchwax] Use MR to run multiple concurrent index merges' * xdocs/faq.fml Add note on how to run many concurrent merges. * nutchwax-core/src/main/java/org/archive/access/nutch/Nutchwax.java (doClass): Rename as doClassMain since that's what it actually does. Roll all exceptions up into the generic Exception capture since all are given the same treatment anyway. (doSearch): Refactor to use doClassMain. * nutchwax-core/src/main/java/org/archive/access/nutch/Multiple.java Cleanup that comes of a bunch of exercising of this class. Added logger. Narrow what this class does; now it will only run the doMain of hadoop ToolBase classes. Set number of map tasks to be the number of splits. * .classpath Remove /home/stack reference. Modified Paths: -------------- trunk/archive-access/projects/nutchwax/.classpath trunk/archive-access/projects/nutchwax/nutchwax-core/src/main/java/org/archive/access/nutch/Multiple.java trunk/archive-access/projects/nutchwax/nutchwax-core/src/main/java/org/archive/access/nutch/Nutchwax.java trunk/archive-access/projects/nutchwax/xdocs/faq.fml Modified: trunk/archive-access/projects/nutchwax/.classpath =================================================================== --- trunk/archive-access/projects/nutchwax/.classpath 2007-04-10 00:03:29 UTC (rev 1711) +++ trunk/archive-access/projects/nutchwax/.classpath 2007-04-10 17:07:52 UTC (rev 1712) @@ -169,7 +169,6 @@ <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/lib/commons-lang-2.1.jar"/> <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/lib/commons-logging-1.0.4.jar"/> <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/lib/commons-logging-api-1.0.4.jar"/> - <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/lib/hadoop-0.12.2-core.jar" 
sourcepath="/home/stack/checkouts/hadoop/src/java"/> <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/lib/jakarta-oro-2.0.7.jar"/> <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/lib/jets3t-0.5.0.jar"/> <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/lib/jetty-5.1.4.jar"/> @@ -224,5 +223,6 @@ <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/build"/> <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/conf"/> <classpathentry kind="lib" path="nutchwax-plugins/target"/> + <classpathentry kind="lib" path="nutchwax-thirdparty/nutch/lib/hadoop-0.12.2-core.jar"/> <classpathentry kind="output" path="target"/> </classpath> Modified: trunk/archive-access/projects/nutchwax/nutchwax-core/src/main/java/org/archive/access/nutch/Multiple.java =================================================================== --- trunk/archive-access/projects/nutchwax/nutchwax-core/src/main/java/org/archive/access/nutch/Multiple.java 2007-04-10 00:03:29 UTC (rev 1711) +++ trunk/archive-access/projects/nutchwax/nutchwax-core/src/main/java/org/archive/access/nutch/Multiple.java 2007-04-10 17:07:52 UTC (rev 1712) @@ -11,6 +11,8 @@ import java.util.Timer; import java.util.TimerTask; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -28,42 +30,74 @@ import org.apache.nutch.util.NutchConfiguration; /** - * Run multiple concurrent tasks. + * Run multiple concurrent non-mapreduce {@link ToolBase} tasks such as + * {@link org.apache.nutch.indexer.IndexMerger} or + * {@link org.apache.indexer.IndexSorter}. + * * Takes input that has per line the name of the class to run and the arguments - * to pass. Use this mapreduce job to run multiple concurrent merges or - * multiple concurrent sorts, etc. Will run as many tasks as there are input - * lines. + * to pass. 
Here is an example line for IndexMerger: + * <code>org.apache.nutch.indexer.IndexMerger -workingdir /tmp index-new indexes + * </code>. We run as many tasks as there are input lines. + * * @author stack */ public class Multiple extends ToolBase implements Mapper { + public final Log LOG = LogFactory.getLog(this.getClass()); + private JobConf job; + public void map(WritableComparable key, Writable value, OutputCollector output, final Reporter reporter) throws IOException { - final String [] words = ("PADDING_FOR_DOCLASS_BELOW " + - value.toString()).split("\\s"); - if (words.length <= 1) { + final String [] words = value.toString().split("\\s"); + if (words.length <= 0) { return; } + final String className = words[0]; // Set a timer running that will update reporter on a period. Timer t = new Timer(false); t.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { - reporter.setStatus("Running " + words[1]); + reporter.setStatus("Running " + className); } catch (IOException e) { e.printStackTrace(); } - }}, 0, 3000); + }}, 0, 10000); try { - Nutchwax.doClass(words); - } finally { + int result = doMain(words); + reporter.setStatus("Done running " + className + ": " + result); + if (result != 0) { + throw new IOException(className + " returned non-null: " + + result + ", check logs."); + } + } finally { t.cancel(); } } - public void configure(JobConf job) { - // Nothing to configure. + /** + * Call {@link ToolBase#doMain(org.apache.hadoop.conf.Configuration, String[])} + * on the passed classname. + * @param args + * @return Result from call to doMain. + */ + private int doMain(final String [] args) { + final String className = args[0]; + // Redo args so absent our 'class' command. 
+ String [] newArgs = Nutchwax.rewriteArgs(args, 1); + int result = -1; + try { + Object obj = Class.forName(className).newInstance(); + result = ((ToolBase)obj).doMain(this.job, newArgs); + } catch (Exception e) { + LOG.error(className, e); + } + return result; + } + + public void configure(final JobConf j) { + this.job = j; } public void close() throws IOException { @@ -132,6 +166,7 @@ for (String line: lines) { splits.add(new LineInputSplit(line)); } + job.setNumMapTasks(lines.size()); return splits.toArray(new LineInputSplit [0]); } Modified: trunk/archive-access/projects/nutchwax/nutchwax-core/src/main/java/org/archive/access/nutch/Nutchwax.java =================================================================== --- trunk/archive-access/projects/nutchwax/nutchwax-core/src/main/java/org/archive/access/nutch/Nutchwax.java 2007-04-10 00:03:29 UTC (rev 1711) +++ trunk/archive-access/projects/nutchwax/nutchwax-core/src/main/java/org/archive/access/nutch/Nutchwax.java 2007-04-10 17:07:52 UTC (rev 1712) @@ -26,7 +26,6 @@ import java.io.FileNotFoundException; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; @@ -280,55 +279,32 @@ return newArgs; } - static void doClass(final String [] args) { + static Object doClassMain(final String [] args) { // Redo args so absent our nutchwax 'class' command. 
final String className = args[1]; String [] newArgs = rewriteArgs(args, 2); // From http://www.javaworld.com/javaworld/javaqa/1999-06/01-outside.html Class [] argTypes = new Class[1]; argTypes[0] = String[].class; + Object result = null; try { Method mainMethod = Class.forName(className).getDeclaredMethod("main", argTypes); - mainMethod.invoke(newArgs, new Object [] {newArgs}); - } catch (SecurityException e) { - throw new RuntimeException(e); - } catch (NoSuchMethodException e) { - throw new RuntimeException(e); - } catch (ClassNotFoundException e) { - throw new RuntimeException(e); - } catch (IllegalArgumentException e) { - throw new RuntimeException(e); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } catch (InvocationTargetException e) { - throw new RuntimeException(e); + result = mainMethod.invoke(newArgs, new Object [] {newArgs}); + } catch (Throwable t) { + t.printStackTrace(); } + return result; } - protected void doSearch(final String [] args) { - // Redo args so absent our nutchwax 'query' command. - String [] newArgs = rewriteArgs(args, 1); - // From http://www.javaworld.com/javaworld/javaqa/1999-06/01-outside.html - Class [] argTypes = new Class[1]; - argTypes[0] = String[].class; - try { - Method mainMethod = Class.forName(NutchwaxBean.class.getName()). 
- getDeclaredMethod("main", argTypes); - mainMethod.invoke(newArgs, new Object [] {newArgs}); - } catch (SecurityException e) { - throw new RuntimeException(e); - } catch (NoSuchMethodException e) { - throw new RuntimeException(e); - } catch (ClassNotFoundException e) { - throw new RuntimeException(e); - } catch (IllegalArgumentException e) { - throw new RuntimeException(e); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } catch (InvocationTargetException e) { - throw new RuntimeException(e); + protected Object doSearch(final String [] args) { + String [] newArgs = new String[args.length + 1]; + newArgs[0] = args[0]; + newArgs[1] = NutchwaxBean.class.getName(); + for (int i = 1; i < args.length; i++) { + newArgs[i + 1] = args[i]; } + return doClassMain(newArgs); } protected void doMultiple(final String [] args) throws Exception { @@ -429,7 +405,7 @@ if (args.length < 2) { doClassUsage("ERROR: Wrong number of arguments passed.", 2); } - doClass(args); + doClassMain(args); } else if (jobName.equals("search")) { if (args.length < 1) { doClassUsage("ERROR: Wrong number of arguments passed.", 2); Modified: trunk/archive-access/projects/nutchwax/xdocs/faq.fml =================================================================== --- trunk/archive-access/projects/nutchwax/xdocs/faq.fml 2007-04-10 00:03:29 UTC (rev 1711) +++ trunk/archive-access/projects/nutchwax/xdocs/faq.fml 2007-04-10 17:07:52 UTC (rev 1712) @@ -105,6 +105,19 @@ Run the following to see the usage: <pre>$ ${HADOOP_HOME}/bin/hadoop jar nutchwax-job-0.11.0-SNAPSHOT.jar class org.apache.nutch.segment.SegmentMerger ~/tmp/crawl/segments_merged/ ~/tmp/crawl/segments/20070406155807-test/ ~/tmp/crawl/segments/20070406155856-test/</pre> </p> +<p>If creating multiple indices, you may want to make use of the NutchWAX facility +that runs a mapreduce job to farm out the multiple index merges across the cluster +so they run concurrently rather than in series. 
For usage, run the following: +<pre>stack@debord:~/workspace$ ${HADOOP_HOME}/bin/hadoop jar nutchwax.jar help multiple +</pre> +It takes an inputs directory and an outputs directory (the latter is usually not used). The +inputs list, per line, a job to run on a remote machine. Here is an example line from +an input that would run an index merge of the directory <code>indexes-monday</code> into +<code>index-monday</code> using <code>/tmp</code> as working directory: +<pre> +org.apache.nutch.indexer.IndexMerger -workingdir /tmp index-monday indexes-monday +</pre>. +</p> </answer> </faq> This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |