From: <sta...@us...> - 2007-02-22 02:08:54
|
Revision: 1510 http://archive-access.svn.sourceforge.net/archive-access/?rev=1510&view=rev Author: stack-sf Date: 2007-02-21 18:08:52 -0800 (Wed, 21 Feb 2007) Log Message: ----------- First time add of commons project. Uses maven2 to build an archive-mapred.jar. Added Paths: ----------- trunk/archive-access/projects/commons/ trunk/archive-access/projects/commons/pom.xml trunk/archive-access/projects/commons/src/ trunk/archive-access/projects/commons/src/main/ trunk/archive-access/projects/commons/src/main/java/ trunk/archive-access/projects/commons/src/main/java/org/ trunk/archive-access/projects/commons/src/main/java/org/archive/ trunk/archive-access/projects/commons/src/main/java/org/archive/mapred/ trunk/archive-access/projects/commons/src/main/java/org/archive/mapred/ARCMapRunner.java trunk/archive-access/projects/commons/src/main/java/org/archive/mapred/ARCRecordMapper.java trunk/archive-access/projects/commons/src/main/java/org/archive/mapred/ARCReporter.java trunk/archive-access/projects/commons/src/test/ trunk/archive-access/projects/commons/src/test/java/ trunk/archive-access/projects/commons/src/test/java/org/ trunk/archive-access/projects/commons/src/test/java/org/archive/ Added: trunk/archive-access/projects/commons/pom.xml =================================================================== --- trunk/archive-access/projects/commons/pom.xml (rev 0) +++ trunk/archive-access/projects/commons/pom.xml 2007-02-22 02:08:52 UTC (rev 1510) @@ -0,0 +1,135 @@ +<?xml version="1.0"?> +<!-- + POM reference: http://maven.apache.org/pom.html + + List of the better articles on maven: + + http://www.javaworld.com/javaworld/jw-05-2006/jw-0529-maven.html + http://www.javaworld.com/javaworld/jw-02-2006/jw-0227-maven_p.html + + URLs on converting from 1.0 to 2.0 maven (not much good generally): + + http://wiki.osafoundation.org/bin/view/Journal/Maven2Upgrade + http://maven.apache.org/guides/mini/guide-m1-m2.html + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + <groupId>org.archive</groupId> + <artifactId>archive-mapred</artifactId> + <packaging>jar</packaging> + <version>1.0-SNAPSHOT</version> + <name>Archive Commons</name> + <description>Common code used across all Archive projects. + </description> + <url>http://archive-access.sourceforge.net/projects/commons/</url> + <inceptionYear>2004</inceptionYear> + + <licenses> + <license> + <name>GNU LESSER GENERAL PUBLIC LICENSE</name> + <url>http://www.gnu.org/licenses/lgpl.txt</url> + <distribution>repo</distribution> + </license> + </licenses> + <organization> + <name>Internet Archive</name> + <url>http://www.archive.org/</url> + </organization> + <issueManagement> + <system>SourceForge</system> + <url>http://sourceforge.net/tracker/?group_id=118427</url> + </issueManagement> + <ciManagement> + <system>cruisecontrol</system> + <url>http://builds.archive.org:8080/cruisecontrol/</url> + </ciManagement> + <mailingLists> + <mailingList> + <name>Archive Access ARC Tools Discussion List</name> + <subscribe> + http://lists.sourceforge.net/lists/listinfo/archive-access-discuss + </subscribe> + <unsubscribe> + http://lists.sourceforge.net/lists/listinfo/archive-access-discuss + </unsubscribe> + <post>archive-access-discuss</post> + <archive> + http://sourceforge.net/mailarchive/forum.php?forum_id=45842 + </archive> + </mailingList> + <mailingList> + <name>Archive Access ARC Tools Commits</name> + <subscribe> + https://lists.sourceforge.net/lists/listinfo/archive-access-cvs + </subscribe> + <unsubscribe> + https://lists.sourceforge.net/lists/listinfo/archive-access-cvs + </unsubscribe> + <post>archive-access-cvs</post> + <archive> + http://sourceforge.net/mailarchive/forum.php?forum=archive-access-cvs + </archive> + </mailingList> + </mailingLists> + <scm> + 
<connection>scm:svn:https://archive-access.svn.sourceforge.net/svnroot/archive-access/</connection> + <tag>HEAD</tag> + <url>https://archive-access.svn.sourceforge.net/svnroot/archive-access/</url> + </scm> + <prerequisites> + <maven>2.0.4</maven> + </prerequisites> + + <build> + <plugins /> + </build> + + <repositories> + <repository> + <releases> + <enabled>true</enabled> + <updatePolicy>always</updatePolicy> + <checksumPolicy>warn</checksumPolicy> + </releases> + <snapshots> + <enabled>true</enabled> + <updatePolicy>never</updatePolicy> + <checksumPolicy>fail</checksumPolicy> + </snapshots> + <id>builds.archive.org</id> + <name></name> + <url>http://builds.archive.org:8080/maven2</url> + <layout>default</layout> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>3.8.1</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + <version>1.0.4</version> + </dependency> + + <dependency> + <groupId>org.apache</groupId> + <artifactId>hadoop</artifactId> + <version>0.10.1-core</version> + </dependency> + <dependency> + <groupId>org.archive</groupId> + <artifactId>archive-commons</artifactId> + <version>1.11.0-200702211853</version> + </dependency> + + </dependencies> + +</project> Added: trunk/archive-access/projects/commons/src/main/java/org/archive/mapred/ARCMapRunner.java =================================================================== --- trunk/archive-access/projects/commons/src/main/java/org/archive/mapred/ARCMapRunner.java (rev 0) +++ trunk/archive-access/projects/commons/src/main/java/org/archive/mapred/ARCMapRunner.java 2007-02-22 02:08:52 UTC (rev 1510) @@ -0,0 +1,263 @@ +/* + * $Id: ImportArcs.java 1494 2007-02-15 17:47:58Z stack-sf $ + * + * Copyright (C) 2007 Internet Archive. 
+ * + * This file is part of the archive-access tools project + * (http://sourceforge.net/projects/archive-access). + * + * The archive-access tools are free software; you can redistribute them and/or + * modify them under the terms of the GNU Lesser Public License as published by + * the Free Software Foundation; either version 2.1 of the License, or any + * later version. + * + * The archive-access tools are distributed in the hope that they will be + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser + * Public License for more details. + * + * You should have received a copy of the GNU Lesser Public License along with + * the archive-access tools; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +package org.archive.access.nutch.mapred; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.ObjectWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapRunnable; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.ReflectionUtils; +import org.archive.io.ArchiveReader; +import org.archive.io.ArchiveReaderFactory; +import org.archive.io.arc.ARCConstants; +import org.archive.io.arc.ARCRecord; + +/** + * MapRunner that passes an ARCRecord to configured mapper. + * Configured mapper must be an implementation of {@link ARCRecordMapper}. 
+ * @author stack + */ +public class ARCMapRunner implements MapRunnable { + public final Log LOG = LogFactory.getLog(this.getClass().getName()); + private ARCRecordMapper mapper; + + /** + * How long to spend indexing. + */ + private long maxtime; + + + public void configure(JobConf job) { + this.mapper = (ARCRecordMapper)ReflectionUtils. + newInstance(job.getMapperClass(), job); + // Value is in minutes. + this.maxtime = job.getLong("wax.index.timeout", 60) * 60 * 1000; + } + + public void run(RecordReader input, OutputCollector output, + Reporter reporter) + throws IOException { + try { + WritableComparable key = input.createKey(); // Unused. + Writable value = input.createValue(); + while (input.next(key, value)) { + doArc(value.toString(), output, new ARCReporter(reporter)); + } + } finally { + this.mapper.close(); + } + } + + protected void doArc(final String arcurl, final OutputCollector output, + final ARCReporter reporter) + throws IOException { + if ((arcurl == null) || arcurl.endsWith("work")) { + reporter.setStatus("skipping " + arcurl, true); + return; + } + + // Set off indexing in a thread so I can cover it with a timer. + final Thread t = new IndexingThread(arcurl, output, reporter); + t.setDaemon(true); + t.start(); + final long start = System.currentTimeMillis(); + try { + for (long period = this.maxtime; t.isAlive() && (period > 0); + period = this.maxtime - (System.currentTimeMillis() - start)) { + try { + t.join(period); + } catch (final InterruptedException e) { + e.printStackTrace(); + } + } + } finally { + cleanup(t, reporter); + } + } + + protected void cleanup(final Thread t, final ARCReporter reporter) + throws IOException { + if (!t.isAlive()) { + return; + } + reporter.setStatus("Killing indexing thread " + t.getName(), true); + t.interrupt(); + try { + // Give it some time to die. 
+ t.join(1000); + } catch (final InterruptedException e) { + e.printStackTrace(); + } + if (t.isAlive()) { + LOG.info(t.getName() + " will not die"); + } + } + + private class IndexingThread extends Thread { + private final String arcLocation; + private final OutputCollector output; + private final ARCReporter reporter; + + public IndexingThread(final String arcloc, final OutputCollector o, + final ARCReporter r) { + // Name this thread same as ARC location. + super(arcloc); + this.arcLocation = arcloc; + this.output = o; + this.reporter = r; + } + + /** + * @return Null if fails download. + */ + protected ArchiveReader getArchiveReader() { + ArchiveReader arc = null; + // Need a thread that will keep updating TaskTracker during long + // downloads else tasktracker will kill us. + Thread reportingDuringDownload = null; + try { + this.reporter.setStatus("opening " + this.arcLocation, true); + reportingDuringDownload = new Thread("reportDuringDownload") { + public void run() { + while (!this.isInterrupted()) { + try { + synchronized (this) { + sleep(1000 * 60); // Sleep a minute. + } + reporter.setStatus("downloading " + + arcLocation); + } catch (final IOException e) { + e.printStackTrace(); + // No point hanging around if we're failing + // status. + break; + } catch (final InterruptedException e) { + // Interrupt flag is cleared. Just fall out. 
+ break; + } + } + } + }; + reportingDuringDownload.setDaemon(true); + reportingDuringDownload.start(); + arc = ArchiveReaderFactory.get(this.arcLocation); + } catch (final Throwable e) { + try { + final String msg = "Error opening " + this.arcLocation + + ": " + e.toString(); + this.reporter.setStatus(msg, true); + LOG.info(msg); + } catch (final IOException ioe) { + LOG.warn(this.arcLocation, ioe); + } + } finally { + if ((reportingDuringDownload != null) + && reportingDuringDownload.isAlive()) { + reportingDuringDownload.interrupt(); + } + } + return arc; + } + + public void run() { + if (this.arcLocation == null || this.arcLocation.length() <= 0) { + return; + } + ArchiveReader arc = getArchiveReader(); + if (arc == null) { + return; + } + + try { + ARCMapRunner.this.mapper.onARCOpen(); + + // Iterate over each ARCRecord. + for (final Iterator i = arc.iterator(); + i.hasNext() && !currentThread().isInterrupted();) { + final ARCRecord rec = (ARCRecord)i.next(); + + + try { + ARCMapRunner.this.mapper.map( + new Text(rec.getMetaData().getUrl()), + new ObjectWritable(rec), this.output, + this.reporter); + + final long b = rec.getMetaData().getContentBegin(); + final long l = rec.getMetaData().getLength(); + final long recordLength = (l > b)? (l - b): l; + if (recordLength > + ARCConstants.DEFAULT_MAX_ARC_FILE_SIZE) { + // Now, if the content length is larger than a + // standard ARC, then it is most likely the last + // record in the ARC because ARC is closed after we + // exceed 100MB (DEFAULT_MAX_ARC...). Calling + // hasNext above will make us read through the + // whole record, even if its a 1.7G video. On a + // loaded machine, this might cause us timeout with + // tasktracker -- so, just skip out here. + this.reporter.setStatus("skipping " + + this.arcLocation + " -- very long record " + + rec.getMetaData()); + break; + } + } catch (final Throwable e) { + // Failed parse of record. Keep going. 
+ LOG.warn("Error processing " + rec.getMetaData(), e); + } + } + if (currentThread().isInterrupted()) { + LOG.info(currentThread().getName() + " interrupted"); + } + this.reporter.setStatus("closing " + this.arcLocation, true); + } catch (final Throwable e) { + // Problem parsing arc file. + final String msg = "Error parsing " + this.arcLocation; + try { + this.reporter.setStatus(msg, true); + } catch (final IOException ioe) { + ioe.printStackTrace(); + } + LOG.warn(msg, e); + } finally { + try { + arc.close(); + ARCMapRunner.this.mapper.onARCClose(); + } catch (final IOException e) { + e.printStackTrace(); + } + } + } + } + +} \ No newline at end of file Added: trunk/archive-access/projects/commons/src/main/java/org/archive/mapred/ARCRecordMapper.java =================================================================== --- trunk/archive-access/projects/commons/src/main/java/org/archive/mapred/ARCRecordMapper.java (rev 0) +++ trunk/archive-access/projects/commons/src/main/java/org/archive/mapred/ARCRecordMapper.java 2007-02-22 02:08:52 UTC (rev 1510) @@ -0,0 +1,49 @@ +/* + * $Id: ImportArcs.java 1494 2007-02-15 17:47:58Z stack-sf $ + * + * Copyright (C) 2007 Internet Archive. + * + * This file is part of the archive-access tools project + * (http://sourceforge.net/projects/archive-access). + * + * The archive-access tools are free software; you can redistribute them and/or + * modify them under the terms of the GNU Lesser Public License as published by + * the Free Software Foundation; either version 2.1 of the License, or any + * later version. + * + * The archive-access tools are distributed in the hope that they will be + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser + * Public License for more details. 
+ * + * You should have received a copy of the GNU Lesser Public License along with + * the archive-access tools; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +package org.archive.access.nutch.mapred; + +import java.io.IOException; + +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.archive.io.arc.ARCRecord; + +/** + * Like {@link Mapper} but adds signaling of ARC open and close. + * @author stack + */ +public interface ARCRecordMapper extends Mapper { + /** + * Called after ARC open but before we call + * {@link #map(String, ARCRecord, OutputCollector, Reporter)} + * @throws IOException + */ + public void onARCOpen() throws IOException; + + /** + * Called on ARC close. + * @throws IOException + */ + public void onARCClose() throws IOException; +} Added: trunk/archive-access/projects/commons/src/main/java/org/archive/mapred/ARCReporter.java =================================================================== --- trunk/archive-access/projects/commons/src/main/java/org/archive/mapred/ARCReporter.java (rev 0) +++ trunk/archive-access/projects/commons/src/main/java/org/archive/mapred/ARCReporter.java 2007-02-22 02:08:52 UTC (rev 1510) @@ -0,0 +1,80 @@ +/* + * $Id: ImportArcs.java 1494 2007-02-15 17:47:58Z stack-sf $ + * + * Copyright (C) 2007 Internet Archive. + * + * This file is part of the archive-access tools project + * (http://sourceforge.net/projects/archive-access). + * + * The archive-access tools are free software; you can redistribute them and/or + * modify them under the terms of the GNU Lesser Public License as published by + * the Free Software Foundation; either version 2.1 of the License, or any + * later version. 
+ * + * The archive-access tools are distributed in the hope that they will be + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser + * Public License for more details. + * + * You should have received a copy of the GNU Lesser Public License along with + * the archive-access tools; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +package org.archive.access.nutch.mapred; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapred.Reporter; + +/** + * Reporter that logs all status passed; a combined Reporter and logger. Only + * reports home every so often. + * @author stack + */ +public class ARCReporter implements Reporter { + public final Log LOG = LogFactory.getLog(this.getClass().getName()); + private final Reporter wrappedReporter; + private long nextUpdate = 0; + private long time = System.currentTimeMillis(); + + private static final long FIVE_MINUTES = 1000 * 60 * 5; + + public ARCReporter(final Reporter r) { + this.wrappedReporter = r; + } + + public void setStatus(final String msg) throws IOException { + setStatus(msg, false); + } + + public void setStatus(final String msg, final boolean writeThrough) + throws IOException { + LOG.info(msg); + // Only update tasktracker every second -- not for every record. + long now = System.currentTimeMillis(); + if (writeThrough || now > this.nextUpdate) { + this.wrappedReporter.setStatus(msg); + this.nextUpdate = now + 1000; + this.time = now; + } + } + + /** + * Update reporter only if it has been a long time since the last update. + * @param msg Message to report IF we haven't reported in a long time. 
+ * @throws IOException + */ + public void setStatusIfElapse(final String msg) + throws IOException { + long now = System.currentTimeMillis(); + if ((now - this.time) > FIVE_MINUTES) { + setStatus(msg); + } + } + + public void progress() throws IOException { + this.wrappedReporter.progress(); + } +} \ No newline at end of file This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |