From: Gordon M. <go...@ar...> - 2011-02-18 01:27:45
Re: MultiMemberGZIPInputStream

As of a couple of days ago, Heritrix TRUNK has a GZIPMembersInputStream that offers not just reads across member boundaries, but...

• behaves the same pre- and post-JDK6u23!
• gives calling code the option of stop-on-member-end or read-straight-through!
• reports each member's start and end offsets in the compressed stream!
• offers skip/seek by compressed offset!

...and that's not all! It's also got:

• comments!
• tests!
• general compliance with our coding standards (like braces around 1-line if/then/else clauses)!

Check it out. (A few illustrative sketches follow below the quoted commit.)

- Gordon

On 2/17/11 2:35 PM, bra...@us... wrote:
> Revision: 3417
> http://archive-access.svn.sourceforge.net/archive-access/?rev=3417&view=rev
> Author: bradtofel
> Date: 2011-02-17 22:35:47 +0000 (Thu, 17 Feb 2011)
>
> Log Message:
> -----------
> Initial checkin of early gzip line dereferencing CDX processing code.
>
> Modified Paths:
> --------------
> trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXCanonicalizingMapper.java
> trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSortDriver.java
> trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingRecordReader.java
> trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/SortDriver.java
>
> Added Paths:
> -----------
> trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingInputFormat.java
> trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingRecordReader.java
> trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/HTTPImportJob.java
> trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/HTTPImportMapper.java
> trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/MultiMemberGZIPInputStream.java
>
> Modified: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXCanonicalizingMapper.java
> ===================================================================
> --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXCanonicalizingMapper.java 2011-02-09 22:04:47 UTC (rev 3416)
> +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXCanonicalizingMapper.java 2011-02-17 22:35:47 UTC (rev 3417)
> @@ -100,7 +100,9 @@
> private void mapFull(Object y, Text value, Context context)
> throws IOException, InterruptedException {
> String s = value.toString();
> -
> + if(s.startsWith(" CDX ")) {
> + return;
> + }
> boolean problems = true;
> i1 = s.indexOf(delim);
> if(i1 > 0) {
>
> Modified: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSortDriver.java
> ===================================================================
> --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSortDriver.java 2011-02-09 22:04:47 UTC (rev 3416)
> +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSortDriver.java 2011-02-17 22:35:47 UTC (rev 3417)
> @@ -69,6 +69,10 @@
> System.out.println("cdxsort [OPTIONS] <split> <input> <output>");
> System.out.println("\tOPTIONS can be:");
> System.out.println("\t\t-m NUM - try to run with approximately NUM map tasks");
> + System.out.println("\t\t--compressed-input - assume input is compressed, even without .gz suffix");
> + System.out.println("\t\t--gzip-range - assume input lines are PATH START LENGTH such that a");
> + System.out.println("\t\t\t valid gzip record exists in PATH between START and START+LENGTH");
> + System.out.println("\t\t\t that contains the records to process");
> System.out.println("\t\t--compress-output - compress output files with GZip");
> System.out.println("\t\t--delimiter DELIM - assume DELIM delimter for input and output, instead of default <SPACE>");
> System.out.println("\t\t--map-global - use the GLOBAL CDX map function, which implies:");
> @@ -93,6 +97,8 @@
>
> long desiredMaps = 10;
> boolean compressOutput = false;
> + boolean compressedInput = false;
> + boolean gzipRange = false;
> List<String> otherArgs = new ArrayList<String>();
> int mapMode = CDXCanonicalizingMapper.MODE_FULL;
> for (int i = 0; i < args.length; ++i) {
> @@ -101,6 +107,10 @@
> desiredMaps = Integer.parseInt(args[++i]);
> } else if ("--compress-output".equals(args[i])) {
> compressOutput = true;
> + } else if ("--compressed-input".equals(args[i])) {
> + compressedInput = true;
> + } else if ("--gzip-range".equals(args[i])) {
> + gzipRange = true;
> } else if ("--delimiter".equals(args[i])) {
> delim = args[++i];
> } else if ("--map-full".equals(args[i])) {
> @@ -175,8 +185,14 @@
>
> FileInputFormat.addInputPath(job, inputPath);
> FileInputFormat.setMaxInputSplitSize(job, bytesPerMap);
> - job.setInputFormatClass(LineDereferencingInputFormat.class);
> -
> + if(gzipRange) {
> + job.setInputFormatClass(GZIPRangeLineDereferencingInputFormat.class);
> + } else {
> + job.setInputFormatClass(LineDereferencingInputFormat.class);
> + if(compressedInput) {
> + LineDereferencingRecordReader.forceCompressed(conf);
> + }
> + }
> FileOutputFormat.setOutputPath(job, outputPath);
>
> return (job.waitForCompletion(true) ? 0 : 1);
>
> Added: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingInputFormat.java
> ===================================================================
> --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingInputFormat.java (rev 0)
> +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingInputFormat.java 2011-02-17 22:35:47 UTC (rev 3417)
> @@ -0,0 +1,17 @@
> +package org.archive.wayback.hadoop;
> +
> +import java.io.IOException;
> +
> +import org.apache.hadoop.io.Text;
> +import org.apache.hadoop.mapreduce.InputSplit;
> +import org.apache.hadoop.mapreduce.RecordReader;
> +import org.apache.hadoop.mapreduce.TaskAttemptContext;
> +
> +public class GZIPRangeLineDereferencingInputFormat extends LineDereferencingInputFormat {
> + @Override
> + public RecordReader<Text, Text> createRecordReader(InputSplit split,
> + TaskAttemptContext context) throws IOException,
> + InterruptedException {
> + return new GZIPRangeLineDereferencingRecordReader();
> + }
> +}
>
> Added: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingRecordReader.java
> ===================================================================
> --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingRecordReader.java (rev 0)
> +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingRecordReader.java 2011-02-17 22:35:47 UTC (rev 3417)
> @@ -0,0 +1,85 @@
> +package org.archive.wayback.hadoop;
> +
> +import java.io.BufferedReader;
> +import java.io.ByteArrayInputStream;
> +import java.io.IOException;
> +import java.io.InputStream;
> +import java.io.InputStreamReader;
> +import java.util.zip.GZIPInputStream;
> +
> +import org.apache.hadoop.fs.FSDataInputStream;
> +import org.apache.hadoop.fs.Path;
> +import org.apache.hadoop.io.Text;
> +
> +public class GZIPRangeLineDereferencingRecordReader extends LineDereferencingRecordReader{
> + String curInputLine = null;
> + FSDataInputStream fsdis = null;
> + long curStart = 0;
> + byte[] buffer = null;
> + @Override
> + public boolean nextKeyValue() throws IOException, InterruptedException {
> + if(key == null) {
> + key = new Text();
> + }
> + if(value == null) {
> + value = new Text();
> + }
> + while(true) {
> + if(curReader == null) {
> + // are there more?
> + if(internal.nextKeyValue()) {
> + progress = internal.getProgress();
> + curInputLine = internal.getCurrentValue().toString();
> + String[] parts = curInputLine.split(" ");
> + if(parts.length != 3) {
> + throw new IOException("Bad format line(" + curInputLine +")");
> + }
> + String newFile = parts[0];
> + if(fsdis != null) {
> + if(!newFile.equals(curFile)) {
> + // close old and open new, otherwise we can just
> + // do another read on the current one:
> + fsdis.close();
> + curFile = newFile;
> + Path path = new Path(curFile);
> + fsdis = fileSystem.open(path);
> + }
> + } else {
> + curFile = newFile;
> + Path path = new Path(curFile);
> + fsdis = fileSystem.open(path);
> + }
> + curFile = parts[0];
> + curStart = Long.parseLong(parts[1]);
> + int length = Integer.parseInt(parts[2]);
> + if(buffer == null) {
> + buffer = new byte[length];
> + } else if (buffer.length < length) {
> + buffer = new byte[length];
> + }
> + fsdis.read(curStart,buffer,0,length);
> + // the whole chunk is now in buffer:
> + InputStream is =
> + new GZIPInputStream(new ByteArrayInputStream(buffer,0,length));
> + curReader = new BufferedReader(new InputStreamReader(is));
> + curLine = 0;
> +
> + } else {
> + // all done:
> + return false;
> + }
> + }
> + // try to read another line:
> + String nextLine = curReader.readLine();
> + if(nextLine != null) {
> + key.set(curFile+":"+curStart+":"+curLine);
> + value.set(nextLine);
> + curLine++;
> + return true;
> + }
> + curReader.close();
> + curReader = null;
> + }
> + }
> +
> +}
>
> Added: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/HTTPImportJob.java
> ===================================================================
> --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/HTTPImportJob.java (rev 0)
> +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/HTTPImportJob.java 2011-02-17 22:35:47 UTC (rev 3417)
> @@ -0,0 +1,81 @@
> +package org.archive.wayback.hadoop;
> +
> +import org.apache.hadoop.conf.Configuration;
> +import org.apache.hadoop.conf.Configured;
> +import org.apache.hadoop.fs.FileStatus;
> +import org.apache.hadoop.fs.FileSystem;
> +import org.apache.hadoop.fs.Path;
> +import org.apache.hadoop.io.Text;
> +import org.apache.hadoop.mapred.MapRunner;
> +import org.apache.hadoop.mapreduce.Job;
> +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
> +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
> +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
> +import org.apache.hadoop.util.Tool;
> +import org.apache.hadoop.util.ToolRunner;
> +
> +public class HTTPImportJob extends Configured implements Tool {
> + Configuration conf = null;
> + public final static String HTTP_IMPORT_TARGET = "http-import.target";
> + public Configuration getConf() {
> + return conf;
> + }
> +
> + public void setConf(Configuration conf) {
> + this.conf = conf;
> +
> + }
> +
> +
> +
> + public int run(String[] args) throws Exception {
> + Job job = new Job(getConf(), "http-import");
> + Configuration conf = job.getConfiguration();
> + job.setJarByClass(HTTPImportJob.class);
> + job.setInputFormatClass(TextInputFormat.class);
> + job.setOutputFormatClass(TextOutputFormat.class);
> + job.setOutputKeyClass(Text.class);
> + job.setOutputValueClass(Text.class);
> + job.setMapperClass(HTTPImportMapper.class);
> +
> + int i = 0;
> + int numMaps = 10;
> + while(i < args.length -1) {
> + if(args[i].equals("-m")) {
> + i++;
> + numMaps = Integer.parseInt(args[i]);
> + i++;
> + } else {
> + break;
> + }
> + }
> + if(args.length - 3 != i) {
> + throw new IllegalArgumentException("wrong number of args...");
> + }
> + Path inputPath = new Path(args[i]);
> + Path outputPath = new Path(args[i+1]);
> + Path targetPath = new Path(args[i+2]);
> +
> + TextInputFormat.addInputPath(job, inputPath);
> + FileOutputFormat.setOutputPath(job, outputPath);
> + conf.set(HTTP_IMPORT_TARGET, targetPath.toString());
> +
> + conf.setBoolean("mapred.map.tasks.speculative.execution", false);
> +
> + FileSystem fs = inputPath.getFileSystem(conf);
> + FileStatus inputStatus = fs.getFileStatus(inputPath);
> + long inputLen = inputStatus.getLen();
> + long bytesPerMap = (int) inputLen / numMaps;
> +
> + FileInputFormat.setMaxInputSplitSize(job, bytesPerMap);
> +
> +
> + return (job.waitForCompletion(true) ? 0 : 1);
> + }
> + public static void main(String[] args) throws Exception {
> + int res = ToolRunner.run(new Configuration(), new HTTPImportJob(), args);
> + System.exit(res);
> + }
> +
> +}
>
> Added: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/HTTPImportMapper.java
> ===================================================================
> --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/HTTPImportMapper.java (rev 0)
> +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/HTTPImportMapper.java 2011-02-17 22:35:47 UTC (rev 3417)
> @@ -0,0 +1,145 @@
> +package org.archive.wayback.hadoop;
> +
> +import java.io.FileNotFoundException;
> +import java.io.IOException;
> +import java.io.InputStream;
> +import java.net.URL;
> +
> +import org.apache.commons.httpclient.HttpClient;
> +import org.apache.commons.httpclient.methods.GetMethod;
> +import org.apache.commons.httpclient.methods.HeadMethod;
> +import org.apache.hadoop.conf.Configuration;
> +import org.apache.hadoop.fs.FSDataOutputStream;
> +import org.apache.hadoop.fs.FileStatus;
> +import org.apache.hadoop.fs.FileSystem;
> +import org.apache.hadoop.fs.Path;
> +import org.apache.hadoop.io.LongWritable;
> +import org.apache.hadoop.io.Text;
> +import org.apache.hadoop.mapreduce.Mapper;
> +import org.apache.hadoop.mapreduce.Mapper.Context;
> +
> +public class HTTPImportMapper extends Mapper<LongWritable, Text, Text, Text> {
> + public final int BUFSIZ = 4096;
> + Path target = null;
> + FileSystem filesystem = null;
> + Text doneText = null;
> + HttpClient client = null;
> + public HTTPImportMapper() {
> +
> + }
> + public void init2() {
> + System.err.println("Init map...");
> + }
> + @Override
> + protected void setup(Context context) throws IOException,
> + InterruptedException {
> + super.setup(context);
> + Configuration conf = context.getConfiguration();
> + String targetString = conf.get(HTTPImportJob.HTTP_IMPORT_TARGET);
> + if(targetString == null) {
> + throw new IOException("No " + HTTPImportJob.HTTP_IMPORT_TARGET
> + + " specified");
> + }
> + target = new Path(targetString);
> + filesystem = target.getFileSystem(conf);
> + doneText = new Text("Done");
> + client = new HttpClient();
> + }
> +
> + @Override
> + protected void map(LongWritable key, Text value, Context context)
> + throws IOException, InterruptedException {
> +
> + String valueS = value.toString();
> + String name;
> + String url = valueS;
> + int idx = valueS.indexOf(' ');
> + if(idx == -1) {
> + URL tmpUrl = new URL(valueS);
> + name = tmpUrl.getPath();
> + if(name.contains("/")) {
> + name = name.substring(name.lastIndexOf('/')+1);
> + }
> + } else {
> + name = valueS.substring(0,idx);
> + url = valueS.substring(idx+1);
> + }
> + Path thisTarget = new Path(target,name);
> + doCopy(url, thisTarget);
> + context.write(value, doneText);
> + }
> +
> + private long getURLLengthByHead(String url) throws IOException {
> + HeadMethod head = new HeadMethod(url);
> + long urlLen = -1;
> + // execute the method and handle any error responses.
> + try {
> + int code = client.executeMethod(head);
> + if(code != 200) {
> + throw new IOException("Non-200 for HEAD:" + url);
> + }
> + urlLen = head.getResponseContentLength();
> + // discard: hope it's really empty (HEAD) and thus small...
> + head.getResponseBody();
> + } finally {
> + head.releaseConnection();
> + }
> + return urlLen;
> + }
> +
> + private long getPathLength(Path path) throws IOException {
> + FileStatus stat = null;
> + try {
> + stat = filesystem.getFileStatus(path);
> + // present.. check by size:
> + } catch (FileNotFoundException e) {
> + return -1;
> + }
> + return stat.getLen();
> + }
> +
> +
> + private void doCopy(String url, Path target) throws IOException {
> + // Check if the target exists (from previous map)
> + long targetLen = getPathLength(target);
> + long urlLen = -1;
> + if(targetLen > -1) {
> + // there's a file in the filesystem already, see if it's the
> + // same length:
> + urlLen = getURLLengthByHead(url);
> + if(urlLen == targetLen) {
> + // same size, assume it's done:
> + return;
> + }
> + // diff length, do copy again, first remove old:
> + if(!filesystem.delete(target, false)) {
> + throw new IOException("Failed to delete old copy");
> + }
> + }
> + // do the copy:
> + FSDataOutputStream out = filesystem.create(target, false);
> + GetMethod get = new GetMethod(url);
> + long copied = 0;
> + try {
> + int code = client.executeMethod(get);
> + if(code != 200) {
> + throw new IOException("Non 200 on GET: " + url);
> + }
> + urlLen = get.getResponseContentLength();
> + InputStream in = get.getResponseBodyAsStream();
> + byte buffer[] = new byte[BUFSIZ];
> + for(int cbread; (cbread = in.read(buffer)) >= 0; ) {
> + out.write(buffer, 0, cbread);
> + copied += cbread;
> + }
> + } finally {
> + get.releaseConnection();
> + out.close();
> + }
> + if(copied != urlLen) {
> + // ack.. what went wrong?
> + throw new IOException("Wrong copy length want(" + urlLen
> + + ") got(" + copied + ") URL:" + url);
> + }
> + }
> +}
>
> Modified: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingRecordReader.java
> ===================================================================
> --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingRecordReader.java 2011-02-09 22:04:47 UTC (rev 3416)
> +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingRecordReader.java 2011-02-17 22:35:47 UTC (rev 3417)
> @@ -25,6 +25,7 @@
> import java.io.InputStreamReader;
> import java.util.zip.GZIPInputStream;
>
> +import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.Text;
> @@ -45,6 +46,9 @@
> public class LineDereferencingRecordReader extends RecordReader<Text, Text>{
> LineRecordReader internal = new LineRecordReader();
>
> +
> + protected static final String FORCE_COMPRESSED_FLAG = "line-reref.force-compressed";
> +
> FileSystem fileSystem = null;
> Text key = null;
> Text value = null;
> @@ -52,11 +56,18 @@
> String curFile = null;
> long curLine = 0;
> float progress = 0.0f;
> + boolean forceCompressed = false;
> + public static void forceCompressed(Configuration conf) {
> + conf.setBoolean(FORCE_COMPRESSED_FLAG, true);
> + }
> +
> @Override
> public void initialize(InputSplit split, TaskAttemptContext context)
> throws IOException, InterruptedException {
> + Configuration conf = context.getConfiguration();
> + forceCompressed = conf.getBoolean(FORCE_COMPRESSED_FLAG, false);
> FileSplit fileSplit = (FileSplit) split;
> - fileSystem = fileSplit.getPath().getFileSystem(context.getConfiguration());
> + fileSystem = fileSplit.getPath().getFileSystem(conf);
> internal.initialize(split, context);
> }
>
> @@ -77,8 +88,9 @@
> Path path = new Path(curFile);
> InputStream is = fileSystem.open(path);
> // TODO: use the real Codec stuff..
> - if(curFile.endsWith(".gz")) {
> - is = new GZIPInputStream(is);
> + if(forceCompressed || curFile.endsWith(".gz")) {
> +// is = new GZIPInputStream(is);
> + is = new MultiMemberGZIPInputStream(is);
> }
> curReader = new BufferedReader(new InputStreamReader(is));
>
>
> Added: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/MultiMemberGZIPInputStream.java
> ===================================================================
> --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/MultiMemberGZIPInputStream.java (rev 0)
> +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/MultiMemberGZIPInputStream.java 2011-02-17 22:35:47 UTC (rev 3417)
> @@ -0,0 +1,96 @@
> +package org.archive.wayback.hadoop;
> +
> +import java.io.InputStream;
> +import java.io.PushbackInputStream;
> +import java.io.IOException;
> +import java.util.zip.GZIPInputStream;
> +
> +public class MultiMemberGZIPInputStream extends GZIPInputStream {
> +
> + public MultiMemberGZIPInputStream(InputStream in, int size) throws IOException
> + {
> + // Wrap the stream in a PushbackInputStream...
> + super(new PushbackInputStream(in, size), size);
> + this.size=size;
> + }
> +
> + public MultiMemberGZIPInputStream(InputStream in) throws IOException
> + {
> + // Wrap the stream in a PushbackInputStream...
> + super(new PushbackInputStream(in, 1024));
> + this.size=-1;
> + }
> +
> + private MultiMemberGZIPInputStream(MultiMemberGZIPInputStream parent) throws IOException
> + {
> + super(parent.in);
> + this.size=-1;
> + this.parent=parent.parent==null ? parent : parent.parent;
> + this.parent.child=this;
> + }
> +
> + private MultiMemberGZIPInputStream(MultiMemberGZIPInputStream parent, int size) throws IOException
> + {
> + super(parent.in, size);
> + this.size=size;
> + this.parent=parent.parent==null ? parent : parent.parent;
> + this.parent.child=this;
> + }
> +
> + private MultiMemberGZIPInputStream parent;
> + private MultiMemberGZIPInputStream child;
> + private int size;
> + private boolean eos;
> +
> + public int read(byte[] inputBuffer, int inputBufferOffset, int inputBufferLen) throws IOException {
> +
> + if (eos) { return -1;}
> + if (this.child!=null)
> + return this.child.read(inputBuffer, inputBufferOffset, inputBufferLen);
> +
> + int charsRead=super.read(inputBuffer, inputBufferOffset, inputBufferLen);
> + if (charsRead==-1)
> + {
> + // Push any remaining buffered data back onto the stream
> + // If the stream is then not empty, use it to construct
> + // a new instance of this class and delegate this and any
> + // future calls to it...
> + int n = inf.getRemaining() - 8;
> + if (n > 0)
> + {
> + // More than 8 bytes remaining in deflater
> + // First 8 are gzip trailer. Add the rest to
> + // any un-read data...
> + ((PushbackInputStream)this.in).unread(buf, len-n, n);
> + }
> + else
> + {
> + // Nothing in the buffer. We need to know whether or not
> + // there is unread data available in the underlying stream
> + // since the base class will not handle an empty file.
> + // Read a byte to see if there is data and if so,
> + // push it back onto the stream...
> + byte[] b=new byte[1];
> + int ret=in.read(b,0,1);
> + if (ret==-1)
> + {
> + eos=true;
> + return -1;
> + }
> + else
> + ((PushbackInputStream)this.in).unread(b, 0, 1);
> + }
> +
> + MultiMemberGZIPInputStream child;
> + if (this.size==-1)
> + child=new MultiMemberGZIPInputStream(this);
> + else
> + child=new MultiMemberGZIPInputStream(this, this.size);
> + return child.read(inputBuffer, inputBufferOffset, inputBufferLen);
> + }
> + else
> + return charsRead;
> + }
> +
> +}
> +
>
> Modified: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/SortDriver.java
> ===================================================================
> --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/SortDriver.java 2011-02-09 22:04:47 UTC (rev 3416)
> +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/SortDriver.java 2011-02-17 22:35:47 UTC (rev 3417)
> @@ -32,6 +32,8 @@
> try {
> pgd.addClass("cdxsort", CDXSortDriver.class,
> "A map/reduce program that canonicalizes and provides a total order sort into multiple CDX files");
> + pgd.addClass("http-import", HTTPImportJob.class,
> + "A map/reduce program that imports a bunch of URLs into an HDFS directory");
> pgd.driver(args);
> // Success
> exitCode = 0;
>
>
> This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
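
For anyone wondering why MultiMemberGZIPInputStream (or the Heritrix GZIPMembersInputStream mentioned above) is needed at all: a .gz file can hold several independently-compressed members back to back, and before JDK6u23 the stock java.util.zip.GZIPInputStream reported EOF at the end of the first member, silently dropping everything after it. Here is a minimal standalone sketch of the difference — not part of the commit; the demo class, strings, and counts are made up for illustration:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.archive.wayback.hadoop.MultiMemberGZIPInputStream;

public class MultiMemberDemo {
    // Build a two-member gzip stream, the back-to-back layout our
    // compressed CDX chunks (and ARC/WARC files) use.
    static byte[] twoMembers() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        for (String s : new String[] { "first member\n", "second member\n" }) {
            GZIPOutputStream gz = new GZIPOutputStream(bytes);
            gz.write(s.getBytes("UTF-8"));
            gz.finish(); // ends this member but leaves the underlying stream open
        }
        return bytes.toByteArray();
    }

    // Count how many decompressed bytes a stream actually yields.
    static int drain(InputStream in) throws IOException {
        byte[] buf = new byte[4096];
        int total = 0;
        for (int n; (n = in.read(buf)) != -1; ) {
            total += n;
        }
        in.close();
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = twoMembers();
        // On a pre-6u23 JVM this stops after "first member\n":
        int plain = drain(new GZIPInputStream(new ByteArrayInputStream(data)));
        // The new class keeps delegating to a child stream at each member
        // boundary, so it returns all of the decompressed bytes:
        int multi = drain(new MultiMemberGZIPInputStream(new ByteArrayInputStream(data)));
        System.out.println("plain=" + plain + " multi=" + multi);
    }
}

On 6u23 and later the two counts should come out the same, which is exactly the version-dependent behavior these wrappers are meant to paper over.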
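The new --gzip-range mode in CDXSortDriver expects each input line to name a gzip member as PATH START LENGTH, and GZIPRangeLineDereferencingRecordReader turns each such line into the decompressed lines of that member, keyed PATH:START:lineNumber. Per line it boils down to roughly the following standalone sketch (illustrative only — the class name, the example line, and the printing are mine, not from the commit):

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.zip.GZIPInputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GzipRangeDemo {
    public static void main(String[] args) throws IOException {
        // e.g. "hdfs://namenode/cdx/part-00000.gz 1048576 8192"
        String rangeLine = args[0];
        String[] parts = rangeLine.split(" ");
        if (parts.length != 3) {
            throw new IOException("Bad format line(" + rangeLine + ")");
        }
        Path path = new Path(parts[0]);
        long start = Long.parseLong(parts[1]);
        int length = Integer.parseInt(parts[2]);

        // Positioned read of the whole compressed member into memory:
        Configuration conf = new Configuration();
        FileSystem fs = path.getFileSystem(conf);
        byte[] buffer = new byte[length];
        FSDataInputStream in = fs.open(path);
        try {
            in.readFully(start, buffer, 0, length);
        } finally {
            in.close();
        }

        // Inflate it and emit each line with the key the mapper would see:
        BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new ByteArrayInputStream(buffer, 0, length))));
        long lineNum = 0;
        for (String line; (line = reader.readLine()) != null; lineNum++) {
            System.out.println(parts[0] + ":" + start + ":" + lineNum + "\t" + line);
        }
        reader.close();
    }
}

One small difference: the sketch uses readFully(), which loops until the buffer is filled, whereas the checked-in reader issues a single positioned read() that isn't guaranteed to return all LENGTH bytes in one call.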
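And for the new http-import tool registered in SortDriver: each input line is either "NAME URL" or a bare URL (in which case the name falls back to the last path segment), and HTTPImportMapper copies the URL to <target>/<NAME>, skipping anything whose existing HDFS length already matches the Content-Length from a HEAD request. A tiny sketch of just the name derivation — the example URLs are invented:

import java.net.MalformedURLException;
import java.net.URL;

public class HTTPImportLineDemo {
    // Mirrors HTTPImportMapper.map(): "NAME URL" uses NAME verbatim,
    // a bare URL uses the last segment of its path.
    static String[] nameAndUrl(String line) throws MalformedURLException {
        int idx = line.indexOf(' ');
        if (idx == -1) {
            String name = new URL(line).getPath();
            if (name.contains("/")) {
                name = name.substring(name.lastIndexOf('/') + 1);
            }
            return new String[] { name, line };
        }
        return new String[] { line.substring(0, idx), line.substring(idx + 1) };
    }

    public static void main(String[] args) throws MalformedURLException {
        // Both lines end up as <target>/foo.warc.gz in the mapper:
        String[] a = nameAndUrl("http://example.org/crawls/foo.warc.gz");
        String[] b = nameAndUrl("foo.warc.gz http://example.org/crawls/foo.warc.gz");
        System.out.println(a[0] + " <- " + a[1]);
        System.out.println(b[0] + " <- " + b[1]);
    }
}

Note the job also turns off speculative map execution (mapred.map.tasks.speculative.execution=false), presumably so two attempts don't race on the same target file.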