From: <bi...@us...> - 2010-03-24 01:09:00
Revision: 3005
          http://archive-access.svn.sourceforge.net/archive-access/?rev=3005&view=rev
Author:   binzino
Date:     2010-03-24 01:08:53 +0000 (Wed, 24 Mar 2010)

Log Message:
-----------
Various hacks for ARI-2260.

Modified Paths:
--------------
    tags/nutchwax-0_12_9-JIRA-ARI-2260/archive/src/nutch/src/java/org/apache/nutch/searcher/DistributedSearch.java
    tags/nutchwax-0_12_9-JIRA-ARI-2260/archive/src/nutch/src/java/org/apache/nutch/searcher/IndexSearcher.java

Added Paths:
-----------
    tags/nutchwax-0_12_9-JIRA-ARI-2260/archive/src/nutch/src/java/org/apache/nutch/searcher/CollapsingHitCollector.java
    tags/nutchwax-0_12_9-JIRA-ARI-2260/archive/src/nutch/src/java/org/apache/nutch/searcher/LuceneQueryOptimizer.java
    tags/nutchwax-0_12_9-JIRA-ARI-2260/archive/src/nutch/src/java/org/apache/nutch/searcher/NutchBean.java
    tags/nutchwax-0_12_9-JIRA-ARI-2260/archive/src/nutch/src/java/org/apache/nutch/searcher/Searcher.java

Added: tags/nutchwax-0_12_9-JIRA-ARI-2260/archive/src/nutch/src/java/org/apache/nutch/searcher/CollapsingHitCollector.java =================================================================== --- tags/nutchwax-0_12_9-JIRA-ARI-2260/archive/src/nutch/src/java/org/apache/nutch/searcher/CollapsingHitCollector.java (rev 0) +++ tags/nutchwax-0_12_9-JIRA-ARI-2260/archive/src/nutch/src/java/org/apache/nutch/searcher/CollapsingHitCollector.java 2010-03-24 01:08:53 UTC (rev 3005) @@ -0,0 +1,246 @@ +package org.apache.nutch.searcher; + +import java.io.*; +import java.util.*; + +import org.apache.lucene.search.TopDocCollector; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TopDocs; + +public class CollapsingHitCollector extends TopDocCollector +{ + public static final Comparator<Hit> SCORE_COMPARATOR = new Comparator<Hit>( ) + { + public int compare( Hit h1, Hit h2 ) + { + if ( h1.score < h2.score ) return -1; + if ( h1.score > h2.score ) return 1; + + // must be equal + return 0; + } + }; + + public static final Comparator<Hit> SITE_COMPARATOR_PARTIAL = new Comparator<Hit>( ) + { + public int compare( Hit h1, Hit h2 ) + { + return String.CASE_INSENSITIVE_ORDER.compare( h1.site, h2.site ); + } + }; + + public static final Comparator<Hit> SITE_COMPARATOR_TOTAL = new Comparator<Hit>( ) + { + public int compare( Hit h1, Hit h2 ) + { + return String.CASE_INSENSITIVE_ORDER.compare( h1.site, h2.site ); + } + }; + + public static class Hit + { + int id; + float score; + String site; + + public Hit( int id, float score, String site ) + { + this.id = id; + this.score = score; + this.site = site; + } + + public int compareTo( Hit that ) + { + if ( this.score < that.score ) return -1; + if ( this.score > that.score ) return 1; + + if ( this.id < that.id ) return -1; + if ( this.id > that.id ) return 1; + + return 0; + } + } + + public org.apache.lucene.search.Searcher searcher; + public int numHits; + public int hitsPerSite; + + final Hit[] sortedByScore; + final Hit[] sortedBySite; + final Hit candidate; + + int numUncollapsedHits = 0; + + public CollapsingHitCollector( org.apache.lucene.search.Searcher searcher, int numHits, int hitsPerSite ) + { + super( numHits ); + this.searcher = searcher; + this.numHits = numHits; + this.hitsPerSite = hitsPerSite; + + this.sortedByScore = new Hit[numHits]; + this.sortedBySite = new Hit[numHits]; + + for ( int i = 0; i < numHits; i++ ) + { + Hit sd = new Hit( -1, Float.NEGATIVE_INFINITY, "" ); + this.sortedByScore[i] = sd; + this.sortedBySite [i] = sd; + } + + this.candidate = new Hit( -1, Float.NEGATIVE_INFINITY, "" ); + + } + + public void
collect( int doc, float score ) + { + this.numUncollapsedHits++; + + this.candidate.id = doc; + this.candidate.score = score; + + if ( this.candidate.score <= this.sortedByScore[0].score ) + { + return ; + } + + try + { + String url = this.searcher.doc( this.candidate.id ).get( "url"); + try + { + java.net.URL u = new java.net.URL( url ); + + this.candidate.site = u.getHost(); + } + catch ( java.net.MalformedURLException e ) { } + } + catch ( IOException ioe ) { throw new RuntimeException( ioe ); } + + // Use "" rather than null to keep searching and sorting simple. + if ( this.candidate.site == null ) this.candidate.site = ""; + + int sitePos = findReplacementPosition( candidate ); + + // No existing hit to be replaced, so we replace the overall + // lowest-scoring one, which is always in position 0 in the + // sortedByScore list. + if ( sitePos < 0 ) + { + this.sortedByScore[0].id = candidate.id; + this.sortedByScore[0].score = candidate.score; + this.sortedByScore[0].site = candidate.site; + + // Since we just added a new site, re-sort them. + Arrays.sort( this.sortedByScore, SCORE_COMPARATOR ); + + // No need to re-sort the sites if not collapsing. + if ( this.hitsPerSite != 0 ) + { + Arrays.sort( this.sortedBySite, SITE_COMPARATOR_TOTAL ); + } + + // Done! + return ; + } + + // We have an existing Hit from the same site which can be + // replaced *if* the candidate's score is better. + if ( candidate.score > this.sortedBySite[sitePos].score ) + { + this.sortedBySite[sitePos].id = this.candidate.id; + this.sortedBySite[sitePos].score = this.candidate.score; + + // We have to re-sort by scores. + Arrays.sort( this.sortedByScore, SCORE_COMPARATOR ); + + // If our hitsPerSite > 1, then we have to re-sort by site to + // ensure that the hit we just inserted is put into the proper + // sorted position within the site group. If hitsPerSite==1, + // then the group size == 1 and therefore no need to re-sort. + if ( this.hitsPerSite > 1 ) + { + Arrays.sort( this.sortedBySite, SITE_COMPARATOR_TOTAL ); + } + } + } + + private int findReplacementPosition( Hit candidate ) + { + if ( this.hitsPerSite == 0 ) return -1; + + int pos = Arrays.binarySearch( this.sortedBySite, candidate, SITE_COMPARATOR_PARTIAL ); + + if ( pos < 0 || this.hitsPerSite == 1 ) return pos; + + int i = pos, j = pos; + + final int mini = 0, maxj = this.sortedBySite.length - 1; + + for ( ; i > mini && SITE_COMPARATOR_PARTIAL.compare( this.sortedBySite[i], this.sortedBySite[i-1] ) == 0; i-- ) + ; + + for ( ; j < maxj && SITE_COMPARATOR_PARTIAL.compare( this.sortedBySite[i], this.sortedBySite[j+1] ) == 0; j++ ) + ; + + // The number of hits from this site is (j-i+1), so if we are less + // than the max number of hits per site, then we return -1 to + // indicate there is still room for more Hits from the candidate + // site. + if ( (j - i + 1) < this.hitsPerSite ) return -1; + + // Otherwise, the Hit to be potentially replaced is the lowest + // scoring hit, which is the one at position i. 
+ return i; + } + + public Hit[] getHits() + { + Hit[] hits = new Hit[this.getNumHits( )]; + + final int sortedByScoreEndPos = this.sortedByScore.length - 1; + for ( int i = 0; i < hits.length ; i++ ) + { + hits[i] = this.sortedByScore[ sortedByScoreEndPos - i ]; + } + + return hits; + } + + public int getNumHits( ) + { + for ( int i = this.sortedByScore.length - this.numHits ; i < this.sortedByScore.length ; i++ ) + { + if ( this.sortedByScore[i].score != Float.NEGATIVE_INFINITY ) + { + return this.sortedByScore.length - i; + } + } + return 0; + } + + public int getTotalHits() + { + if ( this.hitsPerSite == 0 ) return this.numUncollapsedHits; + + int numCollapsedHits = getNumHits( ); + + if ( numCollapsedHits < this.numHits ) + { + return numCollapsedHits; + } + return this.numUncollapsedHits; + } + + public TopDocs topDocs() + { + Hit[] hits = this.getHits( ); + ScoreDoc[] sd = new ScoreDoc[hits.length]; + for ( int i = 0 ; i < hits.length ; i++ ) + { + sd[i] = new ScoreDoc( hits[i].id, hits[i].score ); + } + return new TopDocs( getTotalHits(), sd, hits[hits.length-1].score ); + } +} \ No newline at end of file Modified: tags/nutchwax-0_12_9-JIRA-ARI-2260/archive/src/nutch/src/java/org/apache/nutch/searcher/DistributedSearch.java =================================================================== --- tags/nutchwax-0_12_9-JIRA-ARI-2260/archive/src/nutch/src/java/org/apache/nutch/searcher/DistributedSearch.java 2010-03-24 01:01:04 UTC (rev 3004) +++ tags/nutchwax-0_12_9-JIRA-ARI-2260/archive/src/nutch/src/java/org/apache/nutch/searcher/DistributedSearch.java 2010-03-24 01:08:53 UTC (rev 3005) @@ -254,7 +254,9 @@ } public Hits search(final Query query, final int numHits, - final String dedupField, final String sortField, + final int maxHitsPerDup, + final String dedupField, + final String sortField, final boolean reverse) throws IOException { // Get the list of live servers. It would be nice to build this // list in updateSegments(), but that would create concurrency issues. 
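Editorial note on the collector added above: it keeps two parallel arrays of the numHits best hits seen so far, one sorted by score and one sorted by site (the host parsed from the stored "url" field), and binary-searches the site-sorted array to decide whether a new hit should displace the weakest hit from the same host. As committed, SITE_COMPARATOR_PARTIAL and SITE_COMPARATOR_TOTAL are identical, so hits within a site group are not guaranteed to be ordered by score even though findReplacementPosition() treats position i as the group's lowest-scoring hit. A minimal driver sketch, assuming a Lucene 2.x-era index whose documents store a "url" field; the index path, query field, and query term are invented for illustration:

import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.nutch.searcher.CollapsingHitCollector;

public class CollapseDemo {
  public static void main(String[] args) throws Exception {
    // Hypothetical index path; the collector reads the stored "url" field
    // of each candidate hit to derive its site (host).
    IndexSearcher searcher = new IndexSearcher("/tmp/test-index");

    // Keep the top 10 hits overall, at most 2 per site; a hitsPerSite of 0
    // disables collapsing entirely.
    CollapsingHitCollector collector = new CollapsingHitCollector(searcher, 10, 2);

    searcher.search(new TermQuery(new Term("content", "archive")), collector);

    // topDocs() returns the collapsed hits in descending score order; note
    // that as written it assumes the query matched at least one document.
    TopDocs top = collector.topDocs();
    System.out.println("estimated total: " + top.totalHits);
  }
}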
@@ -282,8 +284,9 @@ params[i][0] = query; params[i][1] = new Integer(numHits); params[i][2] = dedupField; - params[i][3] = sortField; - params[i][4] = Boolean.valueOf(reverse); + params[i][3] = maxHitsPerDup; + params[i][4] = sortField; + params[i][5] = Boolean.valueOf(reverse); } Hits[] results = (Hits[])RPC.call(SEARCH, params, liveAddresses, this.conf); @@ -439,7 +442,7 @@ Client client = new Client(addresses, NutchConfiguration.create()); //client.setTimeout(Integer.MAX_VALUE); - Hits hits = client.search(query, 10, null, null, false); + Hits hits = client.search(query, 10, 0, null, null, false); System.out.println("Total hits: " + hits.getTotal()); for (int i = 0; i < hits.getLength(); i++) { System.out.println(" "+i+" "+ client.getDetails(hits.getHit(i))); Modified: tags/nutchwax-0_12_9-JIRA-ARI-2260/archive/src/nutch/src/java/org/apache/nutch/searcher/IndexSearcher.java =================================================================== --- tags/nutchwax-0_12_9-JIRA-ARI-2260/archive/src/nutch/src/java/org/apache/nutch/searcher/IndexSearcher.java 2010-03-24 01:01:04 UTC (rev 3004) +++ tags/nutchwax-0_12_9-JIRA-ARI-2260/archive/src/nutch/src/java/org/apache/nutch/searcher/IndexSearcher.java 2010-03-24 01:08:53 UTC (rev 3005) @@ -90,7 +90,9 @@ } public Hits search(Query query, int numHits, - String dedupField, String sortField, boolean reverse) + int maxHitsPerDup, + String dedupField, + String sortField, boolean reverse) throws IOException { org.apache.lucene.search.BooleanQuery luceneQuery = @@ -100,7 +102,7 @@ System.out.println( "Lucene query: " + luceneQuery ); return translateHits - (optimizer.optimize(luceneQuery, luceneSearcher, numHits, + (optimizer.optimize(luceneQuery, luceneSearcher, numHits, maxHitsPerDup, sortField, reverse), dedupField, sortField); } Added: tags/nutchwax-0_12_9-JIRA-ARI-2260/archive/src/nutch/src/java/org/apache/nutch/searcher/LuceneQueryOptimizer.java =================================================================== --- tags/nutchwax-0_12_9-JIRA-ARI-2260/archive/src/nutch/src/java/org/apache/nutch/searcher/LuceneQueryOptimizer.java (rev 0) +++ tags/nutchwax-0_12_9-JIRA-ARI-2260/archive/src/nutch/src/java/org/apache/nutch/searcher/LuceneQueryOptimizer.java 2010-03-24 01:08:53 UTC (rev 3005) @@ -0,0 +1,278 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nutch.searcher; + +import org.apache.lucene.search.Searcher; +import org.apache.lucene.search.QueryFilter; +import org.apache.lucene.search.*; +import org.apache.lucene.index.Term; +import org.apache.lucene.misc.ChainedFilter; + +import org.apache.hadoop.conf.Configuration; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.ArrayList; + +import java.io.IOException; + +/** Utility which converts certain query clauses into {@link QueryFilter}s and + * caches these. Only required clauses whose boost is zero are converted to + * cached filters. Range queries are converted to range filters. This + * accelerates query constraints like date, language, document format, etc., + * which do not affect ranking but might otherwise slow search considerably. */ +class LuceneQueryOptimizer { + + // This thread provides a pseudo-clock service to all searching + // threads, so that they can count elapsed time with less overhead than + // repeatedly calling System.currentTimeMillis. + private TimerThread timerThread = null; + + private static class TimerThread extends Thread { + private int tick; + // NOTE: we can avoid explicit synchronization here for several reasons: + // * updates to 32-bit-sized variables are atomic + // * only single thread modifies this value + // * use of volatile keyword ensures that it does not reside in + // a register, but in main memory (so that changes are visible to + // other threads). + // * visibility of changes does not need to be instantaneous, we can + // afford losing a tick or two. + // + // See section 17 of the Java Language Specification for details. + public volatile int timeCounter = 0; + + boolean running = true; + + public TimerThread(int tick) { + super("LQO timer thread"); + this.tick = tick; + this.setDaemon(true); + } + + public void run() { + while(running) { + timeCounter++; + try { + Thread.sleep(tick); + } catch (InterruptedException ie) {}; + } + } + } + + private void initTimerThread(int p) { + if (timerThread == null || !timerThread.isAlive()) { + timerThread = new TimerThread(p); + timerThread.start(); + } + } + + + private static class TimeExceeded extends RuntimeException { + public long maxTime; + private int maxDoc; + public TimeExceeded(long maxTime, int maxDoc) { + super("Exceeded search time: " + maxTime + " ms."); + this.maxTime = maxTime; + this.maxDoc = maxDoc; + } + } + + private static class LimitedCollector extends TopDocCollector { + private int maxHits; + private int maxTicks; + private int startTicks; + private TimerThread timer; + private int curTicks; + + public LimitedCollector(int numHits, int maxHits, int maxTicks, + TimerThread timer) { + super(numHits); + this.maxHits = maxHits; + this.maxTicks = maxTicks; + if (timer != null) { + this.timer = timer; + this.startTicks = timer.timeCounter; + } + } + + public void collect(int doc, float score) { + if (maxHits > 0 && getTotalHits() >= maxHits) { + throw new LimitExceeded(doc); + } + if (timer != null) { + curTicks = timer.timeCounter; + // overflow check + if (curTicks < startTicks) curTicks += Integer.MAX_VALUE; + if (curTicks - startTicks > maxTicks) { + throw new TimeExceeded(timer.tick * (curTicks - startTicks), doc); + } + } + super.collect(doc, score); + } + } + + private static class LimitExceeded extends RuntimeException { + private int maxDoc; + public LimitExceeded(int maxDoc) { this.maxDoc = maxDoc; } + } + + private LinkedHashMap<BooleanQuery, Filter> cache; // an LRU cache of QueryFilter + + private float threshold;
+ + private int searcherMaxHits; + + private int tickLength; + + private int maxTickCount; + + /** + * Construct an optimizer that caches and uses filters for required clauses + * whose boost is zero. + * + * @param cacheSize + * the number of QueryFilters to cache + * @param threshold + * the fraction of documents which must contain a term + */ + public LuceneQueryOptimizer(Configuration conf) { + final int cacheSize = conf.getInt("searcher.filter.cache.size", 16); + this.threshold = conf.getFloat("searcher.filter.cache.threshold", + 0.05f); + this.searcherMaxHits = conf.getInt("searcher.max.hits", -1); + this.cache = new LinkedHashMap<BooleanQuery, Filter>(cacheSize, 0.75f, true) { + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > cacheSize; // limit size of cache + } + }; + this.tickLength = conf.getInt("searcher.max.time.tick_length", 200); + this.maxTickCount = conf.getInt("searcher.max.time.tick_count", -1); + if (this.maxTickCount > 0) { + initTimerThread(this.tickLength); + } + } + + public TopDocs optimize(BooleanQuery original, + Searcher searcher, int numHits, int maxHitsPerDup, + String sortField, boolean reverse) + throws IOException { + + BooleanQuery query = new BooleanQuery(); + BooleanQuery cacheQuery = new BooleanQuery(); + BooleanQuery filterQuery = new BooleanQuery(); + ArrayList<Filter> filters = new ArrayList<Filter>(); + + BooleanClause[] clauses = original.getClauses(); + for (int i = 0; i < clauses.length; i++) { + BooleanClause c = clauses[i]; + if (c.isRequired() // required + && c.getQuery().getBoost() == 0.0f) { // boost is zero + + if (c.getQuery() instanceof TermQuery // TermQuery + && (searcher.docFreq(((TermQuery)c.getQuery()).getTerm()) + / (float)searcher.maxDoc()) < threshold) { // beneath threshold + query.add(c); // don't filterize + continue; + } + + if (c.getQuery() instanceof RangeQuery) { // RangeQuery + RangeQuery range = (RangeQuery)c.getQuery(); + boolean inclusive = range.isInclusive();// convert to RangeFilter + Term lower = range.getLowerTerm(); + Term upper = range.getUpperTerm(); + filters.add(new RangeFilter(lower!=null?lower.field():upper.field(), + lower != null ? lower.text() : null, + upper != null ? upper.text() : null, + inclusive, inclusive)); + cacheQuery.add(c.getQuery(), BooleanClause.Occur.MUST); // cache it + continue; + } + + // all other query types + filterQuery.add(c.getQuery(), BooleanClause.Occur.MUST); // filter it + cacheQuery.add(c.getQuery(), BooleanClause.Occur.MUST); // cache it + continue; + } + + query.add(c); // query it + } + + Filter filter = null; + if (cacheQuery.getClauses().length != 0) { + synchronized (cache) { // check cache + filter = cache.get(cacheQuery); + } + if (filter == null) { // miss + + if (filterQuery.getClauses().length != 0) // add filterQuery to filters + filters.add(new CachingWrapperFilter(new QueryWrapperFilter(filterQuery))); + + if (filters.size() == 1) { // convert filters to filter + filter = (Filter)filters.get(0); + } else { + filter = new ChainedFilter((Filter[])filters.toArray + (new Filter[filters.size()]), + ChainedFilter.AND); + } + if (!(filter instanceof CachingWrapperFilter)) // make sure bits are cached + filter = new CachingWrapperFilter(filter); + + synchronized (cache) { + cache.put(cacheQuery, filter); // cache the filter + } + } + } + if (sortField == null && !reverse) { + + // no hit limit + if (this.searcherMaxHits <= 0 && timerThread == null) { + // FIXME: Need hitsPerSite value, using '1' to test. 
+ TopDocCollector c = new CollapsingHitCollector( searcher, numHits, maxHitsPerDup ); + searcher.search(query, filter, c ); + return c.topDocs( ); + } + + // hits limited in time or in count -- use a LimitedCollector + LimitedCollector collector = new LimitedCollector(numHits, searcherMaxHits, + maxTickCount, timerThread); + LimitExceeded exceeded = null; + TimeExceeded timeExceeded = null; + try { + searcher.search(query, filter, collector); + } catch (LimitExceeded le) { + exceeded = le; + } catch (TimeExceeded te) { + timeExceeded = te; + } + TopDocs results = collector.topDocs(); + if (exceeded != null) { // limit was exceeded + results.totalHits = (int) // must estimate totalHits + (results.totalHits*(searcher.maxDoc()/(float)exceeded.maxDoc)); + } else if (timeExceeded != null) { + // Estimate total hits. + results.totalHits = (int)(results.totalHits * (searcher.maxDoc()/(float)timeExceeded.maxDoc)); + } + return results; + + } else { + return searcher.search(query, filter, numHits, + new Sort(sortField, reverse)); + } + } +} Added: tags/nutchwax-0_12_9-JIRA-ARI-2260/archive/src/nutch/src/java/org/apache/nutch/searcher/NutchBean.java =================================================================== --- tags/nutchwax-0_12_9-JIRA-ARI-2260/archive/src/nutch/src/java/org/apache/nutch/searcher/NutchBean.java (rev 0) +++ tags/nutchwax-0_12_9-JIRA-ARI-2260/archive/src/nutch/src/java/org/apache/nutch/searcher/NutchBean.java 2010-03-24 01:08:53 UTC (rev 3005) @@ -0,0 +1,434 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.searcher; + +import java.io.*; +import java.util.*; +import javax.servlet.*; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.Closeable; +import org.apache.hadoop.conf.*; +import org.apache.hadoop.util.StringUtils; +import org.apache.nutch.parse.*; +import org.apache.nutch.indexer.*; +import org.apache.nutch.crawl.Inlinks; +import org.apache.nutch.util.HadoopFSUtil; +import org.apache.nutch.util.NutchConfiguration; + +/** + * One stop shopping for search-related functionality. 
+ * @version $Id: NutchBean.java,v 1.19 2005/02/07 19:10:08 cutting Exp $ + */ +public class NutchBean + implements Searcher, HitDetailer, HitSummarizer, HitContent, HitInlinks, + DistributedSearch.Protocol, Closeable { + + public static final Log LOG = LogFactory.getLog(NutchBean.class); + public static final String KEY = "nutchBean"; + +// static { +// LogFormatter.setShowThreadIDs(true); +// } + + private String[] segmentNames; + + private Searcher searcher; + private HitDetailer detailer; + private HitSummarizer summarizer; + private HitContent content; + private HitInlinks linkDb; + + + /** BooleanQuery won't permit more than 32 required/prohibited clauses. We + * don't want to use too many of those. */ + private static final int MAX_PROHIBITED_TERMS = 20; + + private Configuration conf; + + private FileSystem fs; + + /** Returns the cached instance in the servlet context. + * @see NutchBeanConstructor*/ + public static NutchBean get(ServletContext app, Configuration conf) throws IOException { + NutchBean bean = (NutchBean)app.getAttribute(KEY); + return bean; + } + + + /** + * + * @param conf + * @throws IOException + */ + public NutchBean(Configuration conf) throws IOException { + this(conf, null); + } + + /** + * Construct in a named directory. + * @param conf + * @param dir + * @throws IOException + */ + public NutchBean(Configuration conf, Path dir) throws IOException { + this.conf = conf; + this.fs = FileSystem.get(this.conf); + if (dir == null) { + dir = new Path(this.conf.get("searcher.dir", "crawl")); + } + Path servers = new Path(dir, "search-servers.txt"); + if (fs.exists(servers)) { + if (LOG.isInfoEnabled()) { + LOG.info("searching servers in " + servers); + } + init(new DistributedSearch.Client(servers, conf)); + } else { + init(new Path(dir, "index"), new Path(dir, "indexes"), new Path( + dir, "segments"), new Path(dir, "linkdb")); + } + } + + private void init(Path indexDir, Path indexesDir, Path segmentsDir, + Path linkDb) + throws IOException { + IndexSearcher indexSearcher; + if (this.fs.exists(indexDir)) { + if (LOG.isInfoEnabled()) { + LOG.info("opening merged index in " + indexDir); + } + indexSearcher = new IndexSearcher(indexDir, this.conf); + } else { + if (LOG.isInfoEnabled()) { + LOG.info("opening indexes in " + indexesDir); + } + + Vector vDirs=new Vector(); + FileStatus[] fstats = fs.listStatus(indexesDir, + HadoopFSUtil.getPassDirectoriesFilter(fs)); + Path [] directories = HadoopFSUtil.getPaths(fstats); + for(int i = 0; i < directories.length; i++) { + Path indexdone = new Path(directories[i], Indexer.DONE_NAME); + if(fs.isFile(indexdone)) { + vDirs.add(directories[i]); + } + } + + + directories = new Path[ vDirs.size() ]; + for(int i = 0; vDirs.size()>0; i++) { + directories[i]=(Path)vDirs.remove(0); + } + + indexSearcher = new IndexSearcher(directories, this.conf); + } + + if (LOG.isInfoEnabled()) { + LOG.info("opening segments in " + segmentsDir); + } + FetchedSegments segments = new FetchedSegments(this.fs, segmentsDir.toString(),this.conf); + + this.segmentNames = segments.getSegmentNames(); + + this.searcher = indexSearcher; + this.detailer = indexSearcher; + this.summarizer = segments; + this.content = segments; + + if (LOG.isInfoEnabled()) { LOG.info("opening linkdb in " + linkDb); } + this.linkDb = new LinkDbInlinks(fs, linkDb, this.conf); + } + + private void init(DistributedSearch.Client client) { + this.segmentNames = client.getSegmentNames(); + this.searcher = client; + this.detailer = client; + this.summarizer = client; + this.content = 
client; + this.linkDb = client; + } + + + public String[] getSegmentNames() { + return segmentNames; + } + + public Hits search(Query query, int numHits) throws IOException { + return search(query, numHits, null, null, false); + } + + public Hits search(Query query, int numHits, + String dedupField, String sortField, boolean reverse) + throws IOException { + + return searcher.search(query, numHits, 0, dedupField, sortField, reverse); + } + + private class DupHits extends ArrayList { + private boolean maxSizeExceeded; + } + + /** Search for pages matching a query, eliminating excessive hits from the + * same site. Hits after the first <code>maxHitsPerDup</code> from the same + * site are removed from results. The remaining hits have {@link + * Hit#moreFromDupExcluded()} set. <p> If maxHitsPerDup is zero then all + * hits are returned. + * + * @param query query + * @param numHits number of requested hits + * @param maxHitsPerDup the maximum hits returned with matching values, or zero + * @return Hits the matching hits + * @throws IOException + */ + public Hits search(Query query, int numHits, int maxHitsPerDup) + throws IOException { + return search(query, numHits, maxHitsPerDup, "site", null, false); + } + + /** Search for pages matching a query, eliminating excessive hits with + * matching values for a named field. Hits after the first + * <code>maxHitsPerDup</code> are removed from results. The remaining hits + * have {@link Hit#moreFromDupExcluded()} set. <p> If maxHitsPerDup is zero + * then all hits are returned. + * + * @param query query + * @param numHits number of requested hits + * @param maxHitsPerDup the maximum hits returned with matching values, or zero + * @param dedupField field name to check for duplicates + * @return Hits the matching hits + * @throws IOException + */ + public Hits search(Query query, int numHits, + int maxHitsPerDup, String dedupField) + throws IOException { + return search(query, numHits, maxHitsPerDup, dedupField, null, false); + } + /** Search for pages matching a query, eliminating excessive hits with + * matching values for a named field. Hits after the first + * <code>maxHitsPerDup</code> are removed from results. The remaining hits + * have {@link Hit#moreFromDupExcluded()} set. <p> If maxHitsPerDup is zero + * then all hits are returned. + * + * @param query query + * @param numHits number of requested hits + * @param maxHitsPerDup the maximum hits returned with matching values, or zero + * @param dedupField field name to check for duplicates + * @param sortField Field to sort on (or null if no sorting). + * @param reverse True if we are to reverse sort by <code>sortField</code>. 
+ * @return Hits the matching hits + * @throws IOException + */ + public Hits search(Query query, int numHits, + int maxHitsPerDup, + String dedupField, + String sortField, boolean reverse) + throws IOException { + if (maxHitsPerDup <= 0) // disable dup checking + return search(query, numHits, dedupField, sortField, reverse); + + float rawHitsFactor = this.conf.getFloat("searcher.hostgrouping.rawhits.factor", 2.0f); + int numHitsRaw = (int)(numHits * rawHitsFactor); + if (LOG.isInfoEnabled()) { + LOG.info("searching for "+numHitsRaw+" raw hits"); + } + Hits hits = searcher.search(query, numHitsRaw, maxHitsPerDup, dedupField, sortField, reverse); + long total = hits.getTotal(); + Map dupToHits = new HashMap(); + List resultList = new ArrayList(); + Set seen = new HashSet(); + List excludedValues = new ArrayList(); + boolean totalIsExact = true; + for (int rawHitNum = 0; rawHitNum < hits.getTotal(); rawHitNum++) { + // get the next raw hit + if (rawHitNum >= hits.getLength()) { + // optimize query by prohibiting more matches on some excluded values + Query optQuery = (Query)query.clone(); + for (int i = 0; i < excludedValues.size(); i++) { + if (i == MAX_PROHIBITED_TERMS) + break; + optQuery.addProhibitedTerm(((String)excludedValues.get(i)), + dedupField); + } + numHitsRaw = (int)(numHitsRaw * rawHitsFactor); + if (LOG.isInfoEnabled()) { + LOG.info("re-searching for "+numHitsRaw+" raw hits, query: "+optQuery); + } + hits = searcher.search(optQuery, numHitsRaw, maxHitsPerDup, dedupField, sortField, reverse); + if (LOG.isInfoEnabled()) { + LOG.info("found "+hits.getTotal()+" raw hits"); + } + rawHitNum = -1; + continue; + } + + Hit hit = hits.getHit(rawHitNum); + if (seen.contains(hit)) + continue; + seen.add(hit); + + // get dup hits for its value + String value = hit.getDedupValue(); + DupHits dupHits = (DupHits)dupToHits.get(value); + if (dupHits == null) + dupToHits.put(value, dupHits = new DupHits()); + + // does this hit exceed maxHitsPerDup? + if (dupHits.size() == maxHitsPerDup) { // yes -- ignore the hit + if (!dupHits.maxSizeExceeded) { + + // mark prior hits with moreFromDupExcluded + for (int i = 0; i < dupHits.size(); i++) { + ((Hit)dupHits.get(i)).setMoreFromDupExcluded(true); + } + dupHits.maxSizeExceeded = true; + + excludedValues.add(value); // exclude dup + } + totalIsExact = false; + } else { // no -- collect the hit + resultList.add(hit); + dupHits.add(hit); + + // are we done? 
+ // we need to find one more than asked for, so that we can tell if + // there are more hits to be shown + if (resultList.size() > numHits) + break; + } + } + + Hits results = + new Hits(total, + (Hit[])resultList.toArray(new Hit[resultList.size()])); + results.setTotalIsExact(totalIsExact); + return results; + } + + + public String getExplanation(Query query, Hit hit) throws IOException { + return searcher.getExplanation(query, hit); + } + + public HitDetails getDetails(Hit hit) throws IOException { + return detailer.getDetails(hit); + } + + public HitDetails[] getDetails(Hit[] hits) throws IOException { + return detailer.getDetails(hits); + } + + public Summary getSummary(HitDetails hit, Query query) throws IOException { + return summarizer.getSummary(hit, query); + } + + public Summary[] getSummary(HitDetails[] hits, Query query) + throws IOException { + return summarizer.getSummary(hits, query); + } + + public byte[] getContent(HitDetails hit) throws IOException { + return content.getContent(hit); + } + + public ParseData getParseData(HitDetails hit) throws IOException { + return content.getParseData(hit); + } + + public ParseText getParseText(HitDetails hit) throws IOException { + return content.getParseText(hit); + } + + public String[] getAnchors(HitDetails hit) throws IOException { + return linkDb.getAnchors(hit); + } + + public Inlinks getInlinks(HitDetails hit) throws IOException { + return linkDb.getInlinks(hit); + } + + public long getFetchDate(HitDetails hit) throws IOException { + return content.getFetchDate(hit); + } + + public void close() throws IOException { + if (content != null) { content.close(); } + if (searcher != null) { searcher.close(); } + if (linkDb != null) { linkDb.close(); } + if (fs != null) { fs.close(); } + } + + /** For debugging. */ + public static void main(String[] args) throws Exception { + String usage = "NutchBean query"; + + if (args.length == 0) { + System.err.println(usage); + System.exit(-1); + } + + Configuration conf = NutchConfiguration.create(); + NutchBean bean = new NutchBean(conf); + Query query = Query.parse(args[0], conf); + Hits hits = bean.search(query, 10); + System.out.println("Total hits: " + hits.getTotal()); + int length = (int)Math.min(hits.getTotal(), 10); + Hit[] show = hits.getHits(0, length); + HitDetails[] details = bean.getDetails(show); + Summary[] summaries = bean.getSummary(details, query); + + for (int i = 0; i < hits.getLength(); i++) { + System.out.println(" "+i+" "+ details[i] + "\n" + summaries[i]); + } + } + + public long getProtocolVersion(String className, long arg1) throws IOException { + if(DistributedSearch.Protocol.class.getName().equals(className)){ + return 1; + } else { + throw new IOException("Unknown Protocol classname:" + className); + } + } + + /** Responsible for constructing a NutchBean singleton instance and + * caching it in the servlet context. 
This class should be registered in + * the deployment descriptor as a listener + */ + public static class NutchBeanConstructor implements ServletContextListener { + + public void contextDestroyed(ServletContextEvent sce) { } + + public void contextInitialized(ServletContextEvent sce) { + ServletContext app = sce.getServletContext(); + Configuration conf = NutchConfiguration.get(app); + + LOG.info("creating new bean"); + NutchBean bean = null; + try { + bean = new NutchBean(conf); + app.setAttribute(KEY, bean); + } + catch (IOException ex) { + LOG.error(StringUtils.stringifyException(ex)); + } + } + } + +} Added: tags/nutchwax-0_12_9-JIRA-ARI-2260/archive/src/nutch/src/java/org/apache/nutch/searcher/Searcher.java =================================================================== --- tags/nutchwax-0_12_9-JIRA-ARI-2260/archive/src/nutch/src/java/org/apache/nutch/searcher/Searcher.java (rev 0) +++ tags/nutchwax-0_12_9-JIRA-ARI-2260/archive/src/nutch/src/java/org/apache/nutch/searcher/Searcher.java 2010-03-24 01:08:53 UTC (rev 3005) @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.searcher; + +import java.io.IOException; + +import org.apache.hadoop.io.Closeable; + +/** Service that searches. */ +public interface Searcher extends Closeable { + /** Return the top-scoring hits for a query. */ + Hits search(Query query, int numHits, + int maxHitsPerDup, + String dedupField, + String sortField, boolean reverse) + throws IOException; + + /** Return an HTML-formatted explanation of how a query scored. */ + String getExplanation(Query query, Hit hit) throws IOException; +}
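For reference, a caller-side sketch of the widened search API introduced by this revision, mirroring the debugging main() in NutchBean above. The query string is an arbitrary example, and the bean expects the searcher.dir configuration property to point at a crawl directory:

import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.searcher.Hits;
import org.apache.nutch.searcher.NutchBean;
import org.apache.nutch.searcher.Query;
import org.apache.nutch.util.NutchConfiguration;

public class DedupSearchDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = NutchConfiguration.create();
    NutchBean bean = new NutchBean(conf); // opens index, segments, linkdb under searcher.dir

    Query query = Query.parse("archive", conf); // arbitrary example query

    // Top 10 hits with at most 2 per value of the "site" field; a
    // maxHitsPerDup of 0 disables collapsing, as in the updated
    // DistributedSearch.main(), which now passes 0.
    Hits hits = bean.search(query, 10, 2, "site", null, false);
    System.out.println("Total hits: " + hits.getTotal());

    bean.close();
  }
}

Note that the RPC parameter array in DistributedSearch grew from five to six entries to carry maxHitsPerDup, so search servers and front ends built from this tag must be deployed together; mixing them with the previous revision would misalign the sortField and reverse arguments.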