From: <bra...@us...> - 2011-11-18 23:18:51
Revision: 3567
          http://archive-access.svn.sourceforge.net/archive-access/?rev=3567&view=rev
Author:   bradtofel
Date:     2011-11-18 23:18:42 +0000 (Fri, 18 Nov 2011)

Log Message:
-----------
IA-specific tools, for interacting with CDX data in HDFS, generic tools for using HDFS, and several Hadoop map-reduce jobs

Added Paths:
-----------
    trunk/archive-access/projects/ia-tools/.classpath
    trunk/archive-access/projects/ia-tools/.project
    trunk/archive-access/projects/ia-tools/.settings/
    trunk/archive-access/projects/ia-tools/.settings/org.eclipse.jdt.core.prefs
    trunk/archive-access/projects/ia-tools/.settings/org.maven.ide.eclipse.prefs
    trunk/archive-access/projects/ia-tools/pom.xml
    trunk/archive-access/projects/ia-tools/src/
    trunk/archive-access/projects/ia-tools/src/main/
    trunk/archive-access/projects/ia-tools/src/main/java/
    trunk/archive-access/projects/ia-tools/src/main/java/org/
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/BlockLoader.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/CDXCluster.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/CDXClusterRangeDumper.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/CDXConverterTool.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/ClusterRange.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/HDFSBlockLoader.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/HDFSLSR.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/HDFSRangeDumper.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/ManifestAggregator.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/SplitFile.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/SummaryGenerator.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/ZipNumBlock.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/ZipNumBlockIterator.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/notes.txt
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/io/
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/io/MergeClusterRangesInputFormat.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/io/MergeClusterRangesInputSplit.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/jobs/
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/jobs/BuildCluster.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/jobs/CDXTransformer.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/jobs/CDXTransformingJob.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/jobs/HTTPImportJob.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/jobs/JobDriver.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/jobs/MergeClusterRangesJob.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/jobs/MergeClusters.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/AlphaPartitioner.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/CDXMapper.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/GZIPMembersLineInputFormat.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/GZIPMembersLineRecordReader.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/GZIPRangeLineDereferencingInputFormat.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/GZIPRangeLineDereferencingRecordReader.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/GlobalWaybackCDXReducer.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/GlobalWaybackMergeMapper.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/HTTPImportMapper.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/IdentityTextReducer.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/LineDereferencingInputFormat.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/LineDereferencingRecordReader.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/OvercrawlZipNumRecordWriter.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/SimpleTextMapper.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/SortMergeInputFormat.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/SortMergeInputSplit.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/ZipNumOutputFormat.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/ZipNumRecordWriter.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/ZipNumRecordWriterOld.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/storage/
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/storage/ZipNumStorage.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/util/
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/util/HDFSMove.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/util/HDFSSync.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/util/HDFSeeko.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/util/PartitionName.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/io/
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/io/ZipNumWriterTool.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/server/
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/server/FileBackedInputStream.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/server/GZRangeClient.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/server/GZRangeClientTool.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/server/GZRangeServer.java
    trunk/archive-access/projects/ia-tools/src/test/
    trunk/archive-access/projects/ia-tools/src/test/java/
    trunk/archive-access/projects/ia-tools/src/test/java/org/
    trunk/archive-access/projects/ia-tools/src/test/java/org/archive/
    trunk/archive-access/projects/ia-tools/src/test/java/org/archive/hadoop/
    trunk/archive-access/projects/ia-tools/src/test/java/org/archive/hadoop/cdx/
    trunk/archive-access/projects/ia-tools/src/test/java/org/archive/hadoop/cdx/CDXClusterTest.java
    trunk/archive-access/projects/ia-tools/src/test/java/org/archive/hadoop/cdx/SplitFileTest.java
    trunk/archive-access/projects/ia-tools/src/test/java/org/archive/hadoop/func/
    trunk/archive-access/projects/ia-tools/src/test/java/org/archive/hadoop/func/URLResolverFuncTest.java
    trunk/archive-access/projects/ia-tools/src/test/java/org/archive/hadoop/storage/
    trunk/archive-access/projects/ia-tools/src/test/java/org/archive/hadoop/storage/ZipnumRecordWriterTest.java
    trunk/archive-access/projects/ia-tools/src/test/java/org/archive/server/
    trunk/archive-access/projects/ia-tools/src/test/java/org/archive/server/GZRangeClientTest.java

Added: trunk/archive-access/projects/ia-tools/.classpath
===================================================================
--- trunk/archive-access/projects/ia-tools/.classpath	(rev 0)
+++ trunk/archive-access/projects/ia-tools/.classpath	2011-11-18 23:18:42 UTC (rev 3567)
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+  <classpathentry kind="src" output="target/classes" path="src/main/java"/>
+  <classpathentry kind="src" output="target/test-classes" path="src/test/java"/>
+  <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/J2SE-1.4"/>
+  <classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
+  <classpathentry kind="output" path="target/classes"/>
+</classpath>

Added: trunk/archive-access/projects/ia-tools/.project
===================================================================
--- trunk/archive-access/projects/ia-tools/.project	(rev 0)
+++ trunk/archive-access/projects/ia-tools/.project	2011-11-18 23:18:42 UTC (rev 3567)
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+  <name>cdx-commons</name>
+  <comment>NO_M2ECLIPSE_SUPPORT: Project files created with the maven-eclipse-plugin are not supported in M2Eclipse.</comment>
+  <projects>
+  </projects>
+  <buildSpec>
+    <buildCommand>
+      <name>org.eclipse.jdt.core.javabuilder</name>
+      <arguments>
+      </arguments>
+    </buildCommand>
+    <buildCommand>
+      <name>org.maven.ide.eclipse.maven2Builder</name>
+      <arguments>
+      </arguments>
+    </buildCommand>
+  </buildSpec>
+  <natures>
+    <nature>org.maven.ide.eclipse.maven2Nature</nature>
+    <nature>org.eclipse.jdt.core.javanature</nature>
+  </natures>
+</projectDescription>

Added: trunk/archive-access/projects/ia-tools/.settings/org.eclipse.jdt.core.prefs
===================================================================
--- trunk/archive-access/projects/ia-tools/.settings/org.eclipse.jdt.core.prefs	(rev 0)
+++ trunk/archive-access/projects/ia-tools/.settings/org.eclipse.jdt.core.prefs	2011-11-18 23:18:42 UTC (rev 3567)
@@ -0,0 +1,13 @@
+#Wed Nov 16 15:29:11 PST 2011
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
+org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
+org.eclipse.jdt.core.compiler.compliance=1.6
+org.eclipse.jdt.core.compiler.debug.lineNumber=generate
+org.eclipse.jdt.core.compiler.debug.localVariable=generate
+org.eclipse.jdt.core.compiler.debug.sourceFile=generate +org.eclipse.jdt.core.compiler.problem.assertIdentifier=error +org.eclipse.jdt.core.compiler.problem.enumIdentifier=error +org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning +org.eclipse.jdt.core.compiler.source=1.6 Added: trunk/archive-access/projects/ia-tools/.settings/org.maven.ide.eclipse.prefs =================================================================== --- trunk/archive-access/projects/ia-tools/.settings/org.maven.ide.eclipse.prefs (rev 0) +++ trunk/archive-access/projects/ia-tools/.settings/org.maven.ide.eclipse.prefs 2011-11-18 23:18:42 UTC (rev 3567) @@ -0,0 +1,9 @@ +#Wed Nov 16 15:28:40 PST 2011 +activeProfiles= +eclipse.preferences.version=1 +fullBuildGoals=process-test-resources +includeModules=false +resolveWorkspaceProjects=true +resourceFilterGoals=process-resources resources\:testResources +skipCompilerPlugin=true +version=1 Added: trunk/archive-access/projects/ia-tools/pom.xml =================================================================== --- trunk/archive-access/projects/ia-tools/pom.xml (rev 0) +++ trunk/archive-access/projects/ia-tools/pom.xml 2011-11-18 23:18:42 UTC (rev 3567) @@ -0,0 +1,94 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>org.archive</groupId> + <artifactId>ia-tools</artifactId> + <version>1.0-SNAPSHOT</version> + <packaging>jar</packaging> + + <name>cdx-commons</name> + <url>http://maven.apache.org</url> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>3.8.1</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.archive</groupId> + <artifactId>archive-commons</artifactId> + <version>0.0.1-SNAPSHOT</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.pig</groupId> + <artifactId>pig</artifactId> + <version>0.8.0</version> + <scope>provided</scope> + </dependency> + </dependencies> + <build> + <pluginManagement> + <plugins> + <plugin> + <groupId>org.sonatype.plugins</groupId> + <artifactId>jarjar-maven-plugin</artifactId> + <version>1.5</version> + </plugin> + </plugins> + </pluginManagement> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>2.2-beta-1</version> + <configuration> + + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + + <finalName>ia-tools</finalName> + <archive> + <manifest> + <mainClass>org.archive.hadoop.jobs.JobDriver</mainClass> + </manifest> + </archive> + </configuration> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>attached</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + <repositories> + <repository> + <id>internetarchive</id> + <name>Internet Archive Maven Repository</name> + <url>http://builds.archive.org:8080/maven2</url> + <layout>default</layout> + + <releases> + <enabled>true</enabled> + <updatePolicy>daily</updatePolicy> + <checksumPolicy>warn</checksumPolicy> + </releases> + <snapshots> + <enabled>true</enabled> + <updatePolicy>daily</updatePolicy> + <checksumPolicy>fail</checksumPolicy> + </snapshots> + </repository> + </repositories> +</project> Added: 
trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/BlockLoader.java =================================================================== --- trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/BlockLoader.java (rev 0) +++ trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/BlockLoader.java 2011-11-18 23:18:42 UTC (rev 3567) @@ -0,0 +1,8 @@ +package org.archive.hadoop.cdx; + +import java.io.IOException; + +public interface BlockLoader { + + public byte[] readBlock(String url, long start, int length) throws IOException; +} Added: trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/CDXCluster.java =================================================================== --- trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/CDXCluster.java (rev 0) +++ trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/CDXCluster.java 2011-11-18 23:18:42 UTC (rev 3567) @@ -0,0 +1,47 @@ +package org.archive.hadoop.cdx; + +import java.io.IOException; +import java.util.logging.Logger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.archive.util.binsearch.SortedTextFile; +import org.archive.util.binsearch.impl.HDFSSeekableLineReaderFactory; +import org.archive.util.iterator.BoundedStringIterator; +import org.archive.util.iterator.CloseableIterator; + +public class CDXCluster { + private final static Logger LOGGER = + Logger.getLogger(CDXCluster.class.getName()); + + private Path clusterPath; + SortedTextFile summary; + private FileSystem fs; + private BlockLoader loader; + public CDXCluster(Configuration conf, Path clusterPath) throws IOException { + this.clusterPath = clusterPath; + fs = clusterPath.getFileSystem(conf); + loader = new HDFSBlockLoader(fs); + Path summaryPath = new Path(clusterPath,"ALL.summary"); + HDFSSeekableLineReaderFactory factory = + new HDFSSeekableLineReaderFactory(fs, summaryPath); + summary = new SortedTextFile(factory); + } + public CloseableIterator<String> getRangeBlockIterator(String start, String end) throws IOException { + CloseableIterator<String> blocks = + summary.getRecordIterator(start, true); + return new BoundedStringIterator(blocks, end); + } + public byte[] loadBlock(ZipNumBlock block) throws IOException { + return loadBlock(block.shard,block.start,block.length); + } + public byte[] loadBlock(String name, long start, int length) throws IOException { + LOGGER.warning(String.format("Loading(%s,%d,%d",name,start,length)); + Path shardPath = new Path(clusterPath,name + ".gz"); + return loader.readBlock(shardPath.toUri().toASCIIString(), start, length); + } + public CloseableIterator<String> getRange(String start, String end) throws IOException { + return new ClusterRange(this,start,end); + } +} Added: trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/CDXClusterRangeDumper.java =================================================================== --- trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/CDXClusterRangeDumper.java (rev 0) +++ trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/CDXClusterRangeDumper.java 2011-11-18 23:18:42 UTC (rev 3567) @@ -0,0 +1,81 @@ +package org.archive.hadoop.cdx; + +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.charset.Charset; +import java.util.Comparator; +import java.util.Iterator; + +import 
org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.archive.util.iterator.SortedCompositeIterator; + +public class CDXClusterRangeDumper implements Tool { + Charset UTF8 = Charset.forName("utf-8"); + public final static String TOOL_NAME = "cluster-range"; + public static final String TOOL_DESCRIPTION = + "A tool for dumping ranges of a CDX cluster to STDOUT"; + + private Configuration conf; + + + public void setConf(Configuration conf) { + this.conf = conf; + } + public Configuration getConf() { + return conf; + } + + public static int USAGE(int code) { + System.err.println("Usage: " + TOOL_NAME + " START END CLUSTER_HDFS_URL ..."); + System.err.println("\tDump all CDX records within the cluster at CLUSTER_HDFS_URL"); + System.err.println("\tstarting at START(inclusive), ending at END(exclusive)"); + System.err.println("\tto STDOUT. If multiple clusters URLs are specified"); + System.err.println("\ttheir results will be merged into a single sorted stream."); + return code; + } + public int run(String[] args) throws Exception { + if(args.length < 3) { + return USAGE(1); + } + String start = args[0]; + String end = args[1]; + Iterator<String> itr; + if(args.length == 3) { + Path clusterPath = new Path(args[2]); + CDXCluster c = new CDXCluster(getConf(), clusterPath); + itr = c.getRange(start,end); + } else { + Comparator<String> comparator = new Comparator<String>() { + public int compare(String s1, String s2) { + return s1.compareTo(s2); + } + }; + SortedCompositeIterator<String> scitr = + new SortedCompositeIterator<String>(comparator); + for(int i = 2; i < args.length; i++) { + Path clusterPath = new Path(args[i]); + CDXCluster c = new CDXCluster(getConf(), clusterPath); + scitr.addIterator(c.getRange(start,end)); + } + itr = scitr; + } + PrintWriter pw = new PrintWriter(new OutputStreamWriter(System.out, UTF8)); + + while(itr.hasNext()) { + pw.println(itr.next()); + } + pw.flush(); + pw.close(); + + return 0; + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new CDXClusterRangeDumper(), args); + System.exit(res); + } + +} Added: trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/CDXConverterTool.java =================================================================== --- trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/CDXConverterTool.java (rev 0) +++ trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/CDXConverterTool.java 2011-11-18 23:18:42 UTC (rev 3567) @@ -0,0 +1,56 @@ +package org.archive.hadoop.cdx; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.charset.Charset; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.archive.hadoop.mapreduce.CDXMapper; +import org.archive.hadoop.mapreduce.CDXMapper.StringPair; + +public class CDXConverterTool implements Tool { + + Charset UTF8 = Charset.forName("utf-8"); + public final static String TOOL_NAME = "cdx-convert"; + public static final String TOOL_DESCRIPTION = + "A tool for converting old CDX lines from STDIN to SURT form on STDOUT"; + + private Configuration conf; + public void setConf(Configuration conf) { + this.conf = conf; + } + public Configuration getConf() { + return conf; + } + public int 
run(String[] args) throws Exception { + CDXMapper mapper = new CDXMapper(); + mapper.setConf(getConf()); + BufferedReader br = + new BufferedReader(new InputStreamReader(System.in,UTF8)); + PrintWriter pw = + new PrintWriter(new OutputStreamWriter(System.out,UTF8)); + while(true) { + String cdxLine = br.readLine(); + if(cdxLine == null) { + break; + } + StringPair pair = mapper.convert(cdxLine); + pw.print(pair.first); + pw.print(" "); + pw.print(pair.second); + pw.println(); + } + pw.flush(); + return 0; + } + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new CDXConverterTool(), args); + System.exit(res); + } + + +} Added: trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/ClusterRange.java =================================================================== --- trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/ClusterRange.java (rev 0) +++ trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/ClusterRange.java 2011-11-18 23:18:42 UTC (rev 3567) @@ -0,0 +1,75 @@ +package org.archive.hadoop.cdx; + +import java.io.IOException; +import java.util.Iterator; +import java.util.logging.Logger; + +import org.archive.util.iterator.AbstractPeekableIterator; +import org.archive.util.iterator.BoundedStringIterator; +import org.archive.util.iterator.CloseableIterator; +import org.archive.util.iterator.StartBoundedStringIterator; + + +public class ClusterRange extends AbstractPeekableIterator<String> { + private final static Logger LOG = + Logger.getLogger(ClusterRange.class.getName()); + private boolean isFirst; + private boolean done; + private String start; + private String end; + private CDXCluster cluster; + private CloseableIterator<String> blocks; + private Iterator<String> current; + public ClusterRange(CDXCluster cluster, String start, String end) throws IOException { + this.cluster = cluster; + this.start = start; + this.end = end; + blocks = cluster.getRangeBlockIterator(start, end); + done = false; + isFirst = true; + } + @Override + public String getNextInner() { + try { + return getNextWrapper(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + public String getNextWrapper() throws IOException { + if(done) { + return null; + } + if(current != null) { + if(current.hasNext()) { + return current.next(); + } + // done with current: + } + while(blocks.hasNext()) { + String nextLine = blocks.next(); + ZipNumBlock block = new ZipNumBlock(nextLine); + byte[] compressed = cluster.loadBlock(block); + LOG.fine(String.format("Loaded block:%s (%d)(%d)", + block.shard,block.start,block.length)); + ZipNumBlockIterator zbi = new ZipNumBlockIterator(compressed); + Iterator<String> itr = zbi.iterator(); + if(isFirst) { + isFirst = false; + current = new BoundedStringIterator(new StartBoundedStringIterator(itr,start),end); + } else { + current = new BoundedStringIterator(itr, end); + } + if(current.hasNext()) { + return current.next(); + } + } + done = true; + return null; + } + @Override + public void close() throws IOException { + blocks.close(); + } + +} Added: trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/HDFSBlockLoader.java =================================================================== --- trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/HDFSBlockLoader.java (rev 0) +++ trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/HDFSBlockLoader.java 2011-11-18 23:18:42 UTC 
(rev 3567) @@ -0,0 +1,42 @@ +package org.archive.hadoop.cdx; + +import java.io.IOException; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public class HDFSBlockLoader implements BlockLoader { + private FileSystem fs; + FSDataInputStream fsdis; + String currentUrl; + long currentOffset = -1; + public HDFSBlockLoader(FileSystem fs) { + this.fs = fs; + fsdis = null; + currentUrl = null; + currentOffset = -1; + } + public byte[] readBlock(String url, long start, int length) throws IOException { + byte[] buffer = new byte[length]; + if(url.equals(currentUrl)) { + if(start != currentOffset) { + fsdis.seek(start); + currentOffset = start; + } + fsdis.readFully(buffer); + currentOffset += length; + return buffer; + } + if(fsdis != null) { + fsdis.close(); + } + fsdis = fs.open(new Path(url)); + currentUrl = url; + fsdis.seek(start); + fsdis.readFully(buffer); + currentOffset = start + length; + return buffer; + } + +} Added: trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/HDFSLSR.java =================================================================== --- trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/HDFSLSR.java (rev 0) +++ trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/HDFSLSR.java 2011-11-18 23:18:42 UTC (rev 3567) @@ -0,0 +1,71 @@ +package org.archive.hadoop.cdx; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.charset.Charset; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +public class HDFSLSR implements Tool { + + Charset UTF8 = Charset.forName("utf-8"); + public final static String TOOL_NAME = "hdfs-lsr"; + public static final String TOOL_DESCRIPTION = + "A tool for producing lsr type output from HDFS to STDOUT"; + + private Configuration conf; + + + public void listPath(FileStatus status, FileSystem fs, PrintWriter target) throws IOException { + if(status.isDir()) { + //System.err.format("Recursing into %s\n", status.getPath().toUri().toASCIIString()); + FileStatus entries[] = fs.listStatus(status.getPath()); + for(FileStatus entry : entries) { + listPath(entry,fs,target); + } + } else { + Path path = status.getPath(); + target.format("%s\t%s\n", + path.getName(), path.toUri().toASCIIString()); + } + } + + public void setConf(Configuration conf) { + this.conf = conf; + } + public Configuration getConf() { + return conf; + } + public static int USAGE(int code) { + System.err.println("Usage: " + TOOL_NAME + " HDFS_URL"); + System.err.println("\tRecursively descend into HDFS_URL, producing one line"); + System.err.println("\tto STDOUT for each FILE found. 
Lines are of the format:"); + System.err.println("\t\tBASENAME<tab>PATH"); + return code; + } + public int run(String[] args) throws Exception { + if(args.length != 1) { + return USAGE(1); + } + Path path = new Path(args[0]); + FileSystem fs = path.getFileSystem(getConf()); + FileStatus status = fs.getFileStatus(path); + PrintWriter pw = new PrintWriter(new OutputStreamWriter(System.out, UTF8)); + listPath(status, fs, pw); + pw.flush(); + return 0; + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new HDFSLSR(), args); + System.exit(res); + } + + +} Added: trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/HDFSRangeDumper.java =================================================================== --- trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/HDFSRangeDumper.java (rev 0) +++ trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/HDFSRangeDumper.java 2011-11-18 23:18:42 UTC (rev 3567) @@ -0,0 +1,114 @@ +package org.archive.hadoop.cdx; + +import java.io.BufferedOutputStream; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.nio.charset.Charset; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.archive.util.StreamCopy; + +public class HDFSRangeDumper implements Tool { + + Charset UTF8 = Charset.forName("utf-8"); + public final static String TOOL_NAME = "range-dumper"; + public static final String TOOL_DESCRIPTION = + "A tool for dumping contents of files from HDFS to STDOUT"; + + private Configuration conf; + + + public void dumpPath(Path inputPath, OutputStream target) throws IOException { + FileSystem fs = inputPath.getFileSystem(getConf()); + FSDataInputStream fsdis = fs.open(inputPath); + StreamCopy.copy(fsdis, target); + fsdis.close(); + } + public void dumpPath(Path inputPath, OutputStream target, long start, long length) throws IOException { + String inputPathString = inputPath.toUri().toASCIIString(); + FileSystem fs = inputPath.getFileSystem(getConf()); + FSDataInputStream fsdis = fs.open(inputPath); + fsdis.seek(start); + long amt = StreamCopy.copyLength(fsdis, target, length); + if(amt != length) { + throw new IOException( + String.format("Short copy(%s)(%d)(%d): got(%d)\n", + inputPathString,start,length,amt)); + } + fsdis.close(); + } + public void setConf(Configuration conf) { + this.conf = conf; + } + public Configuration getConf() { + return conf; + } + public static void USAGE(int code) { + System.err.println("Usage: " + TOOL_NAME + " [INPUT]"); + System.err.println("\tReads lines from local path INPUT (or STDIN if omitted) of the format:"); + System.err.println("\t\tHDFS_URL"); + System.err.println("\tOR"); + System.err.println("\t\tHDFS_URL<tab>OFFSET<tab>LENGTH"); + System.err.println("\tIn the first form, dumps the entire contents of HDFS_URL"); + System.err.println("\tIn the second, dumps LENGTH octets from HDFS_URL beginning at offset OFFSET"); + System.exit(code); + } + public int run(String[] args) throws Exception { + if(args.length > 1) { + USAGE(1); + } + InputStreamReader isr = null; + File input = null; + if(args.length == 0) { + isr = new InputStreamReader(System.in,UTF8); 
+ } else { + input = new File(args[0]); + isr = new InputStreamReader(new FileInputStream(input),UTF8); + } + BufferedReader br = new BufferedReader(isr); + String line; + OutputStream out = new BufferedOutputStream(System.out); + while(true) { + line = br.readLine(); + if(line == null) { + break; + } + String parts[] = line.split("\t"); + if(parts.length == 1) { + + dumpPath(new Path(line), out); + + } else if(parts.length == 3) { + + long start = Long.parseLong(parts[1]); + long length = Long.parseLong(parts[2]); + + dumpPath(new Path(parts[0]), System.out, start, length); + + } else { + throw new IOException("Wrong number of fields in " + line); + } + System.err.format("Dumped\t%s\n",parts[0]); + } + if(input != null) { + isr.close(); + } + out.flush(); + return 0; + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new HDFSRangeDumper(), args); + System.exit(res); + } + +} Added: trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/ManifestAggregator.java =================================================================== --- trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/ManifestAggregator.java (rev 0) +++ trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/ManifestAggregator.java 2011-11-18 23:18:42 UTC (rev 3567) @@ -0,0 +1,108 @@ +package org.archive.hadoop.cdx; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Comparator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.archive.util.iterator.AbstractPeekableIterator; +import org.archive.util.iterator.SortedCompositeIterator; + +public class ManifestAggregator implements Tool { + + public final static String TOOL_NAME = "manifest-aggregator"; + public static final String MANIFEST_BASENAME = "manifest.txt"; + public static final String TOOL_DESCRIPTION = + "A tool for merging sorted manifest files in a CDX HDFS installation"; + + private Configuration conf; + + public void aggregate(Path partsPath, OutputStream target) throws IOException { + String dirString = partsPath.toUri().toASCIIString(); + + FileSystem fs = partsPath.getFileSystem(getConf()); + FileStatus status = fs.getFileStatus(partsPath); + if(!status.isDir()) { + throw new IOException(dirString + " is not a directory!"); + } + FileStatus entries[] = fs.listStatus(partsPath); + ArrayList<Path> manifests = new ArrayList<Path>(); + for(FileStatus entry : entries) { + Path entryPath = entry.getPath(); + if(!entry.isDir()) { + throw new IOException( + String.format("Non directory entry (%s) in %s", + entryPath.getName(),dirString)); + } + Path manifestPath = new Path(entryPath,MANIFEST_BASENAME); + if(!fs.isFile(manifestPath)) { + throw new IOException( + String.format("No file at manifest path %s", + manifestPath.toUri().toASCIIString())); + } + manifests.add(manifestPath); + } + Comparator<String> comparator = new Comparator<String>() { + public int compare(String s1, String s2) { + return s1.compareTo(s2); + } + 
}; + Charset UTF8 = Charset.forName("utf-8"); + SortedCompositeIterator<String> mergeItr = + new SortedCompositeIterator<String>(comparator); + for(Path manifestPath : manifests) { + FSDataInputStream fsdis = fs.open(manifestPath); + InputStreamReader isr = new InputStreamReader(fsdis, UTF8); + BufferedReader br = new BufferedReader(isr); + mergeItr.addIterator(AbstractPeekableIterator.wrapReader(br)); + } + OutputStreamWriter osw = new OutputStreamWriter(target, UTF8); + PrintWriter pw = new PrintWriter(osw); + while(mergeItr.hasNext()) { + pw.println(mergeItr.next()); + } + pw.flush(); + pw.close(); + mergeItr.close(); + } + public void setConf(Configuration conf) { + this.conf = conf; + } + public Configuration getConf() { + return conf; + } + public static void USAGE(int code) { + System.err.println("Usage: " + TOOL_NAME + " HDFS_PATH LOCAL_PATH"); + System.exit(code); + } + public int run(String[] args) throws Exception { + if(args.length != 2) { + USAGE(1); + } + Path inputDir = new Path(args[0]); + File target = new File(args[1]); + FileOutputStream fos = new FileOutputStream(target); + aggregate(inputDir, fos); + return 0; + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new ManifestAggregator(), args); + System.exit(res); + } + +} Added: trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/SplitFile.java =================================================================== --- trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/SplitFile.java (rev 0) +++ trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/SplitFile.java 2011-11-18 23:18:42 UTC (rev 3567) @@ -0,0 +1,53 @@ +package org.archive.hadoop.cdx; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.Reader; +import java.util.ArrayList; + +public class SplitFile { + public SplitLine[] lines; + + public SplitFile() {}; + public int size() { + return lines.length; + } + public String getStart(int i) { + return lines[i].start; + } + public String getEnd(int i) { + return lines[i].end; + } + public String getName(int i) { + return lines[i].name; + } + public void read(Reader r) throws IOException { + BufferedReader br = new BufferedReader(r); + ArrayList<SplitLine> tmp = new ArrayList<SplitLine>(); + while(true) { + String line = br.readLine(); + if(line == null) { + break; + } + tmp.add(new SplitLine(line)); + } + if(tmp.size() == 0) { + throw new IOException("Empty split file"); + } + lines = tmp.toArray(new SplitLine[] {}); + } + private class SplitLine { + String name; + String start; + String end; + public SplitLine(String line) { + String parts[] = line.split("\\s"); + if(parts.length != 3) { + throw new IllegalArgumentException("Bad split line:" + line); + } + name = parts[0]; + start = parts[1]; + end = parts[2]; + } + } +} Added: trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/SummaryGenerator.java =================================================================== --- trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/SummaryGenerator.java (rev 0) +++ trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/SummaryGenerator.java 2011-11-18 23:18:42 UTC (rev 3567) @@ -0,0 +1,237 @@ +package org.archive.hadoop.cdx; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import 
java.net.URI; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.logging.Logger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +public class SummaryGenerator implements Tool { + + private static final Logger LOGGER = + Logger.getLogger(SummaryGenerator.class.getName()); + + public static final String SUMMARY_SUFFIX = ".summary"; + public static final String GZ_SUFFIX = ".gz"; + public static final String ALL_SUMMARY_PREFIX = "ALL"; + + public final static String TOOL_NAME = "summary-generator"; + public static final String TOOL_DESCRIPTION = + "A tool for generating a meta-index summary from a set of shard partition summaries in a CDX HDFS installation"; + + private Configuration conf; + public Configuration getConf() { return conf; } + public void setConf(Configuration conf) { this.conf = conf; } + + public static void USAGE(int code) { + System.err.println("USAGE: " + TOOL_NAME + " HDFS_URL"); + System.exit(code); + } + + public void createSummary(FileSystem fs, Path clusterPath, PrintWriter pw) + throws IOException { + Charset UTF8 = Charset.forName("utf-8"); + HashMap<String, Path> summaries = new HashMap<String, Path>(); + HashMap<String, Path> parts = new HashMap<String, Path>(); + HashMap<String,Long> partsLength = new HashMap<String, Long>(); + FileStatus entries[] = fs.listStatus(clusterPath); + int sumLen = SUMMARY_SUFFIX.length(); + int gzLen = GZ_SUFFIX.length(); + for(FileStatus entry : entries) { + Path entryPath = entry.getPath(); + String pathStr = entryPath.toUri().toASCIIString(); + String name = entryPath.getName(); + if(entry.isDir()) { + LOGGER.info("Ignoring Directory entry " + pathStr); + } else if(name.equals(ALL_SUMMARY_PREFIX + SUMMARY_SUFFIX)) { + // just skip - this is our target.. 
+ } else if(name.endsWith(SUMMARY_SUFFIX)) { + String prefix = name.substring(0,name.length() - sumLen); + summaries.put(prefix, entryPath); + } else if(name.endsWith(GZ_SUFFIX)) { + String prefix = name.substring(0,name.length() - gzLen); + parts.put(prefix, entryPath); + partsLength.put(prefix, entry.getLen()); + } else { + LOGGER.info("Ignoring entry " + pathStr); + } + } + // just for sanities sake - lets make sure all summaries have a part: + for(String name : summaries.keySet()) { + if(!parts.containsKey(name)) { + throw new IOException("Missing part for summary:" + name); + } + } + // now dump all summaries - make sure we do it in sorted order: + ArrayList<String> summaryNames = new ArrayList<String>(summaries.keySet()); + String tmp[] = new String[0]; + String tmp2[] = summaryNames.toArray(tmp); + Arrays.sort(tmp2); + for(String part : tmp2) { +// long length = partsLength.get(part); + FSDataInputStream fsdis = fs.open(summaries.get(part)); + InputStreamReader isr = new InputStreamReader(fsdis,UTF8); + BufferedReader br = new BufferedReader(isr); + String line; +// String prevUrl = null; +// long prevOffset = 0; + while(true) { + line = br.readLine(); + if(line == null) { + break; + } + String fields[] = line.split("\\s"); + if(fields.length < 3) { + throw new IOException("Bad line in " + part + ":" + line); + } + long offset = Long.parseLong(fields[0]); + long length = Long.parseLong(fields[1]); + String url = fields[2]; + pw.format("%s\t%s\t%d\t%d\n", + url, part, offset, length); + +// if(prevUrl != null) { +// pw.format("%s\t%s\t%d\t%d\n", +// prevUrl, part, prevOffset, offset - prevOffset); +// } +// prevUrl = url; +// prevOffset = offset; + } +// if(prevUrl != null) { +// pw.format("%s\t%s\t%d\t%d\n", +// prevUrl, part, prevOffset, length - prevOffset); +// } + br.close(); + } + pw.flush(); + } + public void createSummaryOld(FileSystem fs, Path clusterPath, PrintWriter pw) + throws IOException { + Charset UTF8 = Charset.forName("utf-8"); + HashMap<String, Path> summaries = new HashMap<String, Path>(); + HashMap<String, Path> parts = new HashMap<String, Path>(); + HashMap<String,Long> partsLength = new HashMap<String, Long>(); + FileStatus entries[] = fs.listStatus(clusterPath); + int sumLen = SUMMARY_SUFFIX.length(); + int gzLen = GZ_SUFFIX.length(); + for(FileStatus entry : entries) { + Path entryPath = entry.getPath(); + String pathStr = entryPath.toUri().toASCIIString(); + String name = entryPath.getName(); + if(entry.isDir()) { + LOGGER.info("Ignoring Directory entry " + pathStr); + } else if(name.endsWith(SUMMARY_SUFFIX)) { + String prefix = name.substring(0,name.length() - sumLen); + summaries.put(prefix, entryPath); + } else if(name.endsWith(GZ_SUFFIX)) { + String prefix = name.substring(0,name.length() - gzLen); + parts.put(prefix, entryPath); + partsLength.put(prefix, entry.getLen()); + } else { + LOGGER.info("Ignoring entry " + pathStr); + } + } + // just for sanities sake - lets make sure all summaries have a part: + for(String name : summaries.keySet()) { + if(!parts.containsKey(name)) { + throw new IOException("Missing part for summary:" + name); + } + } + // now dump all summaries - make sure we do it in sorted order: + ArrayList<String> summaryNames = new ArrayList<String>(summaries.keySet()); + String tmp[] = new String[0]; + String tmp2[] = summaryNames.toArray(tmp); + Arrays.sort(tmp2); + for(String part : tmp2) { + long length = partsLength.get(part); + FSDataInputStream fsdis = fs.open(summaries.get(part)); + InputStreamReader isr = new 
InputStreamReader(fsdis,UTF8); + BufferedReader br = new BufferedReader(isr); + String line; + String prevUrl = null; + long prevOffset = 0; + while(true) { + line = br.readLine(); + if(line == null) { + break; + } + String fields[] = line.split("\\s"); + if(fields.length < 3) { + throw new IOException("Bad line in " + part + ":" + line); + } + long offset = Long.parseLong(fields[0]); + String url = fields[1]; + + if(prevUrl != null) { + pw.format("%s\t%s\t%d\t%d\n", + prevUrl, part, prevOffset, offset - prevOffset); + } + prevUrl = url; + prevOffset = offset; + } + if(prevUrl != null) { + pw.format("%s\t%s\t%d\t%d\n", + prevUrl, part, prevOffset, length - prevOffset); + } + br.close(); + } + pw.flush(); + } + + public int run(String[] args) throws Exception { + if(args.length < 1) { + USAGE(1); + } + if(args.length > 2) { + USAGE(1); + } + + String hdfsUrl = args[0]; + boolean isOld = false; + if(args.length == 2) { + hdfsUrl = args[1]; + isOld = true; + } + URI uri = new URI(hdfsUrl); + FileSystem fs = FileSystem.get(uri,getConf()); + Path path = new Path(hdfsUrl); + Path target = new Path(path,ALL_SUMMARY_PREFIX + SUMMARY_SUFFIX); + if(fs.exists(target)) { + System.err.format("Error-exists: " + target.toUri().toASCIIString()); + return 1; + } + + Charset UTF8 = Charset.forName("utf-8"); + FSDataOutputStream os = fs.create(target); + OutputStreamWriter osw = new OutputStreamWriter(os, UTF8); + PrintWriter pw = new PrintWriter(osw); + if(isOld) { + createSummaryOld(fs, path, pw); + } else { + createSummary(fs, path, pw); + } + osw.flush(); + osw.close(); + return 0; + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new SummaryGenerator(), args); + System.exit(res); + } +} Added: trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/ZipNumBlock.java =================================================================== --- trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/ZipNumBlock.java (rev 0) +++ trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/ZipNumBlock.java 2011-11-18 23:18:42 UTC (rev 3567) @@ -0,0 +1,18 @@ +package org.archive.hadoop.cdx; + +public class ZipNumBlock { + String url; + String shard; + long start; + int length; + public ZipNumBlock(String line) { + String parts[] = line.split("\t"); + if(parts.length != 4) { + throw new IllegalArgumentException("Bad block:" + line); + } + url = parts[0]; + shard = parts[1]; + start = Long.parseLong(parts[2]); + length = Integer.parseInt(parts[3]); + } +} Added: trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/ZipNumBlockIterator.java =================================================================== --- trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/ZipNumBlockIterator.java (rev 0) +++ trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/ZipNumBlockIterator.java 2011-11-18 23:18:42 UTC (rev 3567) @@ -0,0 +1,24 @@ +package org.archive.hadoop.cdx; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; + +import org.archive.util.iterator.AbstractPeekableIterator; +import org.archive.util.iterator.CloseableIterator; +import org.archive.util.zip.OpenJDK7GZIPInputStream; + +public class ZipNumBlockIterator { + private byte[] compressed; + public ZipNumBlockIterator(byte[] compressed) { + this.compressed = compressed; + } + public 
CloseableIterator<String> iterator() throws IOException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(compressed);
+        OpenJDK7GZIPInputStream gzis = new OpenJDK7GZIPInputStream(bais);
+        InputStreamReader isr = new InputStreamReader(gzis);
+        BufferedReader br = new BufferedReader(isr);
+        return AbstractPeekableIterator.wrapReader(br);
+    }
+}

Added: trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/notes.txt
===================================================================
--- trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/notes.txt	(rev 0)
+++ trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/notes.txt	2011-11-18 23:18:42 UTC (rev 3567)
@@ -0,0 +1,36 @@
+An automatic index updating process:
+
+SHARD:
+    a compressed, alphabetically sorted CDX file.
+    The file is compressed as a series of GZIP members, each member containing
+    a fixed number of lines, like 3000.
+
+
+CLUSTER:
+    an HDFS directory containing a set of shards, each alphabetically contiguous:
+    part-#####.gz - SHARDS
+    manifest.txt  - file containing all BASENAMEs represented in this cluster
+    ALL.summary   - file containing "URL SHARD OFFSET LENGTH" for each
+
+clusters directory:
+    a single HDFS directory containing one or more CLUSTERs.
+
+
+    INCLUDED.txt - list of all files that are currently in the index
+    STAGED.txt   - list of all files which have CDX.gz files created in HDFS
+    SOURCE.txt   - list of all files known to the indexing system
+
+SOURCE.txt:
+
+    BASENAME FILE_URL CDX_URL ...
+
+STAGED.txt:
+    BASENAME HDFS_CDX_URL [OFFSET LENGTH]
+
+INCLUDED.txt:
+    BASENAME
+
+
+SOURCE is computed outside the framework, based on installation scripts.
+
+INCLUDED.txt is computed from existing SORTED clusters.
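Reading that layout back is what the classes added above do: CDXCluster looks up blocks in the ALL.summary meta-index (via SortedTextFile), parses each matching line into a ZipNumBlock, fetches the referenced gzip member through a BlockLoader, and ClusterRange trims the decompressed CDX lines to the requested key range. A minimal caller might look like the sketch below; the HDFS path and the SURT key range are illustrative assumptions, not part of this commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.archive.hadoop.cdx.CDXCluster;
    import org.archive.util.iterator.CloseableIterator;

    public class ClusterRangeExample {
        public static void main(String[] args) throws Exception {
            // Hypothetical cluster directory laid out as described in notes.txt:
            // part-*.gz shards plus an ALL.summary meta-index.
            Path clusterPath = new Path("hdfs://namenode/wayback/cdx-cluster");
            CDXCluster cluster = new CDXCluster(new Configuration(), clusterPath);
            // Dump all CDX lines whose SURT key begins with "org,example)";
            // the end key is chosen to sort just after every key with that prefix.
            CloseableIterator<String> lines =
                    cluster.getRange("org,example)/", "org,example*");
            try {
                while (lines.hasNext()) {
                    System.out.println(lines.next());
                }
            } finally {
                lines.close();
            }
        }
    }

The same iteration underlies CDXClusterRangeDumper on the command line and the per-split readers in MergeClusterRangesInputFormat below.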
\ No newline at end of file Added: trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/io/MergeClusterRangesInputFormat.java =================================================================== --- trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/io/MergeClusterRangesInputFormat.java (rev 0) +++ trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/io/MergeClusterRangesInputFormat.java 2011-11-18 23:18:42 UTC (rev 3567) @@ -0,0 +1,173 @@ +package org.archive.hadoop.io; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.logging.Logger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.archive.hadoop.cdx.CDXCluster; +import org.archive.hadoop.cdx.SplitFile; +import org.archive.hadoop.util.PartitionName; +import org.archive.util.iterator.CloseableIteratorUtil; +import org.archive.util.iterator.SortedCompositeIterator; + +public class MergeClusterRangesInputFormat extends InputFormat<Long, Text> { + + private static final Logger LOG = + Logger.getLogger(MergeClusterRangesInputFormat.class.getName()); + + private static final String SPLIT_CONFIG_KEY = "merge.cluster.split.path"; + private static final String MERGE_CLUSTER_PATH_CONFIG_KEY = "merge.cluster.paths"; + + public static void setSplitPath(Configuration conf, String path) throws IOException { + conf.set(SPLIT_CONFIG_KEY, path); + LOG.warning(String.format("Setting Split path: %s",path)); + SplitFile splitFile = new SplitFile(); + Reader r = getSplitReader(conf); + splitFile.read(r); + for(int i = 0; i < splitFile.size(); i++) { + PartitionName.setPartitionOutputName(conf, i, splitFile.getName(i)); + } + r.close(); + } + + private static Reader getSplitReader(Configuration conf) throws IOException { + String pathString = getSplitPath(conf); + LOG.warning(String.format("Got Split path: %s",pathString)); + Path splitPath = new Path(pathString); + FileSystem fs = FileSystem.get(splitPath.toUri(), conf); + FSDataInputStream fsdis = fs.open(splitPath); + return new InputStreamReader(fsdis, Charset.forName("UTF-8")); + + } + private static String getSplitPath(Configuration conf) { + return conf.get(SPLIT_CONFIG_KEY); + } + + public static void setClusterPaths(Configuration conf, String[] paths) { + conf.setStrings(MERGE_CLUSTER_PATH_CONFIG_KEY, paths); + } + private static String[] getClusterPaths(Configuration conf) { + return conf.getStrings(MERGE_CLUSTER_PATH_CONFIG_KEY); + } + + @Override + public RecordReader<Long, Text> createRecordReader(InputSplit split, + TaskAttemptContext arg1) throws IOException, InterruptedException { + // TODO!! 
+ return new MergeClusterRangesRecordReader(); + } + + @Override + public List<InputSplit> getSplits(JobContext context) throws IOException, + InterruptedException { + + Configuration conf = context.getConfiguration(); + Reader r = getSplitReader(conf); + SplitFile splitFile = new SplitFile(); + splitFile.read(r); + r.close(); + + ArrayList<InputSplit> splits = new ArrayList<InputSplit>(); + for(int i = 0; i < splitFile.size(); i++) { + MergeClusterRangesInputSplit split = + new MergeClusterRangesInputSplit(splitFile.size() - i, + splitFile.getStart(i), + splitFile.getEnd(i), + getClusterPaths(conf)); + splits.add(split); + LOG.warning(String.format("Added split(%d) (%s)-(%s)", + splitFile.size() - i,splitFile.getStart(i),splitFile.getEnd(i))); + } + + return splits; + + } + + public class MergeClusterRangesRecordReader extends RecordReader<Long, Text> { + Iterator<String> itr; + Long key = null; + Text value = null; + + @Override + public void close() throws IOException { + CloseableIteratorUtil.attemptClose(itr); + } + + @Override + public Long getCurrentKey() throws IOException, InterruptedException { + return key; + } + + @Override + public Text getCurrentValue() throws IOException, InterruptedException { + return value; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + // TODO ... + return 0; + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + + MergeClusterRangesInputSplit mSplit = + (MergeClusterRangesInputSplit) split; + + SortedCompositeIterator<String> itrS = + new SortedCompositeIterator<String>(new Comparator<String>() { + public int compare(String o1, String o2) { + return o1.compareTo(o2); + } + }); + String start = mSplit.getStart(); + String end = mSplit.getEnd(); + + for(String clusterPath : mSplit.getClusterPaths()) { + LOG.warning(String.format("Added range(%d) (%s)-(%s): %s", + context.getTaskAttemptID().getId(),start,end,clusterPath)); + CDXCluster cluster = new CDXCluster(conf, new Path(clusterPath)); + itrS.addIterator(cluster.getRange(start, end)); + } + // TODO: filtering on SURTs: + + itr = itrS; + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if(key == null) { + key = new Long(0); + } + if(value == null) { + value = new Text(); + } + if(itr.hasNext()) { + key = new Long(key.longValue() + 1); + value.set(itr.next()); + return true; + } + return false; + } + + } +} Added: trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/io/MergeClusterRangesInputSplit.java =================================================================== --- trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/io/MergeClusterRangesInputSplit.java (rev 0) +++ trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/io/MergeClusterRangesInputSplit.java 2011-11-18 23:18:42 UTC (rev 3567) @@ -0,0 +1,87 @@ +package org.archive.hadoop.io; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; + +public class MergeClusterRangesInputSplit extends InputSplit implements Writable { + + private long length; + private String start; + private String end; + private String[] clusterPaths; + + // punting - likely the inputs will be spread all over + private static final String[] locations = new String[0]; + + public 
MergeClusterRangesInputSplit() {} + public MergeClusterRangesInputSplit(long length, String start, String end, + String[] clusterPaths) { + this.length = length; + this.start = start; + this.end = end; + this.clusterPaths = clusterPaths; +// this.locations = new String[0]; + } + + public void write(DataOutput out) throws IOException { + out.writeLong(length); + out.writeUTF(start); + out.writeUTF(end); + out.writeInt(clusterPaths.length); + for(String p : clusterPaths) { + out.writeUTF(p); + } +// out.writeInt(locations.length); +// for(String l : locations) { +// out.writeUTF(l); +// } + } + + public void readFields(DataInput in) throws IOException { + length = in.readLong(); + start = in.readUTF(); + end = in.readUTF(); + int cl = in.readInt(); + clusterPaths = new String[cl]; + for(int i = 0; i < cl; i++) { + clusterPaths[i] = in.readUTF(); + } +// int ll = in.readInt(); +// locations = new String[ll]; +// for(int i = 0; i < ll; i++) { +// locations[i] = in.readUTF(); +// } + } + + @Override + public long getLength() throws IOException, InterruptedException { + return length; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return locations; + } + /** + * @return the start + */ + public String getStart() { + return start; + } + /** + * @return the end + */ + public String getEnd() { + return end; + } + /** + * @return the clusterPaths + */ + public String[] getClusterPaths() { + return clusterPaths; + } +} Added: trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/jobs/BuildCluster.java =================================================================== --- trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/jobs/BuildCluster.java (rev 0) +++ trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/jobs/BuildCluster.java 2011-11-18 23:18:42 UTC (rev 3567) @@ -0,0 +1,228 @@ +package org.archive.hadoop.jobs; + +im... [truncated message content] |