From: <bra...@us...> - 2010-09-03 22:31:59
Revision: 3245 http://archive-access.svn.sourceforge.net/archive-access/?rev=3245&view=rev Author: bradtofel Date: 2010-09-03 22:31:52 +0000 (Fri, 03 Sep 2010) Log Message: ----------- New version of the Wayback hadoop cdx sorting code, integrated with hadoop 20.2, and more flexible Modified Paths: -------------- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/AlphaPartitioner.java trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSort.java trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/SortDriver.java Added Paths: ----------- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXCanonicalizingMapper.java trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXReducer.java trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSortDriver.java trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingInputFormat.java trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingRecordReader.java Modified: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/AlphaPartitioner.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/AlphaPartitioner.java 2010-09-03 22:30:36 UTC (rev 3244) +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/AlphaPartitioner.java 2010-09-03 22:31:52 UTC (rev 3245) @@ -25,7 +25,6 @@ package org.archive.wayback.hadoop; import java.io.BufferedReader; -import java.io.File; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; @@ -33,14 +32,12 @@ import java.util.ArrayList; import java.util.Arrays; -import org.apache.hadoop.filecache.DistributedCache; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Partitioner; +import org.apache.hadoop.mapreduce.Partitioner; /** * @@ -48,104 +45,65 @@ * @author brad * @version $Date$, $Revision$ */ -public class AlphaPartitioner implements Partitioner<Text, Text> { +public class AlphaPartitioner extends Partitioner<Text, Text> implements + Configurable { + private static String CONFIG_SPLIT_PATH_NAME = "alphapartitioner.path"; private String boundaries[] = new String[0]; - public static final String DEFAULT_PATH = "_split.txt"; - public static final String SPLIT_PATH_NAME = "alpha-partition.txt"; - public static final String SPLIT_CACHE_NAME = "alpha-partition-cache.txt"; - public static final String CACHE_SPLIT_URI_CONFIG = - "alphapartitioner.cachesplituri"; - public static final String CACHE_SPLIT_PATH_CONFIG = - "alphapartitioner.cachesplitpath"; - public static final String CACHE_SPLIT_STAMP_CONFIG = - "alphapartitioner.cachesplitstamp"; - /** - * Called by client prior to launching the job. 
The File argument is a - * split file, which is to be pushed into the FileSystem, and into the - * DistributedCache from there, for use by the Map tasks. - * @throws URISyntaxException - */ - public static void setPartitionFile(JobConf conf, File f) - throws IOException, URISyntaxException { - - FileSystem fs = FileSystem.get(conf); + Configuration conf; - Path fsSplitPath = new Path(SPLIT_PATH_NAME); - fs.copyFromLocalFile(new Path(f.getAbsolutePath()), fsSplitPath); - - String cacheURIString = SPLIT_PATH_NAME + "#" + SPLIT_CACHE_NAME; - DistributedCache.addCacheFile(new URI(cacheURIString), conf); - - FileStatus fsStat = fs.getFileStatus(fsSplitPath); - String mtime = String.valueOf(fsStat.getModificationTime()); - System.err.println("Files mtime(" + mtime + ")"); - conf.set(AlphaPartitioner.CACHE_SPLIT_URI_CONFIG,cacheURIString); - conf.set(AlphaPartitioner.CACHE_SPLIT_PATH_CONFIG,SPLIT_CACHE_NAME); - conf.set(AlphaPartitioner.CACHE_SPLIT_STAMP_CONFIG,mtime); + @Override + public int getPartition(Text key, Text value, int numPartitions) { + String keyS = key.toString(); + int loc = Arrays.binarySearch(boundaries, keyS); + if (loc < 0) { + loc = (loc * -1) - 2; + if (loc < 0) { + loc = 0; + } + } + return loc; } - public static void setPartitionFileBad(JobConf conf, File f) - throws IOException, URISyntaxException { - - FileSystem fs = FileSystem.get(conf); - - Path fsSplitPath = new Path(SPLIT_PATH_NAME); - fs.copyFromLocalFile(new Path(f.getAbsolutePath()), fsSplitPath); - - String cacheURIString = SPLIT_PATH_NAME + "#" + SPLIT_CACHE_NAME; - DistributedCache.addCacheFile(new URI(cacheURIString), conf); - - FileStatus fsStat = fs.getFileStatus(fsSplitPath); - String mtime = String.valueOf(fsStat.getModificationTime()); - System.err.println("Files mtime(" + mtime + ")"); - conf.set(AlphaPartitioner.CACHE_SPLIT_URI_CONFIG,cacheURIString); - conf.set(AlphaPartitioner.CACHE_SPLIT_PATH_CONFIG,SPLIT_CACHE_NAME); - conf.set(AlphaPartitioner.CACHE_SPLIT_STAMP_CONFIG,mtime); - } - - /** - * Get a BufferedReader on the alphabetic split file stored in the - * DistributedCache - * @throws IOException - * @throws URISyntaxException - */ - private static BufferedReader getPartitionFile(JobConf conf) - throws IOException, URISyntaxException { - - System.err.println("Loading split partition file..."); - FileSystem fs = FileSystem.getLocal(conf); - Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf); - - -// System.err.println("Local FS:"+fs.toString()); -// URI cacheURI = new URI(conf.get(CACHE_SPLIT_URI_CONFIG)); -// System.err.println("CacheURI: " + cacheURI.toString()); -// long mtime = Long.valueOf(conf.get(CACHE_SPLIT_STAMP_CONFIG)); -// System.err.println("Cache split timestamp: " + mtime); -// Path localSplitPath = DistributedCache.getLocalCache(cacheURI, conf, -// conf.getLocalPath(conf.getJobLocalDir()), false, mtime, -// conf.getWorkingDirectory()); -// System.err.println("LocalSplitPath: " + localSplitPath.toString()); -// FSDataInputStream in = fs.open(localSplitPath); - FSDataInputStream in = fs.open(cacheFiles[0]); - InputStreamReader is = new InputStreamReader(in); - return new BufferedReader(is); + public Configuration getConf() { + return conf; } - public void configure(JobConf conf) { + public void setConf(Configuration conf) { + this.conf = conf; + String partitionPath = getPartitionPath(conf); + String numReduceTasks = conf.get("mapred.reduce.tasks"); + System.err.println("Num configured reduce tasks:" + numReduceTasks); try { - System.err.println("Loading split file from 
cache..."); - loadBoundaries(getPartitionFile(conf)); - System.err.println("Loaded and Sorted split file"); + URI uri = new URI(partitionPath); + FileSystem fs = FileSystem.get(uri, conf); + Path p = new Path(partitionPath); + loadBoundaries(new BufferedReader(new InputStreamReader(fs.open(p)))); } catch (IOException e) { - throw new RuntimeException(e); + // TODO: ugh. how to handle? + e.printStackTrace(); } catch (URISyntaxException e) { - throw new RuntimeException(e); + e.printStackTrace(); } } - public void loadBoundaries(BufferedReader bis) throws IOException { + /** + * @param conf Configuration for the Job + * @param path hdfs:// URI pointing to the split file + */ + public static void setPartitionPath(Configuration conf, String path) { + conf.set(CONFIG_SPLIT_PATH_NAME, path); + } + + /** + * @param conf Configuration for the Job + * @return the hdfs:// URI for the split file configured for this job + */ + public static String getPartitionPath(Configuration conf) { + return conf.get(CONFIG_SPLIT_PATH_NAME); + } + + private void loadBoundaries(BufferedReader bis) throws IOException { ArrayList<String> l = new ArrayList<String>(); while (true) { String line = bis.readLine(); @@ -157,30 +115,4 @@ boundaries = l.toArray(boundaries); Arrays.sort(boundaries); } - - /** - * @return the number of partitions in the configuration file. This is also - * the number of reduce tasks in the job. - */ - public int getNumPartitions() { - return boundaries.length; - } - - /** - * @param key - * @param value - * @param numReduceTasks - * @return int partition index for key - */ - public int getPartition(Text key, Text value, int numPartitions) { - String keyS = key.toString(); - int loc = Arrays.binarySearch(boundaries, keyS); - if (loc < 0) { - loc = (loc * -1) - 2; - if (loc < 0) { - loc = 0; - } - } - return loc; - } } Added: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXCanonicalizingMapper.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXCanonicalizingMapper.java (rev 0) +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXCanonicalizingMapper.java 2010-09-03 22:31:52 UTC (rev 3245) @@ -0,0 +1,152 @@ +package org.archive.wayback.hadoop; + +import java.io.IOException; + +import org.apache.commons.httpclient.URIException; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.archive.wayback.util.url.AggressiveUrlCanonicalizer; + +/** + * @author brad + * + */ +public class CDXCanonicalizingMapper extends Mapper<Object, Text, Text, Text> +implements Configurable { + + private static String MODE_CONFIG_NAME = "cdx.map.mode"; + public static int MODE_GLOBAL = 0; + public static int MODE_FULL = 1; + + + private Configuration conf; + private int mode = MODE_GLOBAL; + private Text key = new Text(); + private Text remainder = new Text(); + private String delim = " "; + StringBuilder sb = new StringBuilder(); + + public void map(Object y, Text value, Context context) + throws IOException, InterruptedException { + if(mode == MODE_GLOBAL) { + mapGlobal(y,value,context); + } else { + mapFull(y,value,context); + } + } + + private static int SHA1_DIGITS = 3; + AggressiveUrlCanonicalizer canonicalizer = new AggressiveUrlCanonicalizer(); + private 
StringBuilder ksb = new StringBuilder(); + private StringBuilder vsb = new StringBuilder(); + private int i1 = 0; + private int i2 = 0; + private int i3 = 0; + private int i4 = 0; + + private void mapGlobal(Object y, Text value, Context context) + throws IOException, InterruptedException { + String s = value.toString(); + + String parts[] = s.split(delim); + if(parts.length == 10) { + if(!parts[9].contains("A")) { + ksb.setLength(0); + vsb.setLength(0); + try { + ksb.append(canonicalizer.urlStringToKey(parts[0])).append(" "); + ksb.append(parts[1]); // date + vsb.append(parts[0]).append(delim); // orig_url + vsb.append(parts[3]).append(delim); // MIME + vsb.append(parts[4]).append(delim); // HTTP_CODE + vsb.append(parts[5].substring(0, SHA1_DIGITS)).append(" "); // SHA1 + vsb.append(parts[6]).append(delim); // redirect + vsb.append(parts[7]).append(delim); // start_offset + vsb.append(parts[8]).append(".arc.gz"); // arc_prefix + key.set(ksb.toString()); + remainder.set(vsb.toString()); + context.write(key, remainder); + } catch (URIException e) { + System.err.println("Failed Canonicalize:("+parts[0]+ + ") in ("+parts[8]+"):("+parts[7]+")"); + } + } + } else { + System.err.println("Funky: Problem with line("+s+")"); + } + + } + private void mapFull(Object y, Text value, Context context) + throws IOException, InterruptedException { + String s = value.toString(); + + boolean problems = true; + i1 = s.indexOf(delim); + if(i1 > 0) { + i2 = s.indexOf(delim, i1 + 1); + if(i2 > 0) { + i3 = s.indexOf(delim, i2 + 1); + if(i3 > 0) { + i4 = s.lastIndexOf(delim); + if(i4 > i3) { + try { + ksb.setLength(0); + ksb.append(canonicalizer.urlStringToKey(s.substring(i2 + 1, i3))); + ksb.append(s.substring(i1,i4)); + key.set(ksb.toString()); + remainder.set(s.substring(i4+1)); + context.write(key, remainder); + problems = false; + } catch(URIException e) { + // just eat it.. problems will be true. 
+ } + } + } + } + } + if(problems) { + System.err.println("CDX-Can: Problem with line("+s+")"); + } + } + +// private void mapOld(Object y, Text value, Context context) +// throws IOException, InterruptedException { +// String parts[] = value.toString().split(delim); +// // lets assume key is field 1-2: +// sb.setLength(0); +// sb.append(parts[0]).append(delim).append(parts[1]); +// key.set(sb.toString()); +// remainder.set(join(delim,parts,2)); +// context.write(key, remainder); +// } +// +// private String join(String delim, String parts[], int start) { +// sb.setLength(0); +// int count = parts.length -1; +// for(int i = start; i < count; i++) { +// sb.append(parts[i]).append(delim); +// } +// sb.append(parts[count]); +// return sb.toString(); +// } + + /** + * @param conf Configuration for the Job + * @param mode String mode to use, one of MODE_GLOBAL, MODE_FULL + */ + public static void setMapMode(Configuration conf, int mode) { + conf.setInt(MODE_CONFIG_NAME, mode); + } + + public Configuration getConf() { + return conf; + } + + public void setConf(Configuration conf) { + this.conf = conf; + mode = conf.getInt(MODE_CONFIG_NAME, MODE_FULL); + delim = conf.get(CDXSortDriver.TEXT_OUTPUT_DELIM_CONFIG,delim); + } +} Property changes on: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXCanonicalizingMapper.java ___________________________________________________________________ Added: svn:keywords + Author Date Revision Id Added: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXReducer.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXReducer.java (rev 0) +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXReducer.java 2010-09-03 22:31:52 UTC (rev 3245) @@ -0,0 +1,19 @@ +package org.archive.wayback.hadoop; + +import java.io.IOException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Reducer; + +/** + * @author brad + * + */ +public class CDXReducer extends Reducer<Text, Text, Text, Text> { + public void reduce(Text key, Iterable<Text> values, Context context) + throws IOException, InterruptedException { + for(Text value : values) { + context.write(key, value); + } + } +} Property changes on: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXReducer.java ___________________________________________________________________ Added: svn:keywords + Author Date Revision Id Modified: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSort.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSort.java 2010-09-03 22:30:36 UTC (rev 3244) +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSort.java 2010-09-03 22:31:52 UTC (rev 3245) @@ -4,12 +4,19 @@ import java.io.File; import java.io.FileReader; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URL; import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.zip.GZIPInputStream; +import org.apache.commons.httpclient.URIException; import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; @@ -55,6 +62,7 @@ boolean compressOutput = false; boolean dereferenceInputs = false; boolean canonicalize = false; + boolean funkyInput = false; JobConf jobConf = new JobConf(getConf(), CDXSort.class); jobConf.setJobName("cdxsort"); @@ -73,6 +81,8 @@ jobConf.setNumMapTasks(Integer.parseInt(args[++i])); } else if ("--compress-output".equals(args[i])) { compressOutput = true; + } else if ("--funky-input".equals(args[i])) { + funkyInput = true; } else if ("--dereference-inputs".equals(args[i])) { dereferenceInputs = true; } else if ("--canonicalize".equals(args[i])) { @@ -107,32 +117,33 @@ File localSplitFile = new File(splitPath); FileReader is = new FileReader(localSplitFile); BufferedReader bis = new BufferedReader(is); - try { - partitioner.loadBoundaries(bis); - } catch (IOException except) { - System.err.println("ERROR: Problem loading file " + splitPath); - return printUsage(); // exits - } - jobConf.setNumReduceTasks(partitioner.getNumPartitions()); +// try { +// partitioner.loadBoundaries(bis); +// } catch (IOException except) { +// System.err.println("ERROR: Problem loading file " + splitPath); +// return printUsage(); // exits +// } +// jobConf.setNumReduceTasks(partitioner.getNumPartitions()); +// +// // copy the split file into the FS, add to the DistributedCache: +//// AlphaPartitioner.setPartitionFile(jobConf, localSplitFile); +// AlphaPartitioner.setSplitCache(jobConf, localSplitFile); +// System.err.println("uploaded split file to FS and DistributedCache"); +// +// // Set job configs: +// jobConf.setInputFormat(TextInputFormat.class); +// +// jobConf.setOutputFormat(TextOutputFormat.class); +// if (canonicalize) { +// jobConf.setMapperClass(CDXCanonicalizerMapClass.class); +// } else { +// jobConf.setMapperClass(CDXMapClass.class); +// } +// jobConf.setOutputKeyClass(Text.class); +// jobConf.setOutputValueClass(Text.class); +// jobConf.set("mapred.textoutputformat.separator", " "); +// jobConf.setPartitionerClass(AlphaPartitioner.class); - // copy the split file into the FS, add to the DistributedCache: - AlphaPartitioner.setPartitionFile(jobConf, localSplitFile); - System.err.println("uploaded split file to FS and DistributedCache"); - - // Set job configs: - jobConf.setInputFormat(TextInputFormat.class); - - jobConf.setOutputFormat(TextOutputFormat.class); - if (canonicalize) { - jobConf.setMapperClass(CDXCanonicalizerMapClass.class); - } else { - jobConf.setMapperClass(CDXMapClass.class); - } - jobConf.setOutputKeyClass(Text.class); - jobConf.setOutputValueClass(Text.class); - jobConf.set("mapred.textoutputformat.separator", " "); - jobConf.setPartitionerClass(AlphaPartitioner.class); - int inputCount = 0; // Set job input: if (dereferenceInputs) { @@ -150,23 +161,35 @@ // System.err.println("Added path(" + inputCount + "): " + line); // } - FileReader is2 = new FileReader(new File(inputPath)); - BufferedReader bis2 = new BufferedReader(is2); - ArrayList<String> list = new ArrayList<String>(); - while (true) { - String line = bis2.readLine(); - if (line == null) { - break; - } - list.add(line); - inputCount++; + + // PASS 2: +// FileReader is2 = new FileReader(new File(inputPath)); +// BufferedReader bis2 = new BufferedReader(is2); +// ArrayList<String> list = new ArrayList<String>(); +// +// while (true) { +// String line = 
bis2.readLine(); +// if (line == null) { +// break; +// } +// list.add(line); +// inputCount++; +// } +// Path arr[] = new Path[list.size()]; +// for(int i=0; i < list.size(); i++) { +// arr[i] = new Path(list.get(i)); +// } +// FileInputFormat.setInputPaths(jobConf, arr); + + // PASS 3: + if(funkyInput) { + jobConf.setMapperClass(FunkyDeReffingCDXCanonicalizerMapClass.class); + } else { + jobConf.setMapperClass(DeReffingCDXCanonicalizerMapClass.class); } - Path arr[] = new Path[list.size()]; - for(int i=0; i < list.size(); i++) { - arr[i] = new Path(list.get(i)); - } - FileInputFormat.setInputPaths(jobConf, arr); + FileInputFormat.setInputPaths(jobConf, new Path(inputPath)); + inputCount = 1; } else { @@ -182,10 +205,10 @@ FileOutputFormat.setOutputCompressorClass(jobConf, GzipCodec.class); } - System.out.println("Running on " + cluster.getTaskTrackers() - + " nodes, processing " + inputCount + " files/directories" - + " into " + outputPath + " with " - + partitioner.getNumPartitions() + " reduces."); +// System.out.println("Running on " + cluster.getTaskTrackers() +// + " nodes, processing " + inputCount + " files/directories" +// + " into " + outputPath + " with " +// + partitioner.getNumPartitions() + " reduces."); Date startTime = new Date(); System.out.println("Job started: " + startTime); jobResult = JobClient.runJob(jobConf); @@ -228,7 +251,70 @@ // reporter.setStatus("Running"); } } + public static class FunkyDeReffingCDXCanonicalizerMapClass extends DeReffingCDXCanonicalizerMapClass { + protected Mapper<LongWritable, Text, Text, Text> getInner() { + return new FunkyCDXCanonicalizerMapClass(); + } + } + public static class DeReffingCDXCanonicalizerMapClass extends MapReduceBase + implements Mapper<LongWritable, Text, Text, Text> { + protected Mapper<LongWritable, Text, Text, Text> getInner() { + return new CDXCanonicalizerMapClass(); + } + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.Mapper#map(java.lang.Object, java.lang.Object, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter) + */ + public void map(LongWritable lineNo, Text urlText, + OutputCollector<Text, Text> output, Reporter reporter) + throws IOException { + LongWritable lw = new LongWritable(); + Text tmp = new Text(); +// CDXCanonicalizerMapClass inner = new CDXCanonicalizerMapClass(); + Mapper<LongWritable, Text, Text, Text> inner = getInner(); + // arg 1 is a URL + + String urlString = urlText.toString(); + InputStream is = null; + FileSystem fs = null; + if(urlString.startsWith("http://")) { + URL u = new URL(urlString.toString()); + System.err.println("Openning URL stream for:" + urlString); + is = u.openStream(); + } else { + System.err.println("Creating default Filesystem for:" + urlString); + + fs = FileSystem.get(new Configuration(true)); + Path p = new Path(urlString); +// FSDataInputStream fsdis = fs.open(p); + is = fs.open(p); + + } + if(urlString.endsWith(".gz")) { + is = new GZIPInputStream(is); + } + try { + BufferedReader br = new BufferedReader( + new InputStreamReader(is)); + String tmpS = null; + long line = 0; + while((tmpS = br.readLine()) != null) { + lw.set(line++); + tmp.set(tmpS); + inner.map(lw, tmp, output, reporter); + } + is.close(); + if(fs != null) { + fs.close(); + } + } catch (IOException e) { + System.err.println("IOException with url:" + urlString); + e.printStackTrace(); + throw e; + } + } + + } /** * Mapper which reads an identity CDX line, outputting: key - canonicalized * original URL + timestamp val - everything else @@ -263,23 +349,109 @@ if(i3 > 0) 
{ i4 = s.lastIndexOf(' '); if(i4 > i3) { - ksb.setLength(0); - ksb.append(canonicalizer.urlStringToKey(s.substring(i2 + 1, i3))); - ksb.append(s.substring(i1,i4)); - outKey.set(ksb.toString()); - outValue.set(s.substring(i4+1)); - output.collect(outKey, outValue); - problems = false; + try { + ksb.setLength(0); + ksb.append(canonicalizer.urlStringToKey(s.substring(i2 + 1, i3))); + ksb.append(s.substring(i1,i4)); + outKey.set(ksb.toString()); + outValue.set(s.substring(i4+1)); + output.collect(outKey, outValue); + problems = false; + } catch(URIException e) { + // just eat it.. problems will be true. + } } } } } if(problems) { - System.err.println("Problem with line("+s+")"); + System.err.println("CDX-Can: Problem with line("+s+")"); } } } + /** + * Mapper which reads an identity Funky format CDX line, outputting: + * key - canonicalized original URL + timestamp + * val - everything else + * + * input lines are a hybrid format: + * + * ORIG_URL + * DATE + * '-' (literal) + * MIME + * HTTP_CODE + * SHA1 + * REDIRECT + * START_OFFSET + * ARC_PREFIX (sans .arc.gz) + * ROBOT_FLAG (combo of AIF - no: Archive,Index,Follow, or '-' if none) + * + * Ex: + * http://www.myow.de:80/news_show.php? 20061126032815 - text/html 200 DVKFPTOJGCLT3G5GUVLCETHLFO3222JM - 91098929 foo A + * + * Need to: + * . replace col 3 with orig url + * . replace col 1 with canonicalized orig url + * . replace SHA1 with first 4 digits of SHA1 + * . append .arc.gz to ARC_PREFIX + * . omit lines with ROBOT_FLAG containing 'A' + * . remove last column + * + * @author brad + * @version $Date$, $Revision$ + */ + public static class FunkyCDXCanonicalizerMapClass extends MapReduceBase + implements Mapper<LongWritable, Text, Text, Text> { + + private static int SHA1_DIGITS = 3; + private Text outKey = new Text(); + private Text outValue = new Text(); + AggressiveUrlCanonicalizer canonicalizer = new AggressiveUrlCanonicalizer(); + private StringBuilder ksb = new StringBuilder(); + private StringBuilder vsb = new StringBuilder(); + + private int i1 = 0; + private int i2 = 0; + private int i3 = 0; + private int i4 = 0; + + public void map(LongWritable lineNumber, Text line, + OutputCollector<Text, Text> output, Reporter reporter) + throws IOException { + String s = line.toString(); + + String parts[] = s.split(" "); + boolean problems = true; + if(parts.length == 10) { + if(!parts[9].contains("A")) { + ksb.setLength(0); + vsb.setLength(0); + try { + ksb.append(canonicalizer.urlStringToKey(parts[0])).append(" "); + ksb.append(parts[1]); // date + vsb.append(parts[0]).append(" "); // orig_url + vsb.append(parts[3]).append(" "); // MIME + vsb.append(parts[4]).append(" "); // HTTP_CODE + vsb.append(parts[5].substring(0, SHA1_DIGITS)).append(" "); // SHA1 + vsb.append(parts[6]).append(" "); // redirect + vsb.append(parts[7]).append(" "); // start_offset + vsb.append(parts[8]).append(".arc.gz"); // arc_prefix + outKey.set(ksb.toString()); + outValue.set(vsb.toString()); + output.collect(outKey, outValue); + } catch (URIException e) { + System.err.println("Failed Canonicalize:("+parts[0]+ + ") in ("+parts[8]+"):("+parts[7]+")"); + } + } + } else { + System.err.println("Funky: Problem with line("+s+")"); + } + } + } + public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new CDXSort(), args); System.exit(res); Added: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSortDriver.java 
=================================================================== --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSortDriver.java (rev 0) +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSortDriver.java 2010-09-03 22:31:52 UTC (rev 3245) @@ -0,0 +1,183 @@ +package org.archive.wayback.hadoop; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * @author brad + * + */ +public class CDXSortDriver implements Tool { + Configuration conf = null; + /** + * As hard-coded into the Text RecordWriter + */ + public static String TEXT_OUTPUT_DELIM_CONFIG = + "mapred.textoutputformat.separator"; + + private static int countLinesInPath(Path path, Configuration conf) + throws IOException { + FileSystem fs = path.getFileSystem(conf); + FSDataInputStream is = fs.open(path); + BufferedReader br = new BufferedReader(new InputStreamReader(is, + "utf-8")); + int lineCount = 0; + while (br.readLine() != null) { + lineCount++; + } + is.close(); + return lineCount; + } + + static int printUsage() { + System.out.println("cdxsort <split> <input> <output>"); + System.out.println("cdxsort [OPTIONS] <split> <input> <output>"); + System.out.println("\tOPTIONS can be:"); + System.out.println("\t\t-m NUM - try to run with approximately NUM map tasks"); + System.out.println("\t\t--compress-output - compress output files with GZip"); + System.out.println("\t\t--delimiter DELIM - assume DELIM delimter for input and output, instead of default <SPACE>"); + System.out.println("\t\t--map-global - use the GLOBAL CDX map function, which implies:"); + System.out.println("\t\t\t. extra trailing field indicating HTML meta NOARCHIVE data, which should be omitted, result lines do not include the last field"); + System.out.println("\t\t\t. truncating digest field to 3 digits"); + System.out.println("\t\t\t. column 0 is original URL (identity CDX files)"); + System.out.println(); +// ToolRunner.printGenericCommandUsage(System.out); + return -1; + } + + /** + * The main driver for sort program. Invoke this method to submit the + * map/reduce job. + * + * @throws IOException + * When there is communication problems with the job tracker. 
+ */ + public int run(String[] args) throws Exception { + + String delim = " "; + + long desiredMaps = 10; + boolean compressOutput = false; + List<String> otherArgs = new ArrayList<String>(); + int mapMode = CDXCanonicalizingMapper.MODE_FULL; + for (int i = 0; i < args.length; ++i) { + try { + if ("-m".equals(args[i])) { + desiredMaps = Integer.parseInt(args[++i]); + } else if ("--compress-output".equals(args[i])) { + compressOutput = true; + } else if ("--delimiter".equals(args[i])) { + delim = args[++i]; + } else if ("--map-full".equals(args[i])) { + mapMode = CDXCanonicalizingMapper.MODE_FULL; + } else if ("--map-global".equals(args[i])) { + mapMode = CDXCanonicalizingMapper.MODE_GLOBAL; + } else { + otherArgs.add(args[i]); + } + } catch (NumberFormatException except) { + System.out.println("ERROR: Integer expected instead of " + + args[i]); + return printUsage(); + } catch (ArrayIndexOutOfBoundsException except) { + System.out.println("ERROR: Required parameter missing from " + + args[i - 1]); + return printUsage(); // exits + } + } + + // Make sure there are exactly 3 parameters left: split input output + if (otherArgs.size() != 3) { + System.out.println("ERROR: Wrong number of parameters: " + + otherArgs.size() + " instead of 3."); + return printUsage(); + } + + + String splitPathString = otherArgs.get(0); + String inputPathString = otherArgs.get(1); + String outputPathString = otherArgs.get(2); + + Path splitPath = new Path(splitPathString); + Path inputPath = new Path(inputPathString); + Path outputPath = new Path(outputPathString); + + Job job = new Job(getConf(), "cdx-sort"); + Configuration conf = job.getConfiguration(); + job.setJarByClass(CDXSortDriver.class); + + job.setMapperClass(CDXCanonicalizingMapper.class); + + job.setReducerClass(CDXReducer.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + + // configure the "map mode" + CDXCanonicalizingMapper.setMapMode(conf, mapMode); + + // set up the delimter: + conf.set(TEXT_OUTPUT_DELIM_CONFIG, delim); + + if (compressOutput) { + FileOutputFormat.setCompressOutput(job, true); + FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); + } + + // set up the Partitioner, including number of reduce tasks: + FileSystem fs = inputPath.getFileSystem(conf); + + int splitCount = countLinesInPath(splitPath, conf); + System.err.println("Split/Reduce count:" + splitCount); + job.setNumReduceTasks(splitCount); + + AlphaPartitioner.setPartitionPath(conf, splitPathString); + job.setPartitionerClass(AlphaPartitioner.class); + + // calculate the byte size to get the correct number of map tasks: + FileStatus inputStatus = fs.getFileStatus(inputPath); + long inputLen = inputStatus.getLen(); + long bytesPerMap = (int) inputLen / desiredMaps; + + FileInputFormat.addInputPath(job, inputPath); + FileInputFormat.setMaxInputSplitSize(job, bytesPerMap); + job.setInputFormatClass(LineDereferencingInputFormat.class); + + FileOutputFormat.setOutputPath(job, outputPath); + + return (job.waitForCompletion(true) ? 
0 : 1); + } + + /** + * @param args + * @throws Exception + */ + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new CDXSortDriver(), args); + System.exit(res); + } + + public Configuration getConf() { + return conf; + } + + public void setConf(Configuration conf) { + this.conf = conf; + } + +} Property changes on: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSortDriver.java ___________________________________________________________________ Added: svn:keywords + Author Date Revision Id Added: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingInputFormat.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingInputFormat.java (rev 0) +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingInputFormat.java 2010-09-03 22:31:52 UTC (rev 3245) @@ -0,0 +1,38 @@ +package org.archive.wayback.hadoop; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; + +/** + * FileInputFormat subclass which assumes the configured input files are lines + * containing hdfs:// pointers to the actual Text data. + * + * @author brad + * + */ +public class LineDereferencingInputFormat extends FileInputFormat<Text, Text>{ + TextInputFormat tif = null; + + @Override + public List<InputSplit> getSplits(JobContext context) throws IOException { + if(tif == null) { + tif = new TextInputFormat(); + } + return tif.getSplits(context); + } + + @Override + public RecordReader<Text, Text> createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException, + InterruptedException { + return new LineDereferencingRecordReader(); + } +} Property changes on: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingInputFormat.java ___________________________________________________________________ Added: svn:keywords + Author Date Revision Id Added: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingRecordReader.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingRecordReader.java (rev 0) +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingRecordReader.java 2010-09-03 22:31:52 UTC (rev 3245) @@ -0,0 +1,105 @@ +package org.archive.wayback.hadoop; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.zip.GZIPInputStream; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import 
org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; + +/** + * RecordReader which reads pointers to actual files from an internal + * LineRecordReader, producing a LineRecordReader for the files pointed to by + * the actual input. + * + * @author brad + * + */ +public class LineDereferencingRecordReader extends RecordReader<Text, Text>{ + LineRecordReader internal = new LineRecordReader(); + + FileSystem fileSystem = null; + Text key = null; + Text value = null; + BufferedReader curReader = null; + String curFile = null; + long curLine = 0; + float progress = 0.0f; + @Override + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + FileSplit fileSplit = (FileSplit) split; + fileSystem = fileSplit.getPath().getFileSystem(context.getConfiguration()); + internal.initialize(split, context); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if(key == null) { + key = new Text(); + } + if(value == null) { + value = new Text(); + } + while(true) { + if(curReader == null) { + // are there more? + if(internal.nextKeyValue()) { + progress = internal.getProgress(); + curFile = internal.getCurrentValue().toString(); + Path path = new Path(curFile); + InputStream is = fileSystem.open(path); + // TODO: use the real Codec stuff.. + if(curFile.endsWith(".gz")) { + is = new GZIPInputStream(is); + } + curReader = new BufferedReader(new InputStreamReader(is)); + + } else { + // all done: + return false; + } + } + // try to read another line: + String nextLine = curReader.readLine(); + if(nextLine != null) { + key.set(curFile+":"+curLine); + value.set(nextLine); + curLine++; + return true; + } + curReader = null; + curFile = null; + curLine = 0; + } + } + + @Override + public Text getCurrentKey() throws IOException, + InterruptedException { + return key; + } + + @Override + public Text getCurrentValue() throws IOException, InterruptedException { + return value; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return progress; + } + + @Override + public void close() throws IOException { + internal.close(); + } +} Property changes on: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingRecordReader.java ___________________________________________________________________ Added: svn:keywords + Author Date Revision Id Modified: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/SortDriver.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/SortDriver.java 2010-09-03 22:30:36 UTC (rev 3244) +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/SortDriver.java 2010-09-03 22:31:52 UTC (rev 3245) @@ -11,8 +11,8 @@ int exitCode = -1; ProgramDriver pgd = new ProgramDriver(); try { - pgd.addClass("cdxsort", CDXSort.class, - "A map/reduce program that counts the words in the input files."); + pgd.addClass("cdxsort", CDXSortDriver.class, + "A map/reduce program that canonicalizes and provides a total order sort into multiple CDX files"); pgd.driver(args); // Success exitCode = 0; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source 
development site.
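
The key piece of the reworked AlphaPartitioner is getPartition() above: the sorted boundary strings loaded from the split file named by alphapartitioner.path are probed with Arrays.binarySearch, and a miss (a negative, encoded insertion point) is folded back into the index of the boundary that precedes the key. A minimal standalone sketch of that lookup, outside Hadoop and with made-up boundary strings:

import java.util.Arrays;

public class AlphaPartitionDemo {
    // Sorted lower boundaries of each reduce partition, as they would be read
    // from the split file. These example values are invented for illustration.
    static final String[] BOUNDARIES = { "a", "g", "n", "t" };

    // Same arithmetic as AlphaPartitioner.getPartition(): a negative result
    // from binarySearch encodes an insertion point (-(pos) - 1); (loc * -1) - 2
    // turns that into the index of the boundary preceding the key, clamped to 0.
    static int partitionFor(String key) {
        int loc = Arrays.binarySearch(BOUNDARIES, key);
        if (loc < 0) {
            loc = (loc * -1) - 2;
            if (loc < 0) {
                loc = 0;
            }
        }
        return loc;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("archive.org/ 20060101000000")); // 0
        System.out.println(partitionFor("g"));                           // 1, exact boundary hit
        System.out.println(partitionFor("kernel.org/ 20100903000000"));  // 1
        System.out.println(partitionFor("zzz"));                         // 3, last partition
    }
}

With one boundary line per reduce task (CDXSortDriver sets the reduce count to the line count of the split file), every key lands in the partition whose boundary is the greatest value not exceeding it, and keys that sort below the first boundary are clamped into partition 0.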
From: <bra...@us...> - 2011-02-17 22:35:54
Revision: 3417 http://archive-access.svn.sourceforge.net/archive-access/?rev=3417&view=rev Author: bradtofel Date: 2011-02-17 22:35:47 +0000 (Thu, 17 Feb 2011) Log Message: ----------- Initial checkin of early gzip line dereferencing CDX processing code. Modified Paths: -------------- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXCanonicalizingMapper.java trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSortDriver.java trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingRecordReader.java trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/SortDriver.java Added Paths: ----------- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingInputFormat.java trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingRecordReader.java trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/HTTPImportJob.java trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/HTTPImportMapper.java trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/MultiMemberGZIPInputStream.java Modified: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXCanonicalizingMapper.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXCanonicalizingMapper.java 2011-02-09 22:04:47 UTC (rev 3416) +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXCanonicalizingMapper.java 2011-02-17 22:35:47 UTC (rev 3417) @@ -100,7 +100,9 @@ private void mapFull(Object y, Text value, Context context) throws IOException, InterruptedException { String s = value.toString(); - + if(s.startsWith(" CDX ")) { + return; + } boolean problems = true; i1 = s.indexOf(delim); if(i1 > 0) { Modified: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSortDriver.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSortDriver.java 2011-02-09 22:04:47 UTC (rev 3416) +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSortDriver.java 2011-02-17 22:35:47 UTC (rev 3417) @@ -69,6 +69,10 @@ System.out.println("cdxsort [OPTIONS] <split> <input> <output>"); System.out.println("\tOPTIONS can be:"); System.out.println("\t\t-m NUM - try to run with approximately NUM map tasks"); + System.out.println("\t\t--compressed-input - assume input is compressed, even without .gz suffix"); + System.out.println("\t\t--gzip-range - assume input lines are PATH START LENGTH such that a"); + System.out.println("\t\t\t valid gzip record exists in PATH between START and START+LENGTH"); + System.out.println("\t\t\t that contains the records to process"); System.out.println("\t\t--compress-output - compress output files with GZip"); System.out.println("\t\t--delimiter DELIM - assume DELIM delimter for input and output, instead of default <SPACE>"); 
System.out.println("\t\t--map-global - use the GLOBAL CDX map function, which implies:"); @@ -93,6 +97,8 @@ long desiredMaps = 10; boolean compressOutput = false; + boolean compressedInput = false; + boolean gzipRange = false; List<String> otherArgs = new ArrayList<String>(); int mapMode = CDXCanonicalizingMapper.MODE_FULL; for (int i = 0; i < args.length; ++i) { @@ -101,6 +107,10 @@ desiredMaps = Integer.parseInt(args[++i]); } else if ("--compress-output".equals(args[i])) { compressOutput = true; + } else if ("--compressed-input".equals(args[i])) { + compressedInput = true; + } else if ("--gzip-range".equals(args[i])) { + gzipRange = true; } else if ("--delimiter".equals(args[i])) { delim = args[++i]; } else if ("--map-full".equals(args[i])) { @@ -175,8 +185,14 @@ FileInputFormat.addInputPath(job, inputPath); FileInputFormat.setMaxInputSplitSize(job, bytesPerMap); - job.setInputFormatClass(LineDereferencingInputFormat.class); - + if(gzipRange) { + job.setInputFormatClass(GZIPRangeLineDereferencingInputFormat.class); + } else { + job.setInputFormatClass(LineDereferencingInputFormat.class); + if(compressedInput) { + LineDereferencingRecordReader.forceCompressed(conf); + } + } FileOutputFormat.setOutputPath(job, outputPath); return (job.waitForCompletion(true) ? 0 : 1); Added: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingInputFormat.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingInputFormat.java (rev 0) +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingInputFormat.java 2011-02-17 22:35:47 UTC (rev 3417) @@ -0,0 +1,17 @@ +package org.archive.wayback.hadoop; + +import java.io.IOException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +public class GZIPRangeLineDereferencingInputFormat extends LineDereferencingInputFormat { + @Override + public RecordReader<Text, Text> createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException, + InterruptedException { + return new GZIPRangeLineDereferencingRecordReader(); + } +} Added: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingRecordReader.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingRecordReader.java (rev 0) +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingRecordReader.java 2011-02-17 22:35:47 UTC (rev 3417) @@ -0,0 +1,85 @@ +package org.archive.wayback.hadoop; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.zip.GZIPInputStream; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; + +public class GZIPRangeLineDereferencingRecordReader extends LineDereferencingRecordReader{ + String curInputLine = null; + FSDataInputStream fsdis = null; + long curStart = 0; + byte[] 
buffer = null; + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if(key == null) { + key = new Text(); + } + if(value == null) { + value = new Text(); + } + while(true) { + if(curReader == null) { + // are there more? + if(internal.nextKeyValue()) { + progress = internal.getProgress(); + curInputLine = internal.getCurrentValue().toString(); + String[] parts = curInputLine.split(" "); + if(parts.length != 3) { + throw new IOException("Bad format line(" + curInputLine +")"); + } + String newFile = parts[0]; + if(fsdis != null) { + if(!newFile.equals(curFile)) { + // close old and open new, otherwise we can just + // do another read on the current one: + fsdis.close(); + curFile = newFile; + Path path = new Path(curFile); + fsdis = fileSystem.open(path); + } + } else { + curFile = newFile; + Path path = new Path(curFile); + fsdis = fileSystem.open(path); + } + curFile = parts[0]; + curStart = Long.parseLong(parts[1]); + int length = Integer.parseInt(parts[2]); + if(buffer == null) { + buffer = new byte[length]; + } else if (buffer.length < length) { + buffer = new byte[length]; + } + fsdis.read(curStart,buffer,0,length); + // the whole chunk is now in buffer: + InputStream is = + new GZIPInputStream(new ByteArrayInputStream(buffer,0,length)); + curReader = new BufferedReader(new InputStreamReader(is)); + curLine = 0; + + } else { + // all done: + return false; + } + } + // try to read another line: + String nextLine = curReader.readLine(); + if(nextLine != null) { + key.set(curFile+":"+curStart+":"+curLine); + value.set(nextLine); + curLine++; + return true; + } + curReader.close(); + curReader = null; + } + } + +} Added: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/HTTPImportJob.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/HTTPImportJob.java (rev 0) +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/HTTPImportJob.java 2011-02-17 22:35:47 UTC (rev 3417) @@ -0,0 +1,81 @@ +package org.archive.wayback.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.MapRunner; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +public class HTTPImportJob extends Configured implements Tool { + Configuration conf = null; + public final static String HTTP_IMPORT_TARGET = "http-import.target"; + public Configuration getConf() { + return conf; + } + + public void setConf(Configuration conf) { + this.conf = conf; + + } + + + + public int run(String[] args) throws Exception { + Job job = new Job(getConf(), "http-import"); + Configuration conf = job.getConfiguration(); + job.setJarByClass(HTTPImportJob.class); + job.setInputFormatClass(TextInputFormat.class); + job.setOutputFormatClass(TextOutputFormat.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + 
job.setMapperClass(HTTPImportMapper.class); + + int i = 0; + int numMaps = 10; + while(i < args.length -1) { + if(args[i].equals("-m")) { + i++; + numMaps = Integer.parseInt(args[i]); + i++; + } else { + break; + } + } + if(args.length - 3 != i) { + throw new IllegalArgumentException("wrong number of args..."); + } + Path inputPath = new Path(args[i]); + Path outputPath = new Path(args[i+1]); + Path targetPath = new Path(args[i+2]); + + TextInputFormat.addInputPath(job, inputPath); + FileOutputFormat.setOutputPath(job, outputPath); + conf.set(HTTP_IMPORT_TARGET, targetPath.toString()); + + conf.setBoolean("mapred.map.tasks.speculative.execution", false); + + FileSystem fs = inputPath.getFileSystem(conf); + FileStatus inputStatus = fs.getFileStatus(inputPath); + long inputLen = inputStatus.getLen(); + long bytesPerMap = (int) inputLen / numMaps; + + FileInputFormat.setMaxInputSplitSize(job, bytesPerMap); + + + return (job.waitForCompletion(true) ? 0 : 1); + } + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new HTTPImportJob(), args); + System.exit(res); + } + +} Added: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/HTTPImportMapper.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/HTTPImportMapper.java (rev 0) +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/HTTPImportMapper.java 2011-02-17 22:35:47 UTC (rev 3417) @@ -0,0 +1,145 @@ +package org.archive.wayback.hadoop; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; + +import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.methods.GetMethod; +import org.apache.commons.httpclient.methods.HeadMethod; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Mapper.Context; + +public class HTTPImportMapper extends Mapper<LongWritable, Text, Text, Text> { + public final int BUFSIZ = 4096; + Path target = null; + FileSystem filesystem = null; + Text doneText = null; + HttpClient client = null; + public HTTPImportMapper() { + + } + public void init2() { + System.err.println("Init map..."); + } + @Override + protected void setup(Context context) throws IOException, + InterruptedException { + super.setup(context); + Configuration conf = context.getConfiguration(); + String targetString = conf.get(HTTPImportJob.HTTP_IMPORT_TARGET); + if(targetString == null) { + throw new IOException("No " + HTTPImportJob.HTTP_IMPORT_TARGET + + " specified"); + } + target = new Path(targetString); + filesystem = target.getFileSystem(conf); + doneText = new Text("Done"); + client = new HttpClient(); + } + + @Override + protected void map(LongWritable key, Text value, Context context) + throws IOException, InterruptedException { + + String valueS = value.toString(); + String name; + String url = valueS; + int idx = valueS.indexOf(' '); + if(idx == -1) { + URL tmpUrl = new URL(valueS); + name = tmpUrl.getPath(); + if(name.contains("/")) { + name = 
name.substring(name.lastIndexOf('/')+1); + } + } else { + name = valueS.substring(0,idx); + url = valueS.substring(idx+1); + } + Path thisTarget = new Path(target,name); + doCopy(url, thisTarget); + context.write(value, doneText); + } + + private long getURLLengthByHead(String url) throws IOException { + HeadMethod head = new HeadMethod(url); + long urlLen = -1; + // execute the method and handle any error responses. + try { + int code = client.executeMethod(head); + if(code != 200) { + throw new IOException("Non-200 for HEAD:" + url); + } + urlLen = head.getResponseContentLength(); + // discard: hope it's really empty (HEAD) and thus small... + head.getResponseBody(); + } finally { + head.releaseConnection(); + } + return urlLen; + } + + private long getPathLength(Path path) throws IOException { + FileStatus stat = null; + try { + stat = filesystem.getFileStatus(path); + // present.. check by size: + } catch (FileNotFoundException e) { + return -1; + } + return stat.getLen(); + } + + + private void doCopy(String url, Path target) throws IOException { + // Check if the target exists (from previous map) + long targetLen = getPathLength(target); + long urlLen = -1; + if(targetLen > -1) { + // there's a file in the filesystem already, see if it's the + // same length: + urlLen = getURLLengthByHead(url); + if(urlLen == targetLen) { + // same size, assume it's done: + return; + } + // diff length, do copy again, first remove old: + if(!filesystem.delete(target, false)) { + throw new IOException("Failed to delete old copy"); + } + } + // do the copy: + FSDataOutputStream out = filesystem.create(target, false); + GetMethod get = new GetMethod(url); + long copied = 0; + try { + int code = client.executeMethod(get); + if(code != 200) { + throw new IOException("Non 200 on GET: " + url); + } + urlLen = get.getResponseContentLength(); + InputStream in = get.getResponseBodyAsStream(); + byte buffer[] = new byte[BUFSIZ]; + for(int cbread; (cbread = in.read(buffer)) >= 0; ) { + out.write(buffer, 0, cbread); + copied += cbread; + } + } finally { + get.releaseConnection(); + out.close(); + } + if(copied != urlLen) { + // ack.. what went wrong? 
+ throw new IOException("Wrong copy length want(" + urlLen + + ") got(" + copied + ") URL:" + url); + } + } +} Modified: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingRecordReader.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingRecordReader.java 2011-02-09 22:04:47 UTC (rev 3416) +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingRecordReader.java 2011-02-17 22:35:47 UTC (rev 3417) @@ -25,6 +25,7 @@ import java.io.InputStreamReader; import java.util.zip.GZIPInputStream; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -45,6 +46,9 @@ public class LineDereferencingRecordReader extends RecordReader<Text, Text>{ LineRecordReader internal = new LineRecordReader(); + + protected static final String FORCE_COMPRESSED_FLAG = "line-reref.force-compressed"; + FileSystem fileSystem = null; Text key = null; Text value = null; @@ -52,11 +56,18 @@ String curFile = null; long curLine = 0; float progress = 0.0f; + boolean forceCompressed = false; + public static void forceCompressed(Configuration conf) { + conf.setBoolean(FORCE_COMPRESSED_FLAG, true); + } + @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + forceCompressed = conf.getBoolean(FORCE_COMPRESSED_FLAG, false); FileSplit fileSplit = (FileSplit) split; - fileSystem = fileSplit.getPath().getFileSystem(context.getConfiguration()); + fileSystem = fileSplit.getPath().getFileSystem(conf); internal.initialize(split, context); } @@ -77,8 +88,9 @@ Path path = new Path(curFile); InputStream is = fileSystem.open(path); // TODO: use the real Codec stuff.. - if(curFile.endsWith(".gz")) { - is = new GZIPInputStream(is); + if(forceCompressed || curFile.endsWith(".gz")) { +// is = new GZIPInputStream(is); + is = new MultiMemberGZIPInputStream(is); } curReader = new BufferedReader(new InputStreamReader(is)); Added: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/MultiMemberGZIPInputStream.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/MultiMemberGZIPInputStream.java (rev 0) +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/MultiMemberGZIPInputStream.java 2011-02-17 22:35:47 UTC (rev 3417) @@ -0,0 +1,96 @@ +package org.archive.wayback.hadoop; + +import java.io.InputStream; +import java.io.PushbackInputStream; +import java.io.IOException; +import java.util.zip.GZIPInputStream; + +public class MultiMemberGZIPInputStream extends GZIPInputStream { + + public MultiMemberGZIPInputStream(InputStream in, int size) throws IOException + { + // Wrap the stream in a PushbackInputStream... + super(new PushbackInputStream(in, size), size); + this.size=size; + } + + public MultiMemberGZIPInputStream(InputStream in) throws IOException + { + // Wrap the stream in a PushbackInputStream... 
+ super(new PushbackInputStream(in, 1024)); + this.size=-1; + } + + private MultiMemberGZIPInputStream(MultiMemberGZIPInputStream parent) throws IOException + { + super(parent.in); + this.size=-1; + this.parent=parent.parent==null ? parent : parent.parent; + this.parent.child=this; + } + + private MultiMemberGZIPInputStream(MultiMemberGZIPInputStream parent, int size) throws IOException + { + super(parent.in, size); + this.size=size; + this.parent=parent.parent==null ? parent : parent.parent; + this.parent.child=this; + } + + private MultiMemberGZIPInputStream parent; + private MultiMemberGZIPInputStream child; + private int size; + private boolean eos; + + public int read(byte[] inputBuffer, int inputBufferOffset, int inputBufferLen) throws IOException { + + if (eos) { return -1;} + if (this.child!=null) + return this.child.read(inputBuffer, inputBufferOffset, inputBufferLen); + + int charsRead=super.read(inputBuffer, inputBufferOffset, inputBufferLen); + if (charsRead==-1) + { + // Push any remaining buffered data back onto the stream + // If the stream is then not empty, use it to construct + // a new instance of this class and delegate this and any + // future calls to it... + int n = inf.getRemaining() - 8; + if (n > 0) + { + // More than 8 bytes remaining in deflater + // First 8 are gzip trailer. Add the rest to + // any un-read data... + ((PushbackInputStream)this.in).unread(buf, len-n, n); + } + else + { + // Nothing in the buffer. We need to know whether or not + // there is unread data available in the underlying stream + // since the base class will not handle an empty file. + // Read a byte to see if there is data and if so, + // push it back onto the stream... + byte[] b=new byte[1]; + int ret=in.read(b,0,1); + if (ret==-1) + { + eos=true; + return -1; + } + else + ((PushbackInputStream)this.in).unread(b, 0, 1); + } + + MultiMemberGZIPInputStream child; + if (this.size==-1) + child=new MultiMemberGZIPInputStream(this); + else + child=new MultiMemberGZIPInputStream(this, this.size); + return child.read(inputBuffer, inputBufferOffset, inputBufferLen); + } + else + return charsRead; + } + +} + Modified: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/SortDriver.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/SortDriver.java 2011-02-09 22:04:47 UTC (rev 3416) +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/SortDriver.java 2011-02-17 22:35:47 UTC (rev 3417) @@ -32,6 +32,8 @@ try { pgd.addClass("cdxsort", CDXSortDriver.class, "A map/reduce program that canonicalizes and provides a total order sort into multiple CDX files"); + pgd.addClass("http-import", HTTPImportJob.class, + "A map/reduce program that imports a bunch of URLs into an HDFS directory"); pgd.driver(args); // Success exitCode = 0; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
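For context on the MultiMemberGZIPInputStream addition above: block-compressed CDX data is laid out as many independent gzip members concatenated into one file, and the stock java.util.zip.GZIPInputStream has historically stopped after the first member (behaviour that also shifted around JDK 6u23, as noted in the follow-up below), which is why LineDereferencingRecordReader switches to the multi-member wrapper. The following standalone sketch is illustrative only — the class and helper names are not part of the commit — and shows the failure mode being worked around:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Builds a two-member gzip buffer and reads it back with the stock stream.
// Depending on JDK version, only the first member may come back.
public class MultiMemberDemo {

    // Compress one string into a single, complete gzip member.
    static byte[] gzipMember(String s) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        GZIPOutputStream gz = new GZIPOutputStream(bos);
        gz.write(s.getBytes("UTF-8"));
        gz.close();
        return bos.toByteArray();
    }

    // Drain an InputStream into a String.
    static String readAll(InputStream in) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        for (int n; (n = in.read(buf)) != -1; ) {
            bos.write(buf, 0, n);
        }
        return bos.toString("UTF-8");
    }

    public static void main(String[] args) throws IOException {
        byte[] a = gzipMember("line one\n");
        byte[] b = gzipMember("line two\n");
        byte[] both = new byte[a.length + b.length];
        System.arraycopy(a, 0, both, 0, a.length);
        System.arraycopy(b, 0, both, a.length, b.length);

        // May stop after "line one" on older JDKs:
        String plain = readAll(new GZIPInputStream(new ByteArrayInputStream(both)));
        System.out.println("stock GZIPInputStream returned: " + plain);

        // The committed wrapper reads straight through both members:
        // String all = readAll(new MultiMemberGZIPInputStream(
        //         new ByteArrayInputStream(both)));
    }
}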
From: Gordon M. <go...@ar...> - 2011-02-18 01:27:45
|
Re: MultiMemberGZIPInputStream As of a couple days ago, Heritrix TRUNK has a GZIPMembersInputStream, that offers not just reads across member-boundaries, but... • behaves the same pre and post JDK6u23! • gives calling-code option of stop-on-member-end or read-straight-thru! • reports member start,end offsets in compressed stream! • offers skip/seek in compressed-offsets! ...and that's not all! It's also got: • comments! • tests! • general compliance with our coding standards (like braces around 1-line if/then/else clauses)! Check it out. - Gordon On 2/17/11 2:35 PM, bra...@us... wrote: > Revision: 3417 > http://archive-access.svn.sourceforge.net/archive-access/?rev=3417&view=rev > Author: bradtofel > Date: 2011-02-17 22:35:47 +0000 (Thu, 17 Feb 2011) > > Log Message: > ----------- > Initial checkin of early gzip line dereferencing CDX processing code. > > Modified Paths: > -------------- > trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXCanonicalizingMapper.java > trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSortDriver.java > trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingRecordReader.java > trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/SortDriver.java > > Added Paths: > ----------- > trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingInputFormat.java > trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingRecordReader.java > trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/HTTPImportJob.java > trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/HTTPImportMapper.java > trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/MultiMemberGZIPInputStream.java > > Modified: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXCanonicalizingMapper.java > =================================================================== > --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXCanonicalizingMapper.java 2011-02-09 22:04:47 UTC (rev 3416) > +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXCanonicalizingMapper.java 2011-02-17 22:35:47 UTC (rev 3417) > @@ -100,7 +100,9 @@ > private void mapFull(Object y, Text value, Context context) > throws IOException, InterruptedException { > String s = value.toString(); > - > + if(s.startsWith(" CDX ")) { > + return; > + } > boolean problems = true; > i1 = s.indexOf(delim); > if(i1> 0) { > > Modified: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSortDriver.java > =================================================================== > --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSortDriver.java 2011-02-09 22:04:47 UTC (rev 3416) > +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSortDriver.java 2011-02-17 22:35:47 UTC (rev 3417) > @@ -69,6 +69,10 @@ > System.out.println("cdxsort [OPTIONS]<split> <input> <output>"); > 
System.out.println("\tOPTIONS can be:"); > System.out.println("\t\t-m NUM - try to run with approximately NUM map tasks"); > + System.out.println("\t\t--compressed-input - assume input is compressed, even without .gz suffix"); > + System.out.println("\t\t--gzip-range - assume input lines are PATH START LENGTH such that a"); > + System.out.println("\t\t\t valid gzip record exists in PATH between START and START+LENGTH"); > + System.out.println("\t\t\t that contains the records to process"); > System.out.println("\t\t--compress-output - compress output files with GZip"); > System.out.println("\t\t--delimiter DELIM - assume DELIM delimter for input and output, instead of default<SPACE>"); > System.out.println("\t\t--map-global - use the GLOBAL CDX map function, which implies:"); > @@ -93,6 +97,8 @@ > > long desiredMaps = 10; > boolean compressOutput = false; > + boolean compressedInput = false; > + boolean gzipRange = false; > List<String> otherArgs = new ArrayList<String>(); > int mapMode = CDXCanonicalizingMapper.MODE_FULL; > for (int i = 0; i< args.length; ++i) { > @@ -101,6 +107,10 @@ > desiredMaps = Integer.parseInt(args[++i]); > } else if ("--compress-output".equals(args[i])) { > compressOutput = true; > + } else if ("--compressed-input".equals(args[i])) { > + compressedInput = true; > + } else if ("--gzip-range".equals(args[i])) { > + gzipRange = true; > } else if ("--delimiter".equals(args[i])) { > delim = args[++i]; > } else if ("--map-full".equals(args[i])) { > @@ -175,8 +185,14 @@ > > FileInputFormat.addInputPath(job, inputPath); > FileInputFormat.setMaxInputSplitSize(job, bytesPerMap); > - job.setInputFormatClass(LineDereferencingInputFormat.class); > - > + if(gzipRange) { > + job.setInputFormatClass(GZIPRangeLineDereferencingInputFormat.class); > + } else { > + job.setInputFormatClass(LineDereferencingInputFormat.class); > + if(compressedInput) { > + LineDereferencingRecordReader.forceCompressed(conf); > + } > + } > FileOutputFormat.setOutputPath(job, outputPath); > > return (job.waitForCompletion(true) ? 
0 : 1); > > Added: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingInputFormat.java > =================================================================== > --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingInputFormat.java (rev 0) > +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingInputFormat.java 2011-02-17 22:35:47 UTC (rev 3417) > @@ -0,0 +1,17 @@ > +package org.archive.wayback.hadoop; > + > +import java.io.IOException; > + > +import org.apache.hadoop.io.Text; > +import org.apache.hadoop.mapreduce.InputSplit; > +import org.apache.hadoop.mapreduce.RecordReader; > +import org.apache.hadoop.mapreduce.TaskAttemptContext; > + > +public class GZIPRangeLineDereferencingInputFormat extends LineDereferencingInputFormat { > + @Override > + public RecordReader<Text, Text> createRecordReader(InputSplit split, > + TaskAttemptContext context) throws IOException, > + InterruptedException { > + return new GZIPRangeLineDereferencingRecordReader(); > + } > +} > > Added: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingRecordReader.java > =================================================================== > --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingRecordReader.java (rev 0) > +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingRecordReader.java 2011-02-17 22:35:47 UTC (rev 3417) > @@ -0,0 +1,85 @@ > +package org.archive.wayback.hadoop; > + > +import java.io.BufferedReader; > +import java.io.ByteArrayInputStream; > +import java.io.IOException; > +import java.io.InputStream; > +import java.io.InputStreamReader; > +import java.util.zip.GZIPInputStream; > + > +import org.apache.hadoop.fs.FSDataInputStream; > +import org.apache.hadoop.fs.Path; > +import org.apache.hadoop.io.Text; > + > +public class GZIPRangeLineDereferencingRecordReader extends LineDereferencingRecordReader{ > + String curInputLine = null; > + FSDataInputStream fsdis = null; > + long curStart = 0; > + byte[] buffer = null; > + @Override > + public boolean nextKeyValue() throws IOException, InterruptedException { > + if(key == null) { > + key = new Text(); > + } > + if(value == null) { > + value = new Text(); > + } > + while(true) { > + if(curReader == null) { > + // are there more? 
> + if(internal.nextKeyValue()) { > + progress = internal.getProgress(); > + curInputLine = internal.getCurrentValue().toString(); > + String[] parts = curInputLine.split(" "); > + if(parts.length != 3) { > + throw new IOException("Bad format line(" + curInputLine +")"); > + } > + String newFile = parts[0]; > + if(fsdis != null) { > + if(!newFile.equals(curFile)) { > + // close old and open new, otherwise we can just > + // do another read on the current one: > + fsdis.close(); > + curFile = newFile; > + Path path = new Path(curFile); > + fsdis = fileSystem.open(path); > + } > + } else { > + curFile = newFile; > + Path path = new Path(curFile); > + fsdis = fileSystem.open(path); > + } > + curFile = parts[0]; > + curStart = Long.parseLong(parts[1]); > + int length = Integer.parseInt(parts[2]); > + if(buffer == null) { > + buffer = new byte[length]; > + } else if (buffer.length< length) { > + buffer = new byte[length]; > + } > + fsdis.read(curStart,buffer,0,length); > + // the whole chunk is now in buffer: > + InputStream is = > + new GZIPInputStream(new ByteArrayInputStream(buffer,0,length)); > + curReader = new BufferedReader(new InputStreamReader(is)); > + curLine = 0; > + > + } else { > + // all done: > + return false; > + } > + } > + // try to read another line: > + String nextLine = curReader.readLine(); > + if(nextLine != null) { > + key.set(curFile+":"+curStart+":"+curLine); > + value.set(nextLine); > + curLine++; > + return true; > + } > + curReader.close(); > + curReader = null; > + } > + } > + > +} > > Added: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/HTTPImportJob.java > =================================================================== > --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/HTTPImportJob.java (rev 0) > +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/HTTPImportJob.java 2011-02-17 22:35:47 UTC (rev 3417) > @@ -0,0 +1,81 @@ > +package org.archive.wayback.hadoop; > + > +import org.apache.hadoop.conf.Configuration; > +import org.apache.hadoop.conf.Configured; > +import org.apache.hadoop.fs.FileStatus; > +import org.apache.hadoop.fs.FileSystem; > +import org.apache.hadoop.fs.Path; > +import org.apache.hadoop.io.Text; > +import org.apache.hadoop.mapred.MapRunner; > +import org.apache.hadoop.mapreduce.Job; > +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; > +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; > +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; > +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; > +import org.apache.hadoop.util.Tool; > +import org.apache.hadoop.util.ToolRunner; > + > +public class HTTPImportJob extends Configured implements Tool { > + Configuration conf = null; > + public final static String HTTP_IMPORT_TARGET = "http-import.target"; > + public Configuration getConf() { > + return conf; > + } > + > + public void setConf(Configuration conf) { > + this.conf = conf; > + > + } > + > + > + > + public int run(String[] args) throws Exception { > + Job job = new Job(getConf(), "http-import"); > + Configuration conf = job.getConfiguration(); > + job.setJarByClass(HTTPImportJob.class); > + job.setInputFormatClass(TextInputFormat.class); > + job.setOutputFormatClass(TextOutputFormat.class); > + job.setOutputKeyClass(Text.class); > + job.setOutputValueClass(Text.class); > + job.setMapperClass(HTTPImportMapper.class); > + > + int i = 0; > 
+ int numMaps = 10; > + while(i< args.length -1) { > + if(args[i].equals("-m")) { > + i++; > + numMaps = Integer.parseInt(args[i]); > + i++; > + } else { > + break; > + } > + } > + if(args.length - 3 != i) { > + throw new IllegalArgumentException("wrong number of args..."); > + } > + Path inputPath = new Path(args[i]); > + Path outputPath = new Path(args[i+1]); > + Path targetPath = new Path(args[i+2]); > + > + TextInputFormat.addInputPath(job, inputPath); > + FileOutputFormat.setOutputPath(job, outputPath); > + conf.set(HTTP_IMPORT_TARGET, targetPath.toString()); > + > + conf.setBoolean("mapred.map.tasks.speculative.execution", false); > + > + FileSystem fs = inputPath.getFileSystem(conf); > + FileStatus inputStatus = fs.getFileStatus(inputPath); > + long inputLen = inputStatus.getLen(); > + long bytesPerMap = (int) inputLen / numMaps; > + > + FileInputFormat.setMaxInputSplitSize(job, bytesPerMap); > + > + > + return (job.waitForCompletion(true) ? 0 : 1); > + } > + public static void main(String[] args) throws Exception { > + int res = ToolRunner.run(new Configuration(), new HTTPImportJob(), args); > + System.exit(res); > + } > + > +} > > Added: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/HTTPImportMapper.java > =================================================================== > --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/HTTPImportMapper.java (rev 0) > +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/HTTPImportMapper.java 2011-02-17 22:35:47 UTC (rev 3417) > @@ -0,0 +1,145 @@ > +package org.archive.wayback.hadoop; > + > +import java.io.FileNotFoundException; > +import java.io.IOException; > +import java.io.InputStream; > +import java.net.URL; > + > +import org.apache.commons.httpclient.HttpClient; > +import org.apache.commons.httpclient.methods.GetMethod; > +import org.apache.commons.httpclient.methods.HeadMethod; > +import org.apache.hadoop.conf.Configuration; > +import org.apache.hadoop.fs.FSDataOutputStream; > +import org.apache.hadoop.fs.FileStatus; > +import org.apache.hadoop.fs.FileSystem; > +import org.apache.hadoop.fs.Path; > +import org.apache.hadoop.io.LongWritable; > +import org.apache.hadoop.io.Text; > +import org.apache.hadoop.mapreduce.Mapper; > +import org.apache.hadoop.mapreduce.Mapper.Context; > + > +public class HTTPImportMapper extends Mapper<LongWritable, Text, Text, Text> { > + public final int BUFSIZ = 4096; > + Path target = null; > + FileSystem filesystem = null; > + Text doneText = null; > + HttpClient client = null; > + public HTTPImportMapper() { > + > + } > + public void init2() { > + System.err.println("Init map..."); > + } > + @Override > + protected void setup(Context context) throws IOException, > + InterruptedException { > + super.setup(context); > + Configuration conf = context.getConfiguration(); > + String targetString = conf.get(HTTPImportJob.HTTP_IMPORT_TARGET); > + if(targetString == null) { > + throw new IOException("No " + HTTPImportJob.HTTP_IMPORT_TARGET > + + " specified"); > + } > + target = new Path(targetString); > + filesystem = target.getFileSystem(conf); > + doneText = new Text("Done"); > + client = new HttpClient(); > + } > + > + @Override > + protected void map(LongWritable key, Text value, Context context) > + throws IOException, InterruptedException { > + > + String valueS = value.toString(); > + String name; > + String url = valueS; > + int idx = valueS.indexOf(' '); 
> + if(idx == -1) { > + URL tmpUrl = new URL(valueS); > + name = tmpUrl.getPath(); > + if(name.contains("/")) { > + name = name.substring(name.lastIndexOf('/')+1); > + } > + } else { > + name = valueS.substring(0,idx); > + url = valueS.substring(idx+1); > + } > + Path thisTarget = new Path(target,name); > + doCopy(url, thisTarget); > + context.write(value, doneText); > + } > + > + private long getURLLengthByHead(String url) throws IOException { > + HeadMethod head = new HeadMethod(url); > + long urlLen = -1; > + // execute the method and handle any error responses. > + try { > + int code = client.executeMethod(head); > + if(code != 200) { > + throw new IOException("Non-200 for HEAD:" + url); > + } > + urlLen = head.getResponseContentLength(); > + // discard: hope it's really empty (HEAD) and thus small... > + head.getResponseBody(); > + } finally { > + head.releaseConnection(); > + } > + return urlLen; > + } > + > + private long getPathLength(Path path) throws IOException { > + FileStatus stat = null; > + try { > + stat = filesystem.getFileStatus(path); > + // present.. check by size: > + } catch (FileNotFoundException e) { > + return -1; > + } > + return stat.getLen(); > + } > + > + > + private void doCopy(String url, Path target) throws IOException { > + // Check if the target exists (from previous map) > + long targetLen = getPathLength(target); > + long urlLen = -1; > + if(targetLen> -1) { > + // there's a file in the filesystem already, see if it's the > + // same length: > + urlLen = getURLLengthByHead(url); > + if(urlLen == targetLen) { > + // same size, assume it's done: > + return; > + } > + // diff length, do copy again, first remove old: > + if(!filesystem.delete(target, false)) { > + throw new IOException("Failed to delete old copy"); > + } > + } > + // do the copy: > + FSDataOutputStream out = filesystem.create(target, false); > + GetMethod get = new GetMethod(url); > + long copied = 0; > + try { > + int code = client.executeMethod(get); > + if(code != 200) { > + throw new IOException("Non 200 on GET: " + url); > + } > + urlLen = get.getResponseContentLength(); > + InputStream in = get.getResponseBodyAsStream(); > + byte buffer[] = new byte[BUFSIZ]; > + for(int cbread; (cbread = in.read(buffer))>= 0; ) { > + out.write(buffer, 0, cbread); > + copied += cbread; > + } > + } finally { > + get.releaseConnection(); > + out.close(); > + } > + if(copied != urlLen) { > + // ack.. what went wrong? 
> + throw new IOException("Wrong copy length want(" + urlLen > + + ") got(" + copied + ") URL:" + url); > + } > + } > +} > > Modified: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingRecordReader.java > =================================================================== > --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingRecordReader.java 2011-02-09 22:04:47 UTC (rev 3416) > +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingRecordReader.java 2011-02-17 22:35:47 UTC (rev 3417) > @@ -25,6 +25,7 @@ > import java.io.InputStreamReader; > import java.util.zip.GZIPInputStream; > > +import org.apache.hadoop.conf.Configuration; > import org.apache.hadoop.fs.FileSystem; > import org.apache.hadoop.fs.Path; > import org.apache.hadoop.io.Text; > @@ -45,6 +46,9 @@ > public class LineDereferencingRecordReader extends RecordReader<Text, Text>{ > LineRecordReader internal = new LineRecordReader(); > > + > + protected static final String FORCE_COMPRESSED_FLAG = "line-reref.force-compressed"; > + > FileSystem fileSystem = null; > Text key = null; > Text value = null; > @@ -52,11 +56,18 @@ > String curFile = null; > long curLine = 0; > float progress = 0.0f; > + boolean forceCompressed = false; > + public static void forceCompressed(Configuration conf) { > + conf.setBoolean(FORCE_COMPRESSED_FLAG, true); > + } > + > @Override > public void initialize(InputSplit split, TaskAttemptContext context) > throws IOException, InterruptedException { > + Configuration conf = context.getConfiguration(); > + forceCompressed = conf.getBoolean(FORCE_COMPRESSED_FLAG, false); > FileSplit fileSplit = (FileSplit) split; > - fileSystem = fileSplit.getPath().getFileSystem(context.getConfiguration()); > + fileSystem = fileSplit.getPath().getFileSystem(conf); > internal.initialize(split, context); > } > > @@ -77,8 +88,9 @@ > Path path = new Path(curFile); > InputStream is = fileSystem.open(path); > // TODO: use the real Codec stuff.. > - if(curFile.endsWith(".gz")) { > - is = new GZIPInputStream(is); > + if(forceCompressed || curFile.endsWith(".gz")) { > +// is = new GZIPInputStream(is); > + is = new MultiMemberGZIPInputStream(is); > } > curReader = new BufferedReader(new InputStreamReader(is)); > > > Added: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/MultiMemberGZIPInputStream.java > =================================================================== > --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/MultiMemberGZIPInputStream.java (rev 0) > +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/MultiMemberGZIPInputStream.java 2011-02-17 22:35:47 UTC (rev 3417) > @@ -0,0 +1,96 @@ > +package org.archive.wayback.hadoop; > + > +import java.io.InputStream; > +import java.io.PushbackInputStream; > +import java.io.IOException; > +import java.util.zip.GZIPInputStream; > + > +public class MultiMemberGZIPInputStream extends GZIPInputStream { > + > + public MultiMemberGZIPInputStream(InputStream in, int size) throws IOException > + { > + // Wrap the stream in a PushbackInputStream... 
> + super(new PushbackInputStream(in, size), size); > + this.size=size; > + } > + > + public MultiMemberGZIPInputStream(InputStream in) throws IOException > + { > + // Wrap the stream in a PushbackInputStream... > + super(new PushbackInputStream(in, 1024)); > + this.size=-1; > + } > + > + private MultiMemberGZIPInputStream(MultiMemberGZIPInputStream parent) throws IOException > + { > + super(parent.in); > + this.size=-1; > + this.parent=parent.parent==null ? parent : parent.parent; > + this.parent.child=this; > + } > + > + private MultiMemberGZIPInputStream(MultiMemberGZIPInputStream parent, int size) throws IOException > + { > + super(parent.in, size); > + this.size=size; > + this.parent=parent.parent==null ? parent : parent.parent; > + this.parent.child=this; > + } > + > + private MultiMemberGZIPInputStream parent; > + private MultiMemberGZIPInputStream child; > + private int size; > + private boolean eos; > + > + public int read(byte[] inputBuffer, int inputBufferOffset, int inputBufferLen) throws IOException { > + > + if (eos) { return -1;} > + if (this.child!=null) > + return this.child.read(inputBuffer, inputBufferOffset, inputBufferLen); > + > + int charsRead=super.read(inputBuffer, inputBufferOffset, inputBufferLen); > + if (charsRead==-1) > + { > + // Push any remaining buffered data back onto the stream > + // If the stream is then not empty, use it to construct > + // a new instance of this class and delegate this and any > + // future calls to it... > + int n = inf.getRemaining() - 8; > + if (n> 0) > + { > + // More than 8 bytes remaining in deflater > + // First 8 are gzip trailer. Add the rest to > + // any un-read data... > + ((PushbackInputStream)this.in).unread(buf, len-n, n); > + } > + else > + { > + // Nothing in the buffer. We need to know whether or not > + // there is unread data available in the underlying stream > + // since the base class will not handle an empty file. > + // Read a byte to see if there is data and if so, > + // push it back onto the stream... > + byte[] b=new byte[1]; > + int ret=in.read(b,0,1); > + if (ret==-1) > + { > + eos=true; > + return -1; > + } > + else > + ((PushbackInputStream)this.in).unread(b, 0, 1); > + } > + > + MultiMemberGZIPInputStream child; > + if (this.size==-1) > + child=new MultiMemberGZIPInputStream(this); > + else > + child=new MultiMemberGZIPInputStream(this, this.size); > + return child.read(inputBuffer, inputBufferOffset, inputBufferLen); > + } > + else > + return charsRead; > + } > + > +} > + > > Modified: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/SortDriver.java > =================================================================== > --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/SortDriver.java 2011-02-09 22:04:47 UTC (rev 3416) > +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/SortDriver.java 2011-02-17 22:35:47 UTC (rev 3417) > @@ -32,6 +32,8 @@ > try { > pgd.addClass("cdxsort", CDXSortDriver.class, > "A map/reduce program that canonicalizes and provides a total order sort into multiple CDX files"); > + pgd.addClass("http-import", HTTPImportJob.class, > + "A map/reduce program that imports a bunch of URLs into an HDFS directory"); > pgd.driver(args); > // Success > exitCode = 0; > > > This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. 
|
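The --gzip-range mode added in the commit quoted above leans on the same property from the other direction: because each gzip member is self-contained, a record reader handed a "PATH START LENGTH" line can pull just that compressed byte range and inflate it in isolation (the committed reader does this against HDFS with FSDataInputStream's positioned read). A minimal local-filesystem sketch of the idea — the path, offsets, and class name below are placeholders, not taken from the commit:

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.RandomAccessFile;
import java.util.zip.GZIPInputStream;

// Dereference one "PATH START LENGTH" manifest line: read exactly LENGTH
// bytes starting at START and inflate them as a standalone gzip member.
public class GzipRangeDemo {

    static BufferedReader openRange(String path, long start, int length)
            throws IOException {
        byte[] buffer = new byte[length];
        RandomAccessFile raf = new RandomAccessFile(path, "r");
        try {
            raf.seek(start);
            raf.readFully(buffer);          // exactly LENGTH bytes, or fail
        } finally {
            raf.close();
        }
        // The range holds a complete gzip member, so it inflates on its own:
        GZIPInputStream gz =
                new GZIPInputStream(new ByteArrayInputStream(buffer, 0, length));
        return new BufferedReader(new InputStreamReader(gz, "UTF-8"));
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical manifest line: "/data/part-00000.cdx.gz 1048576 65536"
        BufferedReader reader = openRange("/data/part-00000.cdx.gz", 1048576L, 65536);
        try {
            for (String line; (line = reader.readLine()) != null; ) {
                System.out.println(line);
            }
        } finally {
            reader.close();
        }
    }
}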
From: Aaron B. <aa...@ar...> - 2011-02-18 06:33:15
|
An odd mailing list to reply to... yet there was much rejoicing. Aaron |
From: <bra...@us...> - 2011-05-25 01:51:40
|
Revision: 3459 http://archive-access.svn.sourceforge.net/archive-access/?rev=3459&view=rev Author: bradtofel Date: 2011-05-25 01:51:34 +0000 (Wed, 25 May 2011) Log Message: ----------- OPTIMIZ: now use static reference to ByteOp.UTF8 Charset object. Previously, it was either being "assumed" as default, as in, not specified, or referenced by name, causing a lookup of the Charset object, which was causing lock contention Modified Paths: -------------- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSort.java trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSortDriver.java trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingRecordReader.java trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingRecordReader.java Modified: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSort.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSort.java 2011-05-25 01:50:17 UTC (rev 3458) +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSort.java 2011-05-25 01:51:34 UTC (rev 3459) @@ -21,6 +21,7 @@ import java.io.BufferedReader; import java.io.File; +import java.io.FileInputStream; import java.io.FileReader; import java.io.IOException; import java.io.InputStream; @@ -58,6 +59,7 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.archive.wayback.util.ByteOp; import org.archive.wayback.util.url.AggressiveUrlCanonicalizer; public class CDXSort extends Configured implements Tool { @@ -134,8 +136,9 @@ // load the split file, find and set the number of reduces AlphaPartitioner partitioner = new AlphaPartitioner(); File localSplitFile = new File(splitPath); - FileReader is = new FileReader(localSplitFile); - BufferedReader bis = new BufferedReader(is); + FileInputStream fis = new FileInputStream(localSplitFile); + InputStreamReader isr = new InputStreamReader(fis,ByteOp.UTF8); + BufferedReader bis = new BufferedReader(isr); // try { // partitioner.loadBoundaries(bis); // } catch (IOException except) { @@ -314,7 +317,7 @@ } try { BufferedReader br = new BufferedReader( - new InputStreamReader(is)); + new InputStreamReader(is,ByteOp.UTF8)); String tmpS = null; long line = 0; while((tmpS = br.readLine()) != null) { Modified: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSortDriver.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSortDriver.java 2011-05-25 01:50:17 UTC (rev 3458) +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/CDXSortDriver.java 2011-05-25 01:51:34 UTC (rev 3459) @@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.archive.wayback.util.ByteOp; /** * @author brad @@ -54,8 +55,7 @@ throws IOException { FileSystem fs = path.getFileSystem(conf); FSDataInputStream is = fs.open(path); - BufferedReader br = new BufferedReader(new 
InputStreamReader(is, - "utf-8")); + BufferedReader br = new BufferedReader(new InputStreamReader(is, ByteOp.UTF8)); int lineCount = 0; while (br.readLine() != null) { lineCount++; Modified: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingRecordReader.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingRecordReader.java 2011-05-25 01:50:17 UTC (rev 3458) +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/GZIPRangeLineDereferencingRecordReader.java 2011-05-25 01:51:34 UTC (rev 3459) @@ -10,6 +10,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.archive.wayback.util.ByteOp; public class GZIPRangeLineDereferencingRecordReader extends LineDereferencingRecordReader{ String curInputLine = null; @@ -61,7 +62,7 @@ // the whole chunk is now in buffer: InputStream is = new GZIPInputStream(new ByteArrayInputStream(buffer,0,length)); - curReader = new BufferedReader(new InputStreamReader(is)); + curReader = new BufferedReader(new InputStreamReader(is,ByteOp.UTF8)); curLine = 0; } else { Modified: trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingRecordReader.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingRecordReader.java 2011-05-25 01:50:17 UTC (rev 3458) +++ trunk/archive-access/projects/wayback/wayback-hadoop-java/src/main/java/org/archive/wayback/hadoop/LineDereferencingRecordReader.java 2011-05-25 01:51:34 UTC (rev 3459) @@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; +import org.archive.wayback.util.ByteOp; /** * RecordReader which reads pointers to actual files from an internal @@ -92,7 +93,7 @@ // is = new GZIPInputStream(is); is = new MultiMemberGZIPInputStream(is); } - curReader = new BufferedReader(new InputStreamReader(is)); + curReader = new BufferedReader(new InputStreamReader(is,ByteOp.UTF8)); } else { // all done: This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
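The pattern behind the rev 3459 change above is to resolve the UTF-8 Charset once and pass the shared object around, rather than passing the charset name (which routes through a per-call Charset lookup, the source of the lock contention mentioned in the log message) or relying on the platform default. A sketch of the before/after shape, assuming ByteOp.UTF8 is a static final Charset as the log message describes (on Java 7+, StandardCharsets.UTF_8 fills the same role):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;

// Resolve the Charset once; reuse the object everywhere readers are built.
public final class CharsetReuse {

    // Stand-in for ByteOp.UTF8 (not the actual ByteOp source).
    public static final Charset UTF8 = Charset.forName("UTF-8");

    // Before: the string name forces a charset lookup on every call.
    static BufferedReader openByName(InputStream is) throws IOException {
        return new BufferedReader(new InputStreamReader(is, "utf-8"));
    }

    // After: the already-resolved Charset object is reused, no lookup.
    static BufferedReader openWithSharedCharset(InputStream is) {
        return new BufferedReader(new InputStreamReader(is, UTF8));
    }

    private CharsetReuse() {
    }
}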