From: <bra...@us...> - 2011-12-18 00:07:57
Revision: 3575
          http://archive-access.svn.sourceforge.net/archive-access/?rev=3575&view=rev
Author:   bradtofel
Date:     2011-12-18 00:07:50 +0000 (Sun, 18 Dec 2011)

Log Message:
-----------
Added logging for convert problems: added the Robot Meta instructions field, swapped the compressed length/offset fields 9 and 10 (offset is now 10th), and we now output a (somewhat more) correct CDX header line, with the new "S" column for compressed size.

Modified Paths:
--------------
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/CDXConverterTool.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/CDXMapper.java

Modified: trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/CDXConverterTool.java
===================================================================
--- trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/CDXConverterTool.java	2011-11-29 06:03:59 UTC (rev 3574)
+++ trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/cdx/CDXConverterTool.java	2011-12-18 00:07:50 UTC (rev 3575)
@@ -39,10 +39,12 @@
 				break;
 			}
 			StringPair pair = mapper.convert(cdxLine);
-			pw.print(pair.first);
-			pw.print(" ");
-			pw.print(pair.second);
-			pw.println();
+			if(pair != null) {
+				pw.print(pair.first);
+				pw.print(" ");
+				pw.print(pair.second);
+				pw.println();
+			}
 		}
 		pw.flush();
 		return 0;

Modified: trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/CDXMapper.java
===================================================================
--- trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/CDXMapper.java	2011-11-29 06:03:59 UTC (rev 3574)
+++ trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/CDXMapper.java	2011-12-18 00:07:50 UTC (rev 3575)
@@ -1,6 +1,7 @@
 package org.archive.hadoop.mapreduce;
 
 import java.io.IOException;
+import java.util.logging.Logger;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -11,7 +12,12 @@
 
 public class CDXMapper extends Mapper<Object, Text, Text, Text>
 		implements Configurable {
-
+	private static final Logger LOG =
+		Logger.getLogger(CDXMapper.class.getName());
+
+	// Note the (unbelievably) new "S" for "Size in Compressed Bytes..."
+	public final static String NEW_CDX_HEADER =
+		"CDX N b a m s k r M S V g";
 	private static String TEXT_OUTPUT_DELIM_CONFIG = "text.output.delim";
 	public static int MODE_GLOBAL = 0;
 	public static int MODE_FULL = 1;
@@ -30,8 +36,7 @@
 
 	public StringPair convert(String cdxLine) {
 		if(cdxLine.startsWith(" CDX ")) {
-			return new StringPair("", cdxLine.substring(1));
-//			return null
+			return new StringPair("", NEW_CDX_HEADER);
 		}
 		String[] parts = cdxLine.split(delim);
 		int offsetIdx = 8;
@@ -45,6 +50,9 @@
 				return null;
 			}
 		}
+		} else {
+			LOG.warning("Skipping line:" + cdxLine);
+			return null;
 		}
 
 		// don't care about the old key:
@@ -68,8 +76,9 @@
 		valSB.append(responseCode).append(delim);
 		valSB.append(digest).append(delim);
 		valSB.append(redirect).append(delim);
+		valSB.append(metaInstructions).append(delim);
+		valSB.append(DEFAULT_GZ_LEN).append(delim);
 		valSB.append(offset).append(delim);
-		valSB.append(DEFAULT_GZ_LEN).append(delim);
 		valSB.append(filename);
 		return new StringPair(keySB.toString(), valSB.toString());
 	}
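For context, a converted line now carries eleven space-delimited fields matching NEW_CDX_HEADER, with compressed size ("S") ninth and compressed offset ("V") tenth. Below is a minimal sketch of consuming the reordered fields; it assumes the conventional CDX legend meanings for the letter codes, and the class name and sample values are illustrative, not from the commit:

    // Minimal sketch (illustrative names): index a converted CDX line
    // against the new header "CDX N b a m s k r M S V g".
    public class CdxFieldOrderSketch {
        public static void main(String[] args) {
            // Sample 11-field line; the values are made up for illustration.
            String line = "org,example)/ 20111218000750 http://example.org/ "
                    + "text/html 200 SHA1DIGESTXXXX - - 1234 5678 example.warc.gz";
            String[] f = line.split(" ");
            // Per the commit: field 9 (index 8) is now "S" (compressed size)
            // and field 10 (index 9) is "V" (compressed offset).
            long compressedSize = Long.parseLong(f[8]);
            long compressedOffset = Long.parseLong(f[9]);
            String filename = f[10];
            System.out.println("S=" + compressedSize
                    + " V=" + compressedOffset + " g=" + filename);
        }
    }

The null check added to CDXConverterTool matches the new convert() contract: lines that fail to parse are now logged and dropped rather than emitted as garbage output.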
From: <aa...@us...> - 2012-02-07 19:37:05
Revision: 3612
          http://archive-access.svn.sourceforge.net/archive-access/?rev=3612&view=rev
Author:   aalsum
Date:     2012-02-07 19:36:59 +0000 (Tue, 07 Feb 2012)

Log Message:
-----------
Add WATExtractor Job to extract WAT files on Hadoop.

Modified Paths:
--------------
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/jobs/JobDriver.java

Added Paths:
-----------
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/jobs/WATExtractorJob.java
    trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/WATExtractorMapper.java

Modified: trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/jobs/JobDriver.java
===================================================================
--- trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/jobs/JobDriver.java	2012-02-02 17:45:12 UTC (rev 3611)
+++ trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/jobs/JobDriver.java	2012-02-07 19:36:59 UTC (rev 3612)
@@ -99,6 +99,14 @@
 				GZRangeClientTool.class,
 				GZRangeClientTool.TOOL_DESCRIPTION);
 
+			pgd.addClass(GZRangeClientTool.TOOL_NAME,
+				GZRangeClientTool.class,
+				GZRangeClientTool.TOOL_DESCRIPTION);
+
+			pgd.addClass(WATExtractorJob.TOOL_NAME,
+				WATExtractorJob.class,
+				WATExtractorJob.TOOL_DESCRIPTION);
+
 			pgd.driver(args);
 
 			exitCode = 0;

Added: trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/jobs/WATExtractorJob.java
===================================================================
--- trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/jobs/WATExtractorJob.java	(rev 0)
+++ trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/jobs/WATExtractorJob.java	2012-02-07 19:36:59 UTC (rev 3612)
@@ -0,0 +1,117 @@
+package org.archive.hadoop.jobs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.archive.hadoop.mapreduce.WATExtractorMapper;
+
+public class WATExtractorJob extends Configured implements Tool {
+	Configuration conf = null;
+
+	public final static String TOOL_NAME = "WATExtractor";
+	public final static String TOOL_DESCRIPTION = "A map/reduce program that extracts a bunch of WARC files into WAT files in HDFS.";
+
+	public final static String WAT_EXTRACT_TARGET = "wat-extractor.target";
+	public final static String WAT_EXTRACTOR_OVERRIDE = "wat-extractor.override";
+
+	public Configuration getConf() {
+		return conf;
+	}
+
+	static int printUsage() {
+		System.out.println("WATExtractor [OPTIONS] <input> <outputdir> <importTarget>");
+		System.out.println("\tOPTIONS can be:");
+		System.out.println("\t\t-m NUM - try to run with approximately NUM map tasks");
+		System.out.println("\t\t--override - override existing WAT files with the same name; the default is to skip already-extracted files.");
+		System.out.println("\tThe input file contains lines of the form:");
+		System.out.println("\t\t\tFilePath");
+		System.out.println("\tOR");
+		System.out.println("\t\t\tBASENAME<SPACE>FilePath");
+		System.out.println("\tif only FilePath is specified, then the target will be <importTarget>/<BASENAME of FilePath>");
+		System.out.println("\totherwise the target will be <importTarget>/<BASENAME>");
+		System.err.println("\tFilePath is an HTTP or HDFS URL to an arc, warc, arc.gz, or warc.gz.");
+		System.out.println();
+		return -1;
+	}
+
+	public void setConf(Configuration conf) {
+		this.conf = conf;
+	}
+
+	@Override
+	public int run(String[] args) throws Exception {
+		Job job = new Job(getConf(), "wat-extractor");
+		Configuration conf = job.getConfiguration();
+		job.setJarByClass(WATExtractorJob.class);
+		job.setInputFormatClass(TextInputFormat.class);
+		job.setOutputFormatClass(TextOutputFormat.class);
+		job.setOutputKeyClass(Text.class);
+		job.setOutputValueClass(Text.class);
+		job.setMapperClass(WATExtractorMapper.class);
+
+		int i = 0;
+		int numMaps = 10;
+		while(i < args.length -1) {
+			if(args[i].equals("-m")) {
+				i++;
+				numMaps = Integer.parseInt(args[i]);
+				i++;
+			} else if(args[i].equals("--override")) {
+				WATExtractorMapper.setOverride(conf, true);
+				i++;
+			} else {
+				break;
+			}
+		}
+		if(args.length - 3 != i) {
+			printUsage();
+//			throw new IllegalArgumentException("wrong number of args...");
+		}
+		Path inputPath = new Path(args[i]);
+		Path outputPath = new Path(args[i+1]);
+		Path targetPath = new Path(args[i+2]);
+
+		TextInputFormat.addInputPath(job, inputPath);
+		FileOutputFormat.setOutputPath(job, outputPath);
+		WATExtractorMapper.setTargetDir(conf, targetPath.toString());
+
+		conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+
+		FileSystem fs = inputPath.getFileSystem(conf);
+		FileStatus inputStatus = fs.getFileStatus(inputPath);
+		long inputLen = inputStatus.getLen();
+		long bytesPerMap = (int) inputLen / numMaps;
+
+		FileInputFormat.setMaxInputSplitSize(job, bytesPerMap);
+
+		return (job.waitForCompletion(true) ? 0 : 1);
+	}
+
+	/**
+	 * @param args
+	 * @throws Exception
	 */
+	public static void main(String[] args) throws Exception {
+		int res = ToolRunner.run(new Configuration(), new WATExtractorJob(), args);
+		System.exit(res);
+	}
+}

Property changes on: trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/jobs/WATExtractorJob.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Added: trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/WATExtractorMapper.java
===================================================================
--- trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/WATExtractorMapper.java	(rev 0)
+++ trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/WATExtractorMapper.java	2012-02-07 19:36:59 UTC (rev 3612)
@@ -0,0 +1,149 @@
+package org.archive.hadoop.mapreduce;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.archive.extract.ExtractingResourceFactoryMapper;
+import org.archive.extract.ExtractingResourceProducer;
+import org.archive.extract.ExtractorOutput;
+import org.archive.extract.ProducerUtils;
+import org.archive.extract.ResourceFactoryMapper;
+import org.archive.extract.WATExtractorOutput;
+import org.archive.hadoop.jobs.WATExtractorJob;
+import org.archive.resource.Resource;
+import org.archive.resource.ResourceProducer;
+import org.archive.util.StringFieldExtractor;
+import org.archive.util.StringFieldExtractor.StringTuple;
+
+public class WATExtractorMapper extends Mapper<Object, Text, Text, Text> {
+
+	public final static String WAT_EXTRACTOR_TARGET = "wat-extractor.target";
+	public final static String WAT_EXTRACTOR_OVERRIDE = "wat-extractor.override";
+	Path target = null;
+	FileSystem filesystem = null;
+	boolean overrideExistentFile = false;
+	StringFieldExtractor sfe = new StringFieldExtractor(' ', 1);
+
+	public static void setTargetDir(Configuration conf, String path) {
+		conf.set(WAT_EXTRACTOR_TARGET, path);
+	}
+
+	@Override
+	protected void setup(Context context) throws IOException,
+			InterruptedException {
+		super.setup(context);
+		Configuration conf = context.getConfiguration();
+		String targetString = conf.get(WATExtractorJob.WAT_EXTRACT_TARGET);
+
+		overrideExistentFile = conf.getBoolean(WAT_EXTRACTOR_OVERRIDE, false);
+		target = new Path(targetString);
+		filesystem = target.getFileSystem(conf);
+	}
+
+	public void map(Object y, Text value, Context context)
+			throws IOException, InterruptedException {
+
+		// Parse the URL files
+		String valueS = value.toString();
+		String name;
+		String url = valueS;
+		int idx = valueS.indexOf(' ');
+		if(idx == -1) {
+			URL tmpUrl = new URL(valueS);
+			name = tmpUrl.getPath();
+			if(name.contains("/")) {
+				name = name.substring(name.lastIndexOf('/')+1);
+			}
+		} else {
+			StringTuple t = sfe.split(valueS);
+			if((t.first == null) || (t.second == null)) {
+				throw new IOException("Bad input line:" + valueS);
+			}
+			name = t.first;
+			url = t.second;
+		}
+
+		Path thisTarget = new Path(target,name);
+		Path thisTargetTmp = new Path(target,name+".wat.gz");
+		doExtract(url, thisTarget, thisTargetTmp);
+	}
+
+	private void doExtract(String url, Path target, Path targetTmp) throws IOException {
+		// Check if the target exists (from previous map)
+		long targetLen = getPathLength(target);
+
+		int max = Integer.MAX_VALUE;
+
+		if(targetLen > -1) {
+			// there's a file in the filesystem already,
+			if(overrideExistentFile){
+				if(!filesystem.delete(target, false)) {
+					throw new IOException("Failed to delete old copy");
+				}
+			} else {
+				return;
+			}
+		}
+
+		FSDataOutputStream fsdOut = filesystem.create(targetTmp, false);
+		ExtractorOutput out;
+		out = new WATExtractorOutput(fsdOut);
+
+		ResourceProducer producer = ProducerUtils.getProducer(url);
+
+		ResourceFactoryMapper mapper = new ExtractingResourceFactoryMapper();
+		ExtractingResourceProducer exProducer =
+			new ExtractingResourceProducer(producer, mapper);
+
+		Logger.getLogger("org.archive").setLevel(Level.WARNING);
+
+		int count = 0;
+		int incr = 1;
+		while(count < max) {
+			try {
+				Resource r = exProducer.getNext();
+				if(r == null) {
+					break;
+				}
+				count += incr;
+				out.output(r);
+			} catch(Exception e){
+				e.printStackTrace();
+			}
+		}
+	}
+
+	private long getPathLength(Path path) throws IOException {
+		FileStatus stat = null;
+		try {
+			stat = filesystem.getFileStatus(path);
+			// present.. check by size:
+		} catch (FileNotFoundException e) {
+			return -1;
+		}
+		return stat.getLen();
+	}
+
+	public static void setOverride(Configuration conf, boolean isOverride) {
+		conf.setBoolean(WAT_EXTRACTOR_OVERRIDE, isOverride);
+	}
+}

Property changes on: trunk/archive-access/projects/ia-tools/src/main/java/org/archive/hadoop/mapreduce/WATExtractorMapper.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain