From: <sta...@us...> - 2007-02-27 18:01:32
|
Revision: 1521 http://archive-access.svn.sourceforge.net/archive-access/?rev=1521&view=rev Author: stack-sf Date: 2007-02-27 10:01:29 -0800 (Tue, 27 Feb 2007) Log Message: ----------- Add dependency on new archive-mapred jar. * src/java/org/archive/access/nutch/ImportArcs.java Package for mapreduce classes changes when we add dependency on archive-mapred jar. * src/java/org/archive/access/nutch/mapred/ARCReporter.java * src/java/org/archive/access/nutch/mapred/ARCRecordMapper.java * src/java/org/archive/access/nutch/mapred/ARCMapRunner.java Removed. Import from archive-mapred jar instead. * .classpath Update with new archive-mapred. Change path to nutch jar. * project.xml * project.properties Update with new archive-mapred * lib/archive-mapred-0.1.0-20070227.175246-2.jar Added. Modified Paths: -------------- trunk/archive-access/projects/nutchwax/.classpath trunk/archive-access/projects/nutchwax/project.properties trunk/archive-access/projects/nutchwax/project.xml trunk/archive-access/projects/nutchwax/src/java/org/archive/access/nutch/ImportArcs.java Added Paths: ----------- trunk/archive-access/projects/nutchwax/lib/archive-mapred-0.1.0-20070227.175246-2.jar Removed Paths: ------------- trunk/archive-access/projects/nutchwax/src/java/org/archive/access/nutch/mapred/ARCMapRunner.java trunk/archive-access/projects/nutchwax/src/java/org/archive/access/nutch/mapred/ARCRecordMapper.java trunk/archive-access/projects/nutchwax/src/java/org/archive/access/nutch/mapred/ARCReporter.java Modified: trunk/archive-access/projects/nutchwax/.classpath =================================================================== --- trunk/archive-access/projects/nutchwax/.classpath 2007-02-27 17:46:30 UTC (rev 1520) +++ trunk/archive-access/projects/nutchwax/.classpath 2007-02-27 18:01:29 UTC (rev 1521) @@ -14,8 +14,9 @@ <classpathentry kind="lib" path="/nutch/lib/commons-logging-1.0.4.jar"/> <classpathentry kind="lib" path="/nutch/lib/junit-3.8.1.jar"/> <classpathentry kind="lib" path="/nutch/conf"/> - <classpathentry kind="lib" path="lib/wayback-0.9.0-200702150450.jar" /> - <classpathentry kind="lib" path="/nutch/build"/> - <classpathentry kind="lib" path="build"/> + <classpathentry kind="lib" path="lib/wayback-0.9.0-200702150450.jar"/> + <classpathentry kind="lib" path="third-party/nutch/build"/> + <classpathentry combineaccessrules="false" kind="src" path="/hadoop"/> + <classpathentry kind="lib" path="lib/archive-mapred-0.1.0-20070227.175246-2.jar"/> <classpathentry kind="output" path="target"/> </classpath> Added: trunk/archive-access/projects/nutchwax/lib/archive-mapred-0.1.0-20070227.175246-2.jar =================================================================== (Binary files differ) Property changes on: trunk/archive-access/projects/nutchwax/lib/archive-mapred-0.1.0-20070227.175246-2.jar ___________________________________________________________________ Name: svn:mime-type + application/octet-stream Modified: trunk/archive-access/projects/nutchwax/project.properties =================================================================== --- trunk/archive-access/projects/nutchwax/project.properties 2007-02-27 17:46:30 UTC (rev 1520) +++ trunk/archive-access/projects/nutchwax/project.properties 2007-02-27 18:01:29 UTC (rev 1521) @@ -20,6 +20,7 @@ maven.jar.corenutch = ${basedir}/third-party/nutch/build/nutch-0.9-dev.jar maven.jar.hadoop = ${basedir}/third-party/nutch/lib/hadoop-0.10.1-core.jar maven.jar.archive-commons = ${basedir}/lib/archive-commons-1.11.0-200702160009.jar +maven.jar.archive-mapred = ${basedir}/lib/archive-mapred-0.1.0-20070227.175246-2.jar maven.jar.wayback = ${basedir}/lib/wayback-0.9.0-200702150450.jar maven.jar.servlet-api = ${basedir}/third-party/nutch/lib/servlet-api.jar maven.jar.commons-codec = ${basedir}/lib/commons-codec-1.3.jar Modified: trunk/archive-access/projects/nutchwax/project.xml =================================================================== --- trunk/archive-access/projects/nutchwax/project.xml 2007-02-27 17:46:30 UTC (rev 1520) +++ trunk/archive-access/projects/nutchwax/project.xml 2007-02-27 18:01:29 UTC (rev 1521) @@ -273,6 +273,17 @@ </properties> </dependency> <dependency> + <id>archive-mapred</id> + <version>0.1.0-SNAPSHOT</version> + <url>http://archive-access.sf.net/projects/mapred/</url> + <properties> + <war.bundle>true</war.bundle> + <description>Archive mapreduce classes. + </description> + <license>LGPL</license> + </properties> + </dependency> + <dependency> <id>wayback</id> <version>0.9.0</version> <url>http://builds.archive.org:8080/cruisecontrol/buildresults/HEAD-archive-access</url> Modified: trunk/archive-access/projects/nutchwax/src/java/org/archive/access/nutch/ImportArcs.java =================================================================== --- trunk/archive-access/projects/nutchwax/src/java/org/archive/access/nutch/ImportArcs.java 2007-02-27 17:46:30 UTC (rev 1520) +++ trunk/archive-access/projects/nutchwax/src/java/org/archive/access/nutch/ImportArcs.java 2007-02-27 18:01:29 UTC (rev 1521) @@ -82,11 +82,11 @@ import org.apache.nutch.util.mime.MimeType; import org.apache.nutch.util.mime.MimeTypeException; import org.apache.nutch.util.mime.MimeTypes; -import org.archive.access.nutch.mapred.ARCMapRunner; -import org.archive.access.nutch.mapred.ARCRecordMapper; -import org.archive.access.nutch.mapred.ARCReporter; import org.archive.io.arc.ARCRecord; import org.archive.io.arc.ARCRecordMetaData; +import org.archive.mapred.ARCMapRunner; +import org.archive.mapred.ARCRecordMapper; +import org.archive.mapred.ARCReporter; import org.archive.util.Base32; import org.archive.util.MimetypeUtils; import org.archive.util.TextUtils; Deleted: trunk/archive-access/projects/nutchwax/src/java/org/archive/access/nutch/mapred/ARCMapRunner.java =================================================================== --- trunk/archive-access/projects/nutchwax/src/java/org/archive/access/nutch/mapred/ARCMapRunner.java 2007-02-27 17:46:30 UTC (rev 1520) +++ trunk/archive-access/projects/nutchwax/src/java/org/archive/access/nutch/mapred/ARCMapRunner.java 2007-02-27 18:01:29 UTC (rev 1521) @@ -1,263 +0,0 @@ -/* - * $Id: ImportArcs.java 1494 2007-02-15 17:47:58Z stack-sf $ - * - * Copyright (C) 2007 Internet Archive. - * - * This file is part of the archive-access tools project - * (http://sourceforge.net/projects/archive-access). - * - * The archive-access tools are free software; you can redistribute them and/or - * modify them under the terms of the GNU Lesser Public License as published by - * the Free Software Foundation; either version 2.1 of the License, or any - * later version. - * - * The archive-access tools are distributed in the hope that they will be - * useful, but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser - * Public License for more details. - * - * You should have received a copy of the GNU Lesser Public License along with - * the archive-access tools; if not, write to the Free Software Foundation, - * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ -package org.archive.access.nutch.mapred; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.ObjectWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapRunnable; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.util.ReflectionUtils; -import org.archive.io.ArchiveReader; -import org.archive.io.ArchiveReaderFactory; -import org.archive.io.arc.ARCConstants; -import org.archive.io.arc.ARCRecord; - -/** - * MapRunner that passes an ARCRecord to configured mapper. - * Configured mapper must be implementation of {@link ARCMapRunner}. - * @author stack - */ -public class ARCMapRunner implements MapRunnable { - public final Log LOG = LogFactory.getLog(this.getClass().getName()); - private ARCRecordMapper mapper; - - /** - * How long to spend indexing. - */ - private long maxtime; - - - public void configure(JobConf job) { - this.mapper = (ARCRecordMapper)ReflectionUtils. - newInstance(job.getMapperClass(), job); - // Value is in minutes. - this.maxtime = job.getLong("wax.index.timeout", 60) * 60 * 1000; - } - - public void run(RecordReader input, OutputCollector output, - Reporter reporter) - throws IOException { - try { - WritableComparable key = input.createKey(); // Unused. - Writable value = input.createValue(); - while (input.next(key, value)) { - doArc(value.toString(), output, new ARCReporter(reporter)); - } - } finally { - this.mapper.close(); - } - } - - protected void doArc(final String arcurl, final OutputCollector output, - final ARCReporter reporter) - throws IOException { - if ((arcurl == null) || arcurl.endsWith("work")) { - reporter.setStatus("skipping " + arcurl, true); - return; - } - - // Set off indexing in a thread so I can cover it with a timer. - final Thread t = new IndexingThread(arcurl, output, reporter); - t.setDaemon(true); - t.start(); - final long start = System.currentTimeMillis(); - try { - for (long period = this.maxtime; t.isAlive() && (period > 0); - period = this.maxtime - (System.currentTimeMillis() - start)) { - try { - t.join(period); - } catch (final InterruptedException e) { - e.printStackTrace(); - } - } - } finally { - cleanup(t, reporter); - } - } - - protected void cleanup(final Thread t, final ARCReporter reporter) - throws IOException { - if (!t.isAlive()) { - return; - } - reporter.setStatus("Killing indexing thread " + t.getName(), true); - t.interrupt(); - try { - // Give it some time to die. - t.join(1000); - } catch (final InterruptedException e) { - e.printStackTrace(); - } - if (t.isAlive()) { - LOG.info(t.getName() + " will not die"); - } - } - - private class IndexingThread extends Thread { - private final String arcLocation; - private final OutputCollector output; - private final ARCReporter reporter; - - public IndexingThread(final String arcloc, final OutputCollector o, - final ARCReporter r) { - // Name this thread same as ARC location. - super(arcloc); - this.arcLocation = arcloc; - this.output = o; - this.reporter = r; - } - - /** - * @return Null if fails download. - */ - protected ArchiveReader getArchiveReader() { - ArchiveReader arc = null; - // Need a thread that will keep updating TaskTracker during long - // downloads else tasktracker will kill us. - Thread reportingDuringDownload = null; - try { - this.reporter.setStatus("opening " + this.arcLocation, true); - reportingDuringDownload = new Thread("reportDuringDownload") { - public void run() { - while (!this.isInterrupted()) { - try { - synchronized (this) { - sleep(1000 * 60); // Sleep a minute. - } - reporter.setStatus("downloading " + - arcLocation); - } catch (final IOException e) { - e.printStackTrace(); - // No point hanging around if we're failing - // status. - break; - } catch (final InterruptedException e) { - // Interrupt flag is cleared. Just fall out. - break; - } - } - } - }; - reportingDuringDownload.setDaemon(true); - reportingDuringDownload.start(); - arc = ArchiveReaderFactory.get(this.arcLocation); - } catch (final Throwable e) { - try { - final String msg = "Error opening " + this.arcLocation - + ": " + e.toString(); - this.reporter.setStatus(msg, true); - LOG.info(msg); - } catch (final IOException ioe) { - LOG.warn(this.arcLocation, ioe); - } - } finally { - if ((reportingDuringDownload != null) - && reportingDuringDownload.isAlive()) { - reportingDuringDownload.interrupt(); - } - } - return arc; - } - - public void run() { - if (this.arcLocation == null || this.arcLocation.length() <= 0) { - return; - } - ArchiveReader arc = getArchiveReader(); - if (arc == null) { - return; - } - - try { - ARCMapRunner.this.mapper.onARCOpen(); - - // Iterate over each ARCRecord. - for (final Iterator i = arc.iterator(); - i.hasNext() && !currentThread().isInterrupted();) { - final ARCRecord rec = (ARCRecord)i.next(); - - - try { - ARCMapRunner.this.mapper.map( - new Text(rec.getMetaData().getUrl()), - new ObjectWritable(rec), this.output, - this.reporter); - - final long b = rec.getMetaData().getContentBegin(); - final long l = rec.getMetaData().getLength(); - final long recordLength = (l > b)? (l - b): l; - if (recordLength > - ARCConstants.DEFAULT_MAX_ARC_FILE_SIZE) { - // Now, if the content length is larger than a - // standard ARC, then it is most likely the last - // record in the ARC because ARC is closed after we - // exceed 100MB (DEFAULT_MAX_ARC...). Calling - // hasNext above will make us read through the - // whole record, even if its a 1.7G video. On a - // loaded machine, this might cause us timeout with - // tasktracker -- so, just skip out here. - this.reporter.setStatus("skipping " + - this.arcLocation + " -- very long record " + - rec.getMetaData()); - break; - } - } catch (final Throwable e) { - // Failed parse of record. Keep going. - LOG.warn("Error processing " + rec.getMetaData(), e); - } - } - if (currentThread().isInterrupted()) { - LOG.info(currentThread().getName() + " interrupted"); - } - this.reporter.setStatus("closing " + this.arcLocation, true); - } catch (final Throwable e) { - // Problem parsing arc file. - final String msg = "Error parsing " + this.arcLocation; - try { - this.reporter.setStatus(msg, true); - } catch (final IOException ioe) { - ioe.printStackTrace(); - } - LOG.warn(msg, e); - } finally { - try { - arc.close(); - ARCMapRunner.this.mapper.onARCClose(); - } catch (final IOException e) { - e.printStackTrace(); - } - } - } - } - -} \ No newline at end of file Deleted: trunk/archive-access/projects/nutchwax/src/java/org/archive/access/nutch/mapred/ARCRecordMapper.java =================================================================== --- trunk/archive-access/projects/nutchwax/src/java/org/archive/access/nutch/mapred/ARCRecordMapper.java 2007-02-27 17:46:30 UTC (rev 1520) +++ trunk/archive-access/projects/nutchwax/src/java/org/archive/access/nutch/mapred/ARCRecordMapper.java 2007-02-27 18:01:29 UTC (rev 1521) @@ -1,49 +0,0 @@ -/* - * $Id: ImportArcs.java 1494 2007-02-15 17:47:58Z stack-sf $ - * - * Copyright (C) 2007 Internet Archive. - * - * This file is part of the archive-access tools project - * (http://sourceforge.net/projects/archive-access). - * - * The archive-access tools are free software; you can redistribute them and/or - * modify them under the terms of the GNU Lesser Public License as published by - * the Free Software Foundation; either version 2.1 of the License, or any - * later version. - * - * The archive-access tools are distributed in the hope that they will be - * useful, but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser - * Public License for more details. - * - * You should have received a copy of the GNU Lesser Public License along with - * the archive-access tools; if not, write to the Free Software Foundation, - * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ -package org.archive.access.nutch.mapred; - -import java.io.IOException; - -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.archive.io.arc.ARCRecord; - -/** - * Like {@link Mapper} but adds signaling of ARC open and close. - * @author stack - */ -public interface ARCRecordMapper extends Mapper { - /** - * Called after ARC open but before we call - * {@link #map(String, ARCRecord, OutputCollector, Reporter)} - * @throws IOException - */ - public void onARCOpen() throws IOException; - - /** - * Called on ARC close. - * @throws IOException - */ - public void onARCClose() throws IOException; -} Deleted: trunk/archive-access/projects/nutchwax/src/java/org/archive/access/nutch/mapred/ARCReporter.java =================================================================== --- trunk/archive-access/projects/nutchwax/src/java/org/archive/access/nutch/mapred/ARCReporter.java 2007-02-27 17:46:30 UTC (rev 1520) +++ trunk/archive-access/projects/nutchwax/src/java/org/archive/access/nutch/mapred/ARCReporter.java 2007-02-27 18:01:29 UTC (rev 1521) @@ -1,80 +0,0 @@ -/* - * $Id: ImportArcs.java 1494 2007-02-15 17:47:58Z stack-sf $ - * - * Copyright (C) 2007 Internet Archive. - * - * This file is part of the archive-access tools project - * (http://sourceforge.net/projects/archive-access). - * - * The archive-access tools are free software; you can redistribute them and/or - * modify them under the terms of the GNU Lesser Public License as published by - * the Free Software Foundation; either version 2.1 of the License, or any - * later version. - * - * The archive-access tools are distributed in the hope that they will be - * useful, but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser - * Public License for more details. - * - * You should have received a copy of the GNU Lesser Public License along with - * the archive-access tools; if not, write to the Free Software Foundation, - * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ -package org.archive.access.nutch.mapred; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.mapred.Reporter; - -/** - * Reporter that logs all status passed; a combined Reporter and logger. Only - * reports home every so often. - * @author stack - */ -public class ARCReporter implements Reporter { - public final Log LOG = LogFactory.getLog(this.getClass().getName()); - private final Reporter wrappedReporter; - private long nextUpdate = 0; - private long time = System.currentTimeMillis(); - - private static final long FIVE_MINUTES = 1000 * 60 * 5; - - public ARCReporter(final Reporter r) { - this.wrappedReporter = r; - } - - public void setStatus(final String msg) throws IOException { - setStatus(msg, false); - } - - public void setStatus(final String msg, final boolean writeThrough) - throws IOException { - LOG.info(msg); - // Only update tasktracker every second -- not for every record. - long now = System.currentTimeMillis(); - if (writeThrough || now > this.nextUpdate) { - this.wrappedReporter.setStatus(msg); - this.nextUpdate = now + 1000; - this.time = now; - } - } - - /** - * Update reporter if its a long time since last log only. - * @param msg Message to report IF we haven't reported in a long time. - * @throws IOException - */ - public void setStatusIfElapse(final String msg) - throws IOException { - long now = System.currentTimeMillis(); - if ((now - this.time) > FIVE_MINUTES) { - setStatus(msg); - } - } - - public void progress() throws IOException { - this.wrappedReporter.progress(); - } -} \ No newline at end of file This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |