From: <bi...@us...> - 2008-07-24 23:34:37
Revision: 2491 http://archive-access.svn.sourceforge.net/archive-access/?rev=2491&view=rev Author: binzino Date: 2008-07-24 23:34:46 +0000 (Thu, 24 Jul 2008) Log Message: ----------- Add content-length to metadata stored for imported document. Modified Paths: -------------- trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/Importer.java trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/NutchWax.java Modified: trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/Importer.java =================================================================== --- trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/Importer.java 2008-07-24 23:31:54 UTC (rev 2490) +++ trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/Importer.java 2008-07-24 23:34:46 UTC (rev 2491) @@ -231,7 +231,7 @@ { ARCRecordMetaData meta = record.getMetaData(); - if ( LOG.isInfoEnabled() ) LOG.info( "Consider URL: " + meta.getUrl() + " (" + meta.getMimetype() + ")" ); + if ( LOG.isInfoEnabled() ) LOG.info( "Consider URL: " + meta.getUrl() + " (" + meta.getMimetype() + ") [" + meta.getLength( ) + "]" ); try { @@ -302,16 +302,18 @@ // We store both the normal URL and the URL+digest key for // later retrieval by the indexing plugin(s). - contentMetadata.set( NutchWax.URL_KEY, url ); - contentMetadata.set( NutchWax.ORIG_KEY, key ); + contentMetadata.set( NutchWax.URL_KEY, url ); + contentMetadata.set( NutchWax.ORIG_KEY, key ); - contentMetadata.set( NutchWax.CONTENT_TYPE_KEY, meta.getMimetype() ); - contentMetadata.set( NutchWax.FILENAME_KEY, meta.getArcFile().getName() ); - contentMetadata.set( NutchWax.FILEOFFSET_KEY, String.valueOf( record.getHeader().getOffset( ) ) ); - contentMetadata.set( NutchWax.COLLECTION_KEY, collectionName ); - contentMetadata.set( NutchWax.DATE_KEY, meta.getDate() ); - contentMetadata.set( NutchWax.DIGEST_KEY, meta.getDigest() ); + contentMetadata.set( NutchWax.FILENAME_KEY, meta.getArcFile().getName() ); + contentMetadata.set( NutchWax.FILEOFFSET_KEY, String.valueOf( record.getHeader().getOffset( ) ) ); + contentMetadata.set( NutchWax.COLLECTION_KEY, collectionName ); + contentMetadata.set( NutchWax.DATE_KEY, meta.getDate() ); + contentMetadata.set( NutchWax.DIGEST_KEY, meta.getDigest() ); + contentMetadata.set( NutchWax.CONTENT_TYPE_KEY, meta.getMimetype() ); + contentMetadata.set( NutchWax.CONTENT_LENGTH_KEY, String.valueOf( meta.getLength() ) ); + Content content = new Content( url, url, bytes, meta.getMimetype(), contentMetadata, getConf() ); output( output, new Text( key ), content ); Modified: trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/NutchWax.java =================================================================== --- trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/NutchWax.java 2008-07-24 23:31:54 UTC (rev 2490) +++ trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/NutchWax.java 2008-07-24 23:34:46 UTC (rev 2491) @@ -22,12 +22,13 @@ public class NutchWax { - public static final String URL_KEY = "url"; - public static final String ORIG_KEY = "orig"; - public static final String FILENAME_KEY = "filename"; - public static final String FILEOFFSET_KEY = "fileoffset"; - public static final String COLLECTION_KEY = "collection"; - public static final String CONTENT_TYPE_KEY = "type"; - public static final String DATE_KEY = "date"; - public static final String DIGEST_KEY = "digest"; + public 
static final String URL_KEY = "url"; + public static final String ORIG_KEY = "orig"; + public static final String FILENAME_KEY = "filename"; + public static final String FILEOFFSET_KEY = "fileoffset"; + public static final String COLLECTION_KEY = "collection"; + public static final String DATE_KEY = "date"; + public static final String DIGEST_KEY = "digest"; + public static final String CONTENT_TYPE_KEY = "type"; + public static final String CONTENT_LENGTH_KEY = "length"; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
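The importer now records the ARC record length as a string under NutchWax.CONTENT_LENGTH_KEY ("length"), next to the existing url, orig, filename, fileoffset, collection, date, digest and type keys. Assuming the parse step carries this content metadata through to the segment's ParseData (as the NutchWAX indexing code already relies on for the other keys), downstream code could read the value back along these lines; the helper class below is purely illustrative and not part of this commit:

  // Illustrative helper (not in the commit): read the content length that
  // Importer recorded into a document's content metadata.
  import org.apache.nutch.metadata.Metadata;
  import org.apache.nutch.parse.ParseData;
  import org.archive.nutchwax.NutchWax;

  public class ContentLengthReader
  {
    /** Returns the recorded content length in bytes, or -1 if missing or unparsable. */
    public static long getContentLength( ParseData parseData )
    {
      Metadata meta  = parseData.getContentMeta( );
      String   value = meta.get( NutchWax.CONTENT_LENGTH_KEY );

      if ( value == null ) return -1;

      try
        {
          return Long.parseLong( value );
        }
      catch ( NumberFormatException nfe )
        {
          return -1;
        }
    }
  }
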
From: <bi...@us...> - 2009-02-23 03:54:54
Revision: 2683 http://archive-access.svn.sourceforge.net/archive-access/?rev=2683&view=rev Author: binzino Date: 2009-02-23 03:54:47 +0000 (Mon, 23 Feb 2009) Log Message: ----------- Added PageRank* classes to mirror the Nutch LinkDb classes but only /count/ the inlinks, not preserve them. Modified Paths: -------------- trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/tools/PageRanker.java Added Paths: ----------- trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/PageRankDb.java trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/PageRankDbFilter.java trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/PageRankDbMerger.java Added: trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/PageRankDb.java =================================================================== --- trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/PageRankDb.java (rev 0) +++ trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/PageRankDb.java 2009-02-23 03:54:47 UTC (rev 2683) @@ -0,0 +1,366 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.archive.nutchwax; + +import java.io.*; +import java.util.*; +import java.net.*; + +// Commons Logging imports +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.io.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.conf.*; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.util.*; + +import org.apache.nutch.crawl.LinkDbFilter; +import org.apache.nutch.net.URLFilters; +import org.apache.nutch.net.URLNormalizers; +import org.apache.nutch.parse.*; +import org.apache.nutch.util.HadoopFSUtil; +import org.apache.nutch.util.LockUtil; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchJob; + +/** + * <p>Maintains an inverted link map, listing incoming links for each + * url.</p> + * <p>Aaron Binns @ archive.org: see comments in PageRankDbMerger.</p> +*/ +public class PageRankDb extends Configured + implements Tool, Mapper<Text, ParseData, Text, IntWritable> +{ + public static final Log LOG = LogFactory.getLog(PageRankDb.class); + + public static final String CURRENT_NAME = "current"; + public static final String LOCK_NAME = ".locked"; + + private int maxAnchorLength; + private boolean ignoreInternalLinks; + private URLFilters urlFilters; + private URLNormalizers urlNormalizers; + + public PageRankDb( ) + { + } + + public PageRankDb( Configuration conf ) + { + setConf(conf); + } + + public void configure( JobConf job ) + { + ignoreInternalLinks = job.getBoolean("db.ignore.internal.links", true); + if (job.getBoolean(LinkDbFilter.URL_FILTERING, false)) + { + urlFilters = new URLFilters(job); + } + if (job.getBoolean(LinkDbFilter.URL_NORMALIZING, false)) + { + urlNormalizers = new URLNormalizers(job, URLNormalizers.SCOPE_LINKDB); + } + } + + public void close( ) + { + } + + public void map( Text key, ParseData parseData, OutputCollector<Text, IntWritable> output, Reporter reporter ) + throws IOException + { + String fromUrl = key.toString(); + String fromHost = getHost(fromUrl); + + if (urlNormalizers != null) + { + try + { + fromUrl = urlNormalizers.normalize(fromUrl, URLNormalizers.SCOPE_LINKDB); // normalize the url + } + catch (Exception e) + { + LOG.warn("Skipping " + fromUrl + ":" + e); + fromUrl = null; + } + } + if (fromUrl != null && urlFilters != null) + { + try + { + fromUrl = urlFilters.filter(fromUrl); // filter the url + } + catch (Exception e) + { + LOG.warn("Skipping " + fromUrl + ":" + e); + fromUrl = null; + } + } + if (fromUrl == null) return; + + Outlink[] outlinks = parseData.getOutlinks(); + + for (int i = 0; i < outlinks.length; i++) + { + Outlink outlink = outlinks[i]; + String toUrl = outlink.getToUrl(); + + if (ignoreInternalLinks) + { + String toHost = getHost(toUrl); + if (toHost == null || toHost.equals(fromHost)) + { // internal link + continue; // skip it + } + } + if (urlNormalizers != null) + { + try + { + toUrl = urlNormalizers.normalize(toUrl, URLNormalizers.SCOPE_LINKDB); // normalize the url + } + catch (Exception e) + { + LOG.warn("Skipping " + toUrl + ":" + e); + toUrl = null; + } + } + if (toUrl != null && urlFilters != null) + { + try + { + toUrl = urlFilters.filter(toUrl); // filter the url + } + catch (Exception e) + { + LOG.warn("Skipping " + toUrl + ":" + e); + toUrl = null; + } + } + + if (toUrl == null) continue; + + // DIFF: We just emit a count of '1' for the toUrl. That's it. + // Rather than the list of inlinks as in LinkDb. 
+ output.collect( new Text(toUrl), new IntWritable( 1 ) ); + } + } + + private String getHost(String url) + { + try + { + return new URL(url).getHost().toLowerCase(); + } + catch (MalformedURLException e) + { + return null; + } + } + + public void invert(Path pageRankDb, final Path segmentsDir, boolean normalize, boolean filter, boolean force) throws IOException + { + final FileSystem fs = FileSystem.get(getConf()); + FileStatus[] files = fs.listStatus(segmentsDir, HadoopFSUtil.getPassDirectoriesFilter(fs)); + invert(pageRankDb, HadoopFSUtil.getPaths(files), normalize, filter, force); + } + + public void invert(Path pageRankDb, Path[] segments, boolean normalize, boolean filter, boolean force) throws IOException + { + + Path lock = new Path(pageRankDb, LOCK_NAME); + FileSystem fs = FileSystem.get(getConf()); + LockUtil.createLockFile(fs, lock, force); + Path currentPageRankDb = new Path(pageRankDb, CURRENT_NAME); + if (LOG.isInfoEnabled()) + { + LOG.info("PageRankDb: starting"); + LOG.info("PageRankDb: pageRankDb: " + pageRankDb); + LOG.info("PageRankDb: URL normalize: " + normalize); + LOG.info("PageRankDb: URL filter: " + filter); + } + JobConf job = PageRankDb.createJob(getConf(), pageRankDb, normalize, filter); + for (int i = 0; i < segments.length; i++) + { + if (LOG.isInfoEnabled()) + { + LOG.info("PageRankDb: adding segment: " + segments[i]); + } + FileInputFormat.addInputPath(job, new Path(segments[i], ParseData.DIR_NAME)); + } + try + { + JobClient.runJob(job); + } + catch (IOException e) + { + LockUtil.removeLockFile(fs, lock); + throw e; + } + if (fs.exists(currentPageRankDb)) + { + if (LOG.isInfoEnabled()) + { + LOG.info("PageRankDb: merging with existing pageRankDb: " + pageRankDb); + } + // try to merge + Path newPageRankDb = FileOutputFormat.getOutputPath(job); + job = PageRankDbMerger.createMergeJob(getConf(), pageRankDb, normalize, filter); + FileInputFormat.addInputPath(job, currentPageRankDb); + FileInputFormat.addInputPath(job, newPageRankDb); + try + { + JobClient.runJob(job); + } + catch (IOException e) + { + LockUtil.removeLockFile(fs, lock); + fs.delete(newPageRankDb, true); + throw e; + } + fs.delete(newPageRankDb, true); + } + PageRankDb.install(job, pageRankDb); + if (LOG.isInfoEnabled()) + { LOG.info("PageRankDb: done"); } + } + + private static JobConf createJob(Configuration config, Path pageRankDb, boolean normalize, boolean filter) + { + Path newPageRankDb = new Path("pagerankdb-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); + + JobConf job = new NutchJob(config); + job.setJobName("pagerankdb " + pageRankDb); + + job.setInputFormat(SequenceFileInputFormat.class); + + job.setMapperClass(PageRankDb.class); + job.setCombinerClass(PageRankDbMerger.class); + // if we don't run the mergeJob, perform normalization/filtering now + if (normalize || filter) + { + try + { + FileSystem fs = FileSystem.get(config); + if (!fs.exists(pageRankDb)) + { + job.setBoolean(LinkDbFilter.URL_FILTERING, filter); + job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize); + } + } + catch (Exception e) + { + LOG.warn("PageRankDb createJob: " + e); + } + } + job.setReducerClass(PageRankDbMerger.class); + + FileOutputFormat.setOutputPath(job, newPageRankDb); + job.setOutputFormat(MapFileOutputFormat.class); + job.setBoolean("mapred.output.compress", false); + job.setOutputKeyClass(Text.class); + + // DIFF: Use IntWritable instead of Inlinks as the output value type. 
+ job.setOutputValueClass(IntWritable.class); + + return job; + } + + public static void install(JobConf job, Path pageRankDb) throws IOException + { + Path newPageRankDb = FileOutputFormat.getOutputPath(job); + FileSystem fs = new JobClient(job).getFs(); + Path old = new Path(pageRankDb, "old"); + Path current = new Path(pageRankDb, CURRENT_NAME); + if (fs.exists(current)) + { + if (fs.exists(old)) fs.delete(old, true); + fs.rename(current, old); + } + fs.mkdirs(pageRankDb); + fs.rename(newPageRankDb, current); + if (fs.exists(old)) fs.delete(old, true); + LockUtil.removeLockFile(fs, new Path(pageRankDb, LOCK_NAME)); + } + + public static void main(String[] args) throws Exception + { + int res = ToolRunner.run(NutchConfiguration.create(), new PageRankDb(), args); + System.exit(res); + } + + public int run(String[] args) throws Exception + { + if (args.length < 2) + { + System.err.println("Usage: PageRankDb <pagerankdb> (-dir <segmentsDir> | <seg1> <seg2> ...) [-force] [-noNormalize] [-noFilter]"); + System.err.println("\tpagerankdb\toutput PageRankDb to create or update"); + System.err.println("\t-dir segmentsDir\tparent directory of several segments, OR"); + System.err.println("\tseg1 seg2 ...\t list of segment directories"); + System.err.println("\t-force\tforce update even if PageRankDb appears to be locked (CAUTION advised)"); + System.err.println("\t-noNormalize\tdon't normalize link URLs"); + System.err.println("\t-noFilter\tdon't apply URLFilters to link URLs"); + return -1; + } + Path segDir = null; + final FileSystem fs = FileSystem.get(getConf()); + Path db = new Path(args[0]); + ArrayList<Path> segs = new ArrayList<Path>(); + boolean filter = true; + boolean normalize = true; + boolean force = false; + for (int i = 1; i < args.length; i++) + { + if (args[i].equals("-dir")) + { + segDir = new Path(args[++i]); + FileStatus[] files = fs.listStatus(segDir, HadoopFSUtil.getPassDirectoriesFilter(fs)); + if (files != null) segs.addAll(Arrays.asList(HadoopFSUtil.getPaths(files))); + break; + } + else if (args[i].equalsIgnoreCase("-noNormalize")) + { + normalize = false; + } + else if (args[i].equalsIgnoreCase("-noFilter")) + { + filter = false; + } + else if (args[i].equalsIgnoreCase("-force")) + { + force = true; + } + else segs.add(new Path(args[i])); + } + try + { + invert(db, segs.toArray(new Path[segs.size()]), normalize, filter, force); + return 0; + } + catch (Exception e) + { + LOG.fatal("PageRankDb: " + StringUtils.stringifyException(e)); + return -1; + } + } + +} Added: trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/PageRankDbFilter.java =================================================================== --- trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/PageRankDbFilter.java (rev 0) +++ trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/PageRankDbFilter.java 2009-02-23 03:54:47 UTC (rev 2683) @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.archive.nutchwax; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.nutch.net.URLFilters; +import org.apache.nutch.net.URLNormalizers; + +/** + * <p>This class provides a way to separate the URL normalization + * and filtering steps from the rest of LinkDb manipulation code.</p> + * <p>Aaron Binns @ archive.org: see comments in PageRankDbMerger.</p> + * + * @author Andrzej Bialecki + * @author Aaron Binns (archive.org) + */ +public class PageRankDbFilter implements Mapper<Text, IntWritable, Text, IntWritable> +{ + public static final String URL_FILTERING = "linkdb.url.filters"; + + public static final String URL_NORMALIZING = "linkdb.url.normalizer"; + + public static final String URL_NORMALIZING_SCOPE = "linkdb.url.normalizer.scope"; + + private boolean filter; + + private boolean normalize; + + private URLFilters filters; + + private URLNormalizers normalizers; + + private String scope; + + public static final Log LOG = LogFactory.getLog(PageRankDbFilter.class); + + private Text newKey = new Text(); + + public void configure(JobConf job) + { + filter = job.getBoolean(URL_FILTERING, false); + normalize = job.getBoolean(URL_NORMALIZING, false); + if (filter) + { + filters = new URLFilters(job); + } + if (normalize) + { + scope = job.get(URL_NORMALIZING_SCOPE, URLNormalizers.SCOPE_LINKDB); + normalizers = new URLNormalizers(job, scope); + } + } + + public void close() + { + } + + public void map(Text key, IntWritable value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException + { + String url = key.toString(); + // Inlinks result = new Inlinks(); + if (normalize) + { + try + { + url = normalizers.normalize(url, scope); // normalize the url + } + catch (Exception e) + { + LOG.warn("Skipping " + url + ":" + e); + url = null; + } + } + if (url != null && filter) + { + try + { + url = filters.filter(url); // filter the url + } + catch (Exception e) + { + LOG.warn("Skipping " + url + ":" + e); + url = null; + } + } + if (url == null) return; // didn't pass the filters + + // DIFF: Now that normalizers and filters have run, just emit the + // <url,value> pair. No processing to be done on the value. 
+ Text newKey = new Text( url ); + output.collect( newKey, value ); + } +} Added: trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/PageRankDbMerger.java =================================================================== --- trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/PageRankDbMerger.java (rev 0) +++ trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/PageRankDbMerger.java 2009-02-23 03:54:47 UTC (rev 2683) @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.archive.nutchwax; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapFileOutputFormat; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.nutch.crawl.LinkDbFilter; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchJob; + +/** + * This tool merges several PageRankDb-s into one, optionally filtering + * URLs through the current URLFilters, to skip prohibited URLs and + * links. + * + * <p>It's possible to use this tool just for filtering - in that case + * only one PageRankDb should be specified in arguments.</p> + * <p>If more than one PageRankDb contains information about the same URL, + * all inlinks are accumulated, but only at most <code>db.max.inlinks</code> + * inlinks will ever be added.</p> + * <p>If activated, URLFilters will be applied to both the target URLs and + * to any incoming link URL. If a target URL is prohibited, all + * inlinks to that target will be removed, including the target URL. If + * some of incoming links are prohibited, only they will be removed, and they + * won't count when checking the above-mentioned maximum limit.</p> + * <p>Aaron Binns @ archive.org: + * <blockquote> + * Copy/paste/edit from LinkDbMerger. We only care about the inlink + * <em>count</em> not the inlinks themsevles. 
In fact, trying to + * retain the inlinks doesn't scale when processing 100s of millions + * of documents. In large part, due to fact that that Inlinks + * object wants to keep all of the inlinks in memory at once, + * i.e. in a Set. This doesn't work when we have 600 million + * documents and a single URL could easily have a million inlinks. + * </blockquote></p> + * + * @author Andrzej Bialecki + * @author Aaron Binns (archive.org) + */ +public class PageRankDbMerger extends Configured + implements Tool, Reducer<Text, IntWritable, Text, IntWritable> +{ + private static final Log LOG = LogFactory.getLog(PageRankDbMerger.class); + + private int maxInlinks; + + public PageRankDbMerger() + { + + } + + public PageRankDbMerger(Configuration conf) + { + setConf(conf); + } + + public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException + { + // DIFF: Simply sum the count values for the key. + int count = 0; + while ( values.hasNext( ) ) + { + count += values.next( ).get( ); + } + output.collect( key, new IntWritable( count ) ); + } + + public void configure(JobConf job) + { + maxInlinks = job.getInt("db.max.inlinks", 10000); + } + + public void close() throws IOException + { } + + public void merge(Path output, Path[] dbs, boolean normalize, boolean filter) throws Exception + { + JobConf job = createMergeJob(getConf(), output, normalize, filter); + for (int i = 0; i < dbs.length; i++) + { + FileInputFormat.addInputPath(job, new Path(dbs[i], PageRankDb.CURRENT_NAME)); + } + JobClient.runJob(job); + FileSystem fs = FileSystem.get(getConf()); + fs.mkdirs(output); + fs.rename(FileOutputFormat.getOutputPath(job), new Path(output, PageRankDb.CURRENT_NAME)); + } + + public static JobConf createMergeJob(Configuration config, Path pageRankDb, boolean normalize, boolean filter) + { + Path newPageRankDb = + new Path("pagerankdb-merge-" + + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); + + JobConf job = new NutchJob(config); + job.setJobName("pagerankdb merge " + pageRankDb); + + job.setInputFormat(SequenceFileInputFormat.class); + + job.setMapperClass(PageRankDbFilter.class); + job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize); + job.setBoolean(LinkDbFilter.URL_FILTERING, filter); + job.setReducerClass(PageRankDbMerger.class); + + FileOutputFormat.setOutputPath(job, newPageRankDb); + job.setOutputFormat(MapFileOutputFormat.class); + job.setBoolean("mapred.output.compress", true); + job.setOutputKeyClass(Text.class); + + // DIFF: Use IntWritable instead of Inlinks as the output value type. + job.setOutputValueClass(IntWritable.class); + + return job; + } + + /** + * @param args + */ + public static void main(String[] args) throws Exception + { + int res = ToolRunner.run(NutchConfiguration.create(), new PageRankDbMerger(), args); + System.exit(res); + } + + public int run(String[] args) throws Exception + { + if (args.length < 2) + { + System.err.println("Usage: PageRankDbMerger <output_pagerankdb> <pagerankdb1> [<pagerankdb2> <pagerankdb3> ...] 
[-normalize] [-filter]"); + System.err.println("\toutput_pagerankdb\toutput PageRankDb"); + System.err.println("\tpagerankdb1 ...\tinput PageRankDb-s (single input PageRankDb is ok)"); + System.err.println("\t-normalize\tuse URLNormalizer on both fromUrls and toUrls in pagerankdb(s) (usually not needed)"); + System.err.println("\t-filter\tuse URLFilters on both fromUrls and toUrls in pagerankdb(s)"); + return -1; + } + Path output = new Path(args[0]); + ArrayList<Path> dbs = new ArrayList<Path>(); + boolean normalize = false; + boolean filter = false; + for (int i = 1; i < args.length; i++) + { + if (args[i].equals("-filter")) + { + filter = true; + } else if (args[i].equals("-normalize")) + { + normalize = true; + } else dbs.add(new Path(args[i])); + } + try + { + merge(output, dbs.toArray(new Path[dbs.size()]), normalize, filter); + return 0; + } + catch (Exception e) + { + LOG.fatal("PageRankDbMerger: " + StringUtils.stringifyException(e)); + return -1; + } + } + +} Modified: trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/tools/PageRanker.java =================================================================== --- trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/tools/PageRanker.java 2009-02-10 22:19:48 UTC (rev 2682) +++ trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/tools/PageRanker.java 2009-02-23 03:54:47 UTC (rev 2683) @@ -133,8 +133,6 @@ return -1; } - PrintWriter output = new PrintWriter( new OutputStreamWriter( fs.create( outputPath ).getWrappedStream( ), "UTF-8" ) ); - if ( pos >= args.length ) { System.err.println( "Error: missing linkdb" ); @@ -155,11 +153,17 @@ } else { - FileStatus[] fstats = fs.listStatus( new Path(args[pos]+"/current"), HadoopFSUtil.getPassDirectoriesFilter(fs)); - mapfiles.addAll(Arrays.asList(HadoopFSUtil.getPaths(fstats))); + for ( ; pos < args.length ; pos++ ) + { + FileStatus[] fstats = fs.listStatus( new Path(args[pos]+"/current"), HadoopFSUtil.getPassDirectoriesFilter(fs)); + mapfiles.addAll(Arrays.asList(HadoopFSUtil.getPaths(fstats))); + } } System.out.println( "mapfiles = " + mapfiles ); + + PrintWriter output = new PrintWriter( new OutputStreamWriter( fs.create( outputPath ).getWrappedStream( ), "UTF-8" ) ); + try { for ( Path p : mapfiles ) @@ -171,24 +175,28 @@ while ( reader.next( key, value ) ) { - if ( key instanceof Text && value instanceof Inlinks ) + if ( ! (key instanceof Text) ) continue ; + + String toUrl = ((Text) key).toString( ); + + // HACK: Should make this into some externally configurable regex. + if ( ! toUrl.startsWith( "http" ) ) continue; + + int count = -1; + if ( value instanceof IntWritable ) { - Text toUrl = (Text) key; + count = ( (IntWritable) value ).get( ); + } + else if ( value instanceof Inlinks ) + { Inlinks inlinks = (Inlinks) value; - if ( inlinks.size( ) < threshold ) - { - continue ; - } + count = inlinks.size( ); + } + + if ( count < threshold ) continue ; - String toUrlString = toUrl.toString( ); - - // HACK: Should make this into some externally configurable regex. - if ( toUrlString.startsWith( "http" ) ) - { - output.println( inlinks.size( ) + " " + toUrl.toString() ); - } - } + output.println( count + " " + toUrl ); } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
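Because the job uses MapFileOutputFormat with Text keys and IntWritable values, the finished <pagerankdb>/current directory is just a set of per-partition MapFiles, and the counts can be scanned back with the ordinary Hadoop readers, much as the updated PageRanker does. A rough sketch of such a dump utility follows; it is illustrative only, with example paths and no real error handling:

  import java.io.IOException;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.MapFile;
  import org.apache.hadoop.io.Text;
  import org.apache.nutch.util.HadoopFSUtil;

  // Illustrative utility (not in the commit): print "<count> <url>" for
  // every entry in a pagerankdb produced by PageRankDb.
  public class PageRankDbDumper
  {
    public static void main( String[] args ) throws IOException
    {
      Configuration conf = new Configuration( );
      FileSystem    fs   = FileSystem.get( conf );

      Path current = new Path( args[0], "current" );  // i.e. <pagerankdb>/current

      FileStatus[] parts = fs.listStatus( current, HadoopFSUtil.getPassDirectoriesFilter( fs ) );

      for ( Path part : HadoopFSUtil.getPaths( parts ) )
        {
          MapFile.Reader reader = new MapFile.Reader( fs, part.toString( ), conf );

          Text        url   = new Text( );
          IntWritable count = new IntWritable( );

          while ( reader.next( url, count ) )
            {
              System.out.println( count.get( ) + " " + url );
            }

          reader.close( );
        }
    }
  }
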
From: <bi...@us...> - 2009-10-28 00:02:45
Revision: 2846 http://archive-access.svn.sourceforge.net/archive-access/?rev=2846&view=rev Author: binzino Date: 2009-10-28 00:02:34 +0000 (Wed, 28 Oct 2009) Log Message: ----------- Ported from NW 0.12.9. Factored into two classes to match the refactoring that occurred in Nutch 1.0. Added Paths: ----------- trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/Indexer.java trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/IndexerMapReduce.java Added: trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/Indexer.java =================================================================== --- trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/Indexer.java (rev 0) +++ trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/Indexer.java 2009-10-28 00:02:34 UTC (rev 2846) @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.archive.nutchwax; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.nutch.indexer.lucene.LuceneWriter; +import org.apache.nutch.indexer.NutchIndexWriterFactory; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchJob; + +/** Create indexes for segments. 
*/ +public class Indexer extends Configured implements Tool { + + public static final String DONE_NAME = "index.done"; + + public static final Log LOG = LogFactory.getLog(Indexer.class); + + public Indexer() { + super(null); + } + + public Indexer(Configuration conf) { + super(conf); + } + + public void index(Path luceneDir, List<Path> segments) + throws IOException { + LOG.info("Indexer: starting"); + + final JobConf job = new NutchJob(getConf()); + job.setJobName("index-lucene " + luceneDir); + + IndexerMapReduce.initMRJob(segments, job); + + FileOutputFormat.setOutputPath(job, luceneDir); + + LuceneWriter.addFieldOptions("segment", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, job); + LuceneWriter.addFieldOptions("digest", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, job); + + NutchIndexWriterFactory.addClassToConf(job, LuceneWriter.class); + + JobClient.runJob(job); + LOG.info("Indexer: done"); + } + + public int run(String[] args) throws Exception { + if (args.length < 2) { + System.err.println("Usage: Indexer <index> <segment> ..."); + return -1; + } + + final Path luceneDir = new Path(args[0]); + + final List<Path> segments = new ArrayList<Path>(); + for (int i = 1; i < args.length; i++) { + segments.add(new Path(args[i])); + } + + try { + index(luceneDir, segments); + return 0; + } catch (final Exception e) { + LOG.fatal("Indexer: " + StringUtils.stringifyException(e)); + return -1; + } + } + + public static void main(String[] args) throws Exception { + final int res = ToolRunner.run(NutchConfiguration.create(), new Indexer(), args); + System.exit(res); + } +} Added: trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/IndexerMapReduce.java =================================================================== --- trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/IndexerMapReduce.java (rev 0) +++ trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/IndexerMapReduce.java 2009-10-28 00:02:34 UTC (rev 2846) @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.archive.nutchwax; + +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.nutch.crawl.NutchWritable; +import org.apache.nutch.indexer.IndexerOutputFormat; +import org.apache.nutch.indexer.IndexingException; +import org.apache.nutch.indexer.IndexingFilters; +import org.apache.nutch.indexer.NutchDocument; +import org.apache.nutch.metadata.Metadata; +import org.apache.nutch.metadata.Nutch; +import org.apache.nutch.parse.Parse; +import org.apache.nutch.parse.ParseData; +import org.apache.nutch.parse.ParseImpl; +import org.apache.nutch.parse.ParseText; + +public class IndexerMapReduce extends Configured +implements Mapper<Text, Writable, Text, NutchWritable>, + Reducer<Text, NutchWritable, Text, NutchDocument> { + + public static final Log LOG = LogFactory.getLog(IndexerMapReduce.class); + + private IndexingFilters filters; + + public void configure(JobConf job) { + setConf(job); + this.filters = new IndexingFilters(getConf()); + } + + public void map(Text key, Writable value, + OutputCollector<Text, NutchWritable> output, Reporter reporter) throws IOException { + output.collect(key, new NutchWritable(value)); + } + + public void reduce(Text key, Iterator<NutchWritable> values, + OutputCollector<Text, NutchDocument> output, Reporter reporter) + throws IOException { + ParseData parseData = null; + ParseText parseText = null; + while (values.hasNext()) { + final Writable value = values.next().get(); // unwrap + + if (value instanceof ParseData) { + parseData = (ParseData)value; + } else if (value instanceof ParseText) { + parseText = (ParseText)value; + } else if (LOG.isWarnEnabled()) { + LOG.warn("Unrecognized type: "+value.getClass()); + } + } + + if ( parseText == null || parseData == null ) { + return; + } + + NutchDocument doc = new NutchDocument(); + final Metadata metadata = parseData.getContentMeta(); + + if ( metadata.get(Nutch.SEGMENT_NAME_KEY) == null || + metadata.get(Nutch.SIGNATURE_KEY) == null ) + { + LOG.warn( "Skipping document, insufficient metadata: key=" + key + " metadata=" + metadata ); + return ; + } + + // add segment, used to map from merged index back to segment files + doc.add("segment", metadata.get(Nutch.SEGMENT_NAME_KEY)); + + // add digest, used by dedup + doc.add("digest", metadata.get(Nutch.SIGNATURE_KEY)); + + final Parse parse = new ParseImpl(parseText, parseData); + try { + // run indexing filters + doc = this.filters.filter(doc, parse, key, /*fetchDatum*/ null, /*inlinks*/ null); + } catch (final IndexingException e) { + if (LOG.isWarnEnabled()) { LOG.warn("Error indexing "+key+": "+e); } + return; + } + + // skip documents discarded by indexing filters + if (doc == null) return; + + output.collect(key, doc); + } + + public void close() throws IOException { } + + public static void initMRJob(Collection<Path> segments, + JobConf job) { + + for (final Path segment : segments) { + LOG.info("IndexerMapReduces: adding segment: " + 
segment); + FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME)); + FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME)); + } + + job.setInputFormat(SequenceFileInputFormat.class); + + job.setMapperClass(IndexerMapReduce.class); + job.setReducerClass(IndexerMapReduce.class); + + job.setOutputFormat(IndexerOutputFormat.class); + job.setOutputKeyClass(Text.class); + job.setMapOutputValueClass(NutchWritable.class); + job.setOutputValueClass(NutchWritable.class); + } +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
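Like the other NutchWAX tools, the ported Indexer is a standard Hadoop Tool, so besides the command-line form shown in its usage message ("Indexer <index> <segment> ...") it can be driven programmatically through ToolRunner. A minimal sketch; the "crawl/..." paths are examples only and not part of this commit:

  import org.apache.hadoop.util.ToolRunner;
  import org.apache.nutch.util.NutchConfiguration;
  import org.archive.nutchwax.Indexer;

  // Illustrative driver (not in the commit): index a single segment into a
  // Lucene index directory using the ported Indexer.
  public class IndexOneSegment
  {
    public static void main( String[] args ) throws Exception
    {
      String[] indexerArgs = { "crawl/index", "crawl/segments/20091028000000" };

      int rc = ToolRunner.run( NutchConfiguration.create( ), new Indexer( ), indexerArgs );

      System.exit( rc );
    }
  }
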
From: <bi...@us...> - 2010-02-23 00:50:17
Revision: 2970 http://archive-access.svn.sourceforge.net/archive-access/?rev=2970&view=rev Author: binzino Date: 2010-02-23 00:50:11 +0000 (Tue, 23 Feb 2010) Log Message: ----------- Additional logging, especially for error conditions. Modified Paths: -------------- trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/OpenSearchMaster.java trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/OpenSearchSlave.java Modified: trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/OpenSearchMaster.java =================================================================== --- trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/OpenSearchMaster.java 2010-02-23 00:25:39 UTC (rev 2969) +++ trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/OpenSearchMaster.java 2010-02-23 00:50:11 UTC (rev 2970) @@ -27,6 +27,9 @@ import java.util.ArrayList; import java.util.LinkedList; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import org.jdom.Document; import org.jdom.Element; import org.jdom.Namespace; @@ -38,8 +41,10 @@ */ public class OpenSearchMaster { + public static final Log LOG = LogFactory.getLog( OpenSearchMaster.class ); + List<OpenSearchSlave> slaves = new ArrayList<OpenSearchSlave>( ); - long timeout = 30 * 1000; + long timeout = 0; public OpenSearchMaster( String slavesFile, long timeout ) throws IOException @@ -102,22 +107,21 @@ { if ( sqt.throwable != null ) { - // TODO: Handle problems with slaves continue ; } - // Dump all the results ("item" elements) into a single list. - Element channel = sqt.response.getRootElement( ).getChild( "channel" ); - items.addAll( (List<Element>) channel.getChildren( "item" ) ); - channel.removeChildren( "item" ); - try { + // Dump all the results ("item" elements) into a single list. + Element channel = sqt.response.getRootElement( ).getChild( "channel" ); + items.addAll( (List<Element>) channel.getChildren( "item" ) ); + channel.removeChildren( "item" ); + totalResults += Integer.parseInt( channel.getChild( "totalResults", Namespace.getNamespace( "http://a9.com/-/spec/opensearchrss/1.0/" ) ).getTextTrim( ) ); } catch ( Exception e ) { - // TODO: Log error getting total. + LOG.error( "Error processing response from slave: " + sqt.slave, e ); } } @@ -146,10 +150,6 @@ collapsed.add( item ); count++; } - else - { - // TODO: Log collapse of item. - } } // Replace the list of items with the collapsed list. 
Modified: trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/OpenSearchSlave.java =================================================================== --- trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/OpenSearchSlave.java 2010-02-23 00:25:39 UTC (rev 2969) +++ trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/OpenSearchSlave.java 2010-02-23 00:50:11 UTC (rev 2970) @@ -27,6 +27,9 @@ import java.net.URLEncoder; import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import org.jdom.Document; import org.jdom.Element; import org.jdom.Namespace; @@ -38,6 +41,8 @@ */ public class OpenSearchSlave { + public static final Log LOG = LogFactory.getLog( OpenSearchSlave.class ); + private String urlTemplate; public OpenSearchSlave( String urlTemplate ) @@ -53,6 +58,8 @@ InputStream is = null; try { + LOG.info( "Querying slave: " + url ); + is = getInputStream( url ); Document doc = (new SAXBuilder()).build( is ); @@ -61,6 +68,11 @@ return doc; } + catch ( Exception e ) + { + LOG.error( url.toString(), e ); + throw e; + } finally { // Ensure the InputStream is closed, which should trigger the This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <bi...@us...> - 2010-03-16 21:28:28
Revision: 2971 http://archive-access.svn.sourceforge.net/archive-access/?rev=2971&view=rev Author: binzino Date: 2010-03-16 21:28:15 +0000 (Tue, 16 Mar 2010) Log Message: ----------- Removed from this release. Might make a re-appearance in a future release. Removed Paths: ------------- trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/OpenSearchMaster.java trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/OpenSearchMasterServlet.java trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/OpenSearchSlave.java Deleted: trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/OpenSearchMaster.java =================================================================== --- trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/OpenSearchMaster.java 2010-02-23 00:50:11 UTC (rev 2970) +++ trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/OpenSearchMaster.java 2010-03-16 21:28:15 UTC (rev 2971) @@ -1,355 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.archive.nutchwax; - -import java.io.IOException; -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.io.FileInputStream; -import java.util.Comparator; -import java.util.Collections; -import java.util.List; -import java.util.ArrayList; -import java.util.LinkedList; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.jdom.Document; -import org.jdom.Element; -import org.jdom.Namespace; -import org.jdom.output.XMLOutputter; - - -/** - * - */ -public class OpenSearchMaster -{ - public static final Log LOG = LogFactory.getLog( OpenSearchMaster.class ); - - List<OpenSearchSlave> slaves = new ArrayList<OpenSearchSlave>( ); - long timeout = 0; - - public OpenSearchMaster( String slavesFile, long timeout ) - throws IOException - { - this( slavesFile ); - this.timeout = timeout; - } - - public OpenSearchMaster( String slavesFile ) - throws IOException - { - BufferedReader r = null; - try - { - r = new BufferedReader( new InputStreamReader( new FileInputStream( slavesFile ), "utf-8" ) ); - - String line; - while ( (line = r.readLine()) != null ) - { - line = line.trim(); - if ( line.length() == 0 || line.charAt( 0 ) == '#' ) - { - // Ignore it. 
- continue ; - } - - OpenSearchSlave slave = new OpenSearchSlave( line ); - - this.slaves.add( slave ); - } - } - finally - { - try { if ( r != null ) r.close(); } catch ( IOException ioe ) { } - } - - } - - public Document query( String query, int startIndex, int numResults, int hitsPerSite ) - { - long startTime = System.currentTimeMillis( ); - - List<SlaveQueryThread> slaveThreads = new ArrayList<SlaveQueryThread>( this.slaves.size() ); - - for ( OpenSearchSlave slave : this.slaves ) - { - SlaveQueryThread sqt = new SlaveQueryThread( slave, query, 0, (startIndex+numResults), hitsPerSite ); - - sqt.start( ); - - slaveThreads.add( sqt ); - } - - waitForThreads( slaveThreads, this.timeout ); - - LinkedList<Element> items = new LinkedList<Element>( ); - long totalResults = 0; - - for ( SlaveQueryThread sqt : slaveThreads ) - { - if ( sqt.throwable != null ) - { - continue ; - } - - try - { - // Dump all the results ("item" elements) into a single list. - Element channel = sqt.response.getRootElement( ).getChild( "channel" ); - items.addAll( (List<Element>) channel.getChildren( "item" ) ); - channel.removeChildren( "item" ); - - totalResults += Integer.parseInt( channel.getChild( "totalResults", Namespace.getNamespace( "http://a9.com/-/spec/opensearchrss/1.0/" ) ).getTextTrim( ) ); - } - catch ( Exception e ) - { - LOG.error( "Error processing response from slave: " + sqt.slave, e ); - } - - } - - if ( items.size( ) > 0 && hitsPerSite > 0 ) - { - Collections.sort( items, new ElementSiteThenScoreComparator( ) ); - - LinkedList<Element> collapsed = new LinkedList<Element>( ); - - collapsed.add( items.removeFirst( ) ); - - int count = 1; - for ( Element item : items ) - { - String lastSite = collapsed.getLast( ).getChild( "site", Namespace.getNamespace( "http://www.nutch.org/opensearchrss/1.0/" ) ).getTextTrim( ); - - if ( lastSite.length( ) == 0 || - !lastSite.equals( item.getChild( "site", Namespace.getNamespace( "http://www.nutch.org/opensearchrss/1.0/" ) ).getTextTrim( ) ) ) - { - collapsed.add( item ); - count = 1; - } - else if ( count < hitsPerSite ) - { - collapsed.add( item ); - count++; - } - } - - // Replace the list of items with the collapsed list. - items = collapsed; - } - - Collections.sort( items, new ElementScoreComparator( ) ); - - // Build the final results OpenSearch XML document. - Element channel = new Element( "channel" ); - channel.addContent( new Element( "title" ) ); - channel.addContent( new Element( "description" ) ); - channel.addContent( new Element( "link" ) ); - - Element eTotalResults = new Element( "totalResults", Namespace.getNamespace( "http://a9.com/-/spec/opensearchrss/1.0/" ) ); - Element eStartIndex = new Element( "startIndex", Namespace.getNamespace( "http://a9.com/-/spec/opensearchrss/1.0/" ) ); - Element eItemsPerPage = new Element( "itemsPerPage", Namespace.getNamespace( "http://a9.com/-/spec/opensearchrss/1.0/" ) ); - - eTotalResults.setText( Long.toString( totalResults ) ); - eStartIndex. 
setText( Long.toString( startIndex ) ); - eItemsPerPage.setText( Long.toString( numResults ) ); - - channel.addContent( eTotalResults ); - channel.addContent( eStartIndex ); - channel.addContent( eItemsPerPage ); - - // Get a sub-list of only the items we want: [startIndex,(startIndex+numResults)] - List<Element> subList = items.subList( Math.min( startIndex, items.size( ) ), - Math.min( (startIndex+numResults), items.size( ) ) ); - channel.addContent( subList ); - - Element rss = new Element( "rss" ); - rss.addContent( channel ); - - return new Document( rss ); - } - - - /** - * Convenience method to wait for a collection of threads to complete, - * or until a timeout after a startTime expires. - */ - private void waitForThreads( List<SlaveQueryThread> threads, long timeout ) - { - for ( Thread t : threads ) - { - try - { - t.join( timeout ); - } - catch ( InterruptedException ie ) - { - break; - } - } - } - - - public static void main( String args[] ) - throws Exception - { - String usage = "OpenSearchMaster [OPTIONS] SLAVES.txt query" - + "\n\t-h <n> Hits per site" - + "\n\t-n <n> Number of results" - + "\n\t-s <n> Start index" - + "\n"; - - if ( args.length < 2 ) - { - System.err.println( usage ); - System.exit( 1 ); - } - - String slavesFile = args[args.length - 2]; - String query = args[args.length - 1]; - - int startIndex = 0; - int hitsPerSite = 0; - int numHits = 10; - for ( int i = 0 ; i < args.length - 2 ; i++ ) - { - try - { - if ( "-h".equals( args[i] ) ) - { - i++; - hitsPerSite = Integer.parseInt( args[i] ); - } - if ( "-n".equals( args[i] ) ) - { - i++; - numHits = Integer.parseInt( args[i] ); - } - if ( "-s".equals( args[i] ) ) - { - i++; - startIndex = Integer.parseInt( args[i] ); - } - } - catch ( NumberFormatException nfe ) - { - System.err.println( "Error: not a numeric value: " + args[i] ); - System.err.println( usage ); - System.exit( 1 ); - } - } - - OpenSearchMaster master = new OpenSearchMaster( slavesFile ); - - Document doc = master.query( query, startIndex, numHits, hitsPerSite ); - - (new XMLOutputter()).output( doc, System.out ); - } - -} - - -class SlaveQueryThread extends Thread -{ - OpenSearchSlave slave; - - String query; - int startIndex; - int numResults; - int hitsPerSite; - - Document response; - Throwable throwable; - - - SlaveQueryThread( OpenSearchSlave slave, String query, int startIndex, int numResults, int hitsPerSite ) - { - this.slave = slave; - this.query = query; - this.startIndex = startIndex; - this.numResults = numResults; - this.hitsPerSite = hitsPerSite; - } - - public void run( ) - { - try - { - this.response = this.slave.query( this.query, this.startIndex, this.numResults, this.hitsPerSite ); - } - catch ( Throwable t ) - { - this.throwable = t; - } - } -} - - -class ElementScoreComparator implements Comparator<Element> -{ - public int compare( Element e1, Element e2 ) - { - if ( e1 == e2 ) return 0; - if ( e1 == null ) return 1; - if ( e2 == null ) return -1; - - Element score1 = e1.getChild( "score", Namespace.getNamespace( "http://www.nutch.org/opensearchrss/1.0/" ) ); - Element score2 = e2.getChild( "score", Namespace.getNamespace( "http://www.nutch.org/opensearchrss/1.0/" ) ); - - if ( score1 == score2 ) return 0; - if ( score1 == null ) return 1; - if ( score2 == null ) return -1; - - String text1 = score1.getText().trim(); - String text2 = score2.getText().trim(); - - float value1 = 0.0f; - float value2 = 0.0f; - - try { value1 = Float.parseFloat( text1 ); } catch ( NumberFormatException nfe ) { } - try { value2 = 
Float.parseFloat( text2 ); } catch ( NumberFormatException nfe ) { }
-
-    if ( value1 == value2 ) return 0;
-
-    return value1 > value2 ? -1 : 1;
-  }
-}
-
-class ElementSiteThenScoreComparator extends ElementScoreComparator
-{
-  public int compare( Element e1, Element e2 )
-  {
-    if ( e1 == e2 ) return 0;
-    if ( e1 == null ) return 1;
-    if ( e2 == null ) return -1;
-
-    String site1 = e1.getChild( "site", Namespace.getNamespace( "http://www.nutch.org/opensearchrss/1.0/" ) ).getTextTrim();
-    String site2 = e2.getChild( "site", Namespace.getNamespace( "http://www.nutch.org/opensearchrss/1.0/" ) ).getTextTrim();
-
-    if ( site1.equals( site2 ) )
-    {
-      // Sites are equal, then compare scores.
-      return super.compare( e1, e2 );
-    }
-
-    return site1.compareTo( site2 );
-  }
-}
\ No newline at end of file

Deleted: trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/OpenSearchMasterServlet.java
===================================================================
--- trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/OpenSearchMasterServlet.java	2010-02-23 00:50:11 UTC (rev 2970)
+++ trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/OpenSearchMasterServlet.java	2010-03-16 21:28:15 UTC (rev 2971)
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.archive.nutchwax;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import javax.servlet.ServletConfig;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.jdom.Document;
-import org.jdom.Element;
-import org.jdom.Namespace;
-import org.jdom.output.XMLOutputter;
-
-/**
- *
- */
-public class OpenSearchMasterServlet extends HttpServlet
-{
-  OpenSearchMaster master;
-
-  int hitsPerSite = 0;
-
-  public void init( ServletConfig config )
-    throws ServletException
-  {
-    String slavesFile = config.getInitParameter( "slaves" );
-
-    if ( slavesFile == null || slavesFile.trim().length() == 0 )
-    {
-      throw new ServletException( "Required init parameter missing: slaves" );
-    }
-
-    int timeout = getInteger( config.getInitParameter( "timeout" ), 0 );
-    int hitsPerSite = getInteger( config.getInitParameter( "hitsPerSite" ), 0 );
-
-    try
-    {
-      this.master = new OpenSearchMaster( slavesFile, timeout );
-    }
-    catch ( IOException ioe )
-    {
-      throw new ServletException( ioe );
-    }
-
-  }
-
-  public void destroy( )
-  {
-
-  }
-
-  public void doGet( HttpServletRequest request, HttpServletResponse response )
-    throws ServletException, IOException
-  {
-    long responseTime = System.nanoTime( );
-
-    request.setCharacterEncoding( "UTF-8" );
-
-    String query = getString ( request.getParameter( "query" ), "" );
-    int startIndex = getInteger( request.getParameter( "start" ), 0 );
-    int numHits = getInteger( request.getParameter( "hitsPerPage" ), 10 );
-    int hitsPerSite = getInteger( request.getParameter( "hitsPerSite" ), this.hitsPerSite );
-
-    Document doc = this.master.query( query, startIndex, numHits, hitsPerSite );
-
-    Element eUrlParams = new Element( "urlParams", Namespace.getNamespace( "http://www.nutch.org/opensearchrss/1.0/" ) );
-
-    for ( Map.Entry<String,String[]> e : ((Map<String,String[]>) request.getParameterMap( )).entrySet( ) )
-    {
-      String key = e.getKey( );
-      for ( String value : e.getValue( ) )
-      {
-        Element eParam = new Element( "param", Namespace.getNamespace( "http://www.nutch.org/opensearchrss/1.0/" ) );
-        eParam.setAttribute( "name", key );
-        eParam.setAttribute( "value", value );
-        eUrlParams.addContent( eParam );
-      }
-    }
-
-    doc.getRootElement( ).getChild( "channel" ).addContent( eUrlParams );
-
-    (new XMLOutputter()).output( doc, response.getOutputStream( ) );
-  }
-
-  String getString ( String value, String defaultValue )
-  {
-    if ( value != null )
-    {
-      value = value.trim();
-
-      if ( value.length( ) != 0 )
-      {
-        return value;
-      }
-    }
-
-    return defaultValue;
-  }
-
-  int getInteger( String value, int defaultValue )
-  {
-    if ( value != null )
-    {
-      value = value.trim();
-
-      if ( value.length( ) != 0 )
-      {
-        try
-        {
-          int i = Integer.parseInt( value );
-
-          return i;
-        }
-        catch ( NumberFormatException nfe )
-        {
-          // TODO: log?
-        }
-      }
-    }
-
-    return defaultValue;
-  }
-
-}

Deleted: trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/OpenSearchSlave.java
===================================================================
--- trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/OpenSearchSlave.java	2010-02-23 00:50:11 UTC (rev 2970)
+++ trunk/archive-access/projects/nutchwax/archive/src/java/org/archive/nutchwax/OpenSearchSlave.java	2010-03-16 21:28:15 UTC (rev 2971)
@@ -1,218 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.archive.nutchwax;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
-import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLConnection;
-import java.net.URLEncoder;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.jdom.Document;
-import org.jdom.Element;
-import org.jdom.Namespace;
-import org.jdom.input.SAXBuilder;
-import org.jdom.output.XMLOutputter;
-
-/**
- *
- */
-public class OpenSearchSlave
-{
-  public static final Log LOG = LogFactory.getLog( OpenSearchSlave.class );
-
-  private String urlTemplate;
-
-  public OpenSearchSlave( String urlTemplate )
-  {
-    this.urlTemplate = urlTemplate;
-  }
-
-  public Document query( String query, int startIndex, int requestedNumResults, int hitsPerSite )
-    throws Exception
-  {
-    URL url = buildRequestUrl( query, startIndex, requestedNumResults, hitsPerSite );
-
-    InputStream is = null;
-    try
-    {
-      LOG.info( "Querying slave: " + url );
-
-      is = getInputStream( url );
-
-      Document doc = (new SAXBuilder()).build( is );
-
-      doc = validate( doc );
-
-      return doc;
-    }
-    catch ( Exception e )
-    {
-      LOG.error( url.toString(), e );
-      throw e;
-    }
-    finally
-    {
-      // Ensure the InputStream is closed, which should trigger the
-      // underlying HTTP connection to be cleaned-up.
-      try { if ( is != null ) is.close( ); } catch ( IOException ioe ) { } // Not much we can do
-    }
-  }
-
-  private Document validate( Document doc )
-    throws Exception
-  {
-    if ( doc.getRootElement( ) == null ) throw new Exception( "Invalid OpenSearch response: missing /rss" );
-    Element root = doc.getRootElement( );
-
-    if ( ! "rss".equals( root.getName( ) ) ) throw new Exception( "Invalid OpenSearch response: missing /rss" );
-    Element channel = root.getChild( "channel" );
-
-    if ( channel == null ) throw new Exception( "Invalid OpenSearch response: missing /rss/channel" );
-
-    for ( Element item : (List<Element>) channel.getChildren( "item" ) )
-    {
-      Element site = item.getChild( "site", Namespace.getNamespace( "http://www.nutch.org/opensearchrss/1.0/" ) );
-      if ( site == null )
-      {
-        item.addContent( new Element( "site", Namespace.getNamespace( "http://www.nutch.org/opensearchrss/1.0/" ) ) );
-      }
-
-      Element score = item.getChild( "score", Namespace.getNamespace( "http://www.nutch.org/opensearchrss/1.0/" ) );
-      if ( score == null )
-      {
-        item.addContent( new Element( "score", Namespace.getNamespace( "http://www.nutch.org/opensearchrss/1.0/" ) ) );
-      }
-    }
-
-    return doc;
-  }
-
-  /**
-   *
-   */
-  public URL buildRequestUrl( String query, int startIndex, int requestedNumResults, int hitsPerSite )
-    throws MalformedURLException, UnsupportedEncodingException
-  {
-    String url = this.urlTemplate;
-
-    // Note about replaceAll: In the Java regex library, the replacement string has a few
-    // special characters: \ and $.  Forunately, since we URL-encode the replacement string,
-    // any occurance of \ or $ is converted to %xy form.  So we don't have to worry about it. :)
-    url = url.replaceAll( "[{]searchTerms[}]", URLEncoder.encode( query, "utf-8" ) );
-    url = url.replaceAll( "[{]count[}]" , String.valueOf( requestedNumResults ) );
-    url = url.replaceAll( "[{]startIndex[}]" , String.valueOf( startIndex ) );
-    url = url.replaceAll( "[{]hitsPerSite[}]", String.valueOf( hitsPerSite ) );
-
-    // We don't know about any optional parameters, so we remove them (per the OpenSearch spec.)
-    url = url.replaceAll( "[{][^}]+[?][}]", "" );
-
-    return new URL( url );
-  }
-
-
-  public InputStream getInputStream( URL url )
-    throws IOException
-  {
-    URLConnection connection = url.openConnection( );
-    connection.setDoOutput( false );
-    connection.setRequestProperty( "User-Agent", "Mozilla/4.0 (compatible; NutchWAX OpenSearchMaster)" );
-    connection.connect( );
-
-    if ( connection instanceof HttpURLConnection )
-    {
-      HttpURLConnection hc = (HttpURLConnection) connection;
-
-      switch ( hc.getResponseCode( ) )
-      {
-        case 200:
-          // All good.
-          break;
-        default:
-          // Problems!  Bail out.
-          throw new IOException( "HTTP error from " + url + ": " + hc.getResponseMessage( ) );
-      }
-    }
-
-    InputStream is = connection.getInputStream( );
-
-    return is;
-  }
-
-  public String toString()
-  {
-    return this.urlTemplate;
-  }
-
-  public static void main( String args[] )
-    throws Exception
-  {
-    String usage = "OpenSearchSlave [OPTIONS] urlTemplate query"
-      + "\n\t-h <n> Hits per site"
-      + "\n\t-n <n> Number of results"
-      + "\n";
-
-    if ( args.length < 2 )
-    {
-      System.err.println( usage );
-      System.exit( 1 );
-    }
-
-    String urlTemplate = args[args.length - 2];
-    String query = args[args.length - 1];
-
-    int hitsPerSite = 0;
-    int numHits = 10;
-    for ( int i = 0 ; i < args.length - 2 ; i++ )
-    {
-      try
-      {
-        if ( "-h".equals( args[i] ) )
-        {
-          i++;
-          hitsPerSite = Integer.parseInt( args[i] );
-        }
-        if ( "-n".equals( args[i] ) )
-        {
-          i++;
-          numHits = Integer.parseInt( args[i] );
-        }
-      }
-      catch ( NumberFormatException nfe )
-      {
-        System.err.println( "Error: not a numeric value: " + args[i] );
-        System.err.println( usage );
-        System.exit( 1 );
-      }
-    }
-
-    OpenSearchSlave osl = new OpenSearchSlave( urlTemplate );
-
-    Document doc = osl.query( query, 0, numHits, hitsPerSite );
-
-    (new XMLOutputter()).output( doc, System.out );
-  }
-
-}


This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
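
For anyone tracing how the removed OpenSearchSlave built its per-slave request URLs, the substitution in buildRequestUrl() can be exercised on its own. The sketch below is not part of the NutchWAX source: the template string, host name, and query values are made-up examples. It only illustrates the same idea the deleted code relied on, namely URL-encoding the query before handing it to replaceAll (so \ and $ cannot act as replacement-string metacharacters) and then stripping any unrecognized optional {param?} tokens per the OpenSearch URL-template convention.

    import java.net.URLEncoder;

    // Standalone sketch of the URL-template substitution formerly done by
    // OpenSearchSlave.buildRequestUrl().  The template and values below are
    // hypothetical examples, not taken from any NutchWAX configuration.
    public class UrlTemplateSketch
    {
      public static void main( String[] args ) throws Exception
      {
        String template = "http://slave.example.org/opensearch?query={searchTerms}"
          + "&hitsPerPage={count}&start={startIndex}"
          + "&hitsPerSite={hitsPerSite}&dedupField={dedupField?}";

        String url = template;

        // URL-encoding the query first means the replacement string cannot
        // contain '\' or '$', which are special in Java's replaceAll().
        url = url.replaceAll( "[{]searchTerms[}]", URLEncoder.encode( "cats & dogs", "utf-8" ) );
        url = url.replaceAll( "[{]count[}]"      , String.valueOf( 10 ) );
        url = url.replaceAll( "[{]startIndex[}]" , String.valueOf( 0 ) );
        url = url.replaceAll( "[{]hitsPerSite[}]", String.valueOf( 1 ) );

        // Unknown optional parameters ({name?}) are dropped, per the OpenSearch spec.
        url = url.replaceAll( "[{][^}]+[?][}]", "" );

        // Prints: http://slave.example.org/opensearch?query=cats+%26+dogs&hitsPerPage=10&start=0&hitsPerSite=1&dedupField=
        System.out.println( url );
      }
    }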