From: <bi...@us...> - 2009-10-28 22:11:10
Revision: 2865
          http://archive-access.svn.sourceforge.net/archive-access/?rev=2865&view=rev
Author:   binzino
Date:     2009-10-28 22:10:42 +0000 (Wed, 28 Oct 2009)

Log Message:
-----------
Initial revision.  Copied from Nutch source, then modified to have NutchWAX
extensions/edits which used to be in NutchWaxBean.

Added Paths:
-----------
    trunk/archive-access/projects/nutchwax/archive/src/nutch/src/java/org/apache/nutch/searcher/LuceneSearchBean.java
    trunk/archive-access/projects/nutchwax/archive/src/nutch/src/java/org/apache/nutch/searcher/NutchBean.java

Added: trunk/archive-access/projects/nutchwax/archive/src/nutch/src/java/org/apache/nutch/searcher/LuceneSearchBean.java
===================================================================
--- trunk/archive-access/projects/nutchwax/archive/src/nutch/src/java/org/apache/nutch/searcher/LuceneSearchBean.java	(rev 0)
+++ trunk/archive-access/projects/nutchwax/archive/src/nutch/src/java/org/apache/nutch/searcher/LuceneSearchBean.java	2009-10-28 22:10:42 UTC (rev 2865)
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.searcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.ArchiveParallelReader;
+import org.apache.lucene.index.MultiReader;
+
+import org.apache.nutch.indexer.FsDirectory;
+import org.apache.nutch.indexer.Indexer;
+import org.apache.nutch.util.HadoopFSUtil;
+
+
+public class LuceneSearchBean implements RPCSearchBean {
+
+  public static final Log LOG = LogFactory.getLog(LuceneSearchBean.class);
+
+  public static final long VERSION = 1L;
+
+  private IndexSearcher searcher;
+
+  private FileSystem fs;
+
+  private Configuration conf;
+
+  /**
+   * Construct with the given parallel-index, merged-index and
+   * per-segment-index directories.
+   * @param conf
+   * @param pindexesDir
+   * @param indexDir
+   * @param indexesDir
+   * @throws IOException
+   */
+  public LuceneSearchBean(Configuration conf, Path pindexesDir, Path indexDir, Path indexesDir)
+    throws IOException {
+    this.conf = conf;
+    this.fs = FileSystem.get(this.conf);
+    init( pindexesDir, indexDir, indexesDir );
+  }
+
+  private void init( Path pindexesDir, Path indexDir, Path indexesDir )
+    throws IOException {
+
+    IndexReader reader = getIndexReader( pindexesDir );
+
+    if ( reader != null )
+      {
+        this.searcher = new IndexSearcher( reader, this.conf );
+      }
+    else
+      {
+        if (this.fs.exists(indexDir)) {
+          LOG.info("opening merged index in " + indexDir);
+          this.searcher = new IndexSearcher(indexDir, this.conf);
+        } else {
+          LOG.info("opening indexes in " + indexesDir);
+
+          List<Path> vDirs = new ArrayList<Path>();
+          FileStatus[] fstats = fs.listStatus(indexesDir, HadoopFSUtil.getPassDirectoriesFilter(fs));
+          Path[] directories = HadoopFSUtil.getPaths(fstats);
+          for (int i = 0; i < directories.length; i++) {
+            Path indexdone = new Path(directories[i], Indexer.DONE_NAME);
+            if (fs.isFile(indexdone)) {
+              vDirs.add(directories[i]);
+            }
+          }
+
+          directories = vDirs.toArray(new Path[vDirs.size()]);
+
+          this.searcher = new IndexSearcher(directories, this.conf);
+        }
+      }
+  }
+
+  public Hits search(Query query, int numHits, String dedupField,
+                     String sortField, boolean reverse)
+    throws IOException {
+    return searcher.search(query, numHits, dedupField, sortField, reverse);
+  }
+
+  public String getExplanation(Query query, Hit hit) throws IOException {
+    return searcher.getExplanation(query, hit);
+  }
+
+  public HitDetails getDetails(Hit hit) throws IOException {
+    return searcher.getDetails(hit);
+  }
+
+  public HitDetails[] getDetails(Hit[] hits) throws IOException {
+    return searcher.getDetails(hits);
+  }
+
+  public boolean ping() throws IOException {
+    return true;
+  }
+
+  public void close() throws IOException {
+    if (searcher != null) { searcher.close(); }
+    if (fs != null) { fs.close(); }
+  }
+
+  public long getProtocolVersion(String protocol, long clientVersion)
+    throws IOException {
+    return VERSION;
+  }
+
+  private IndexReader getIndexReader( Path pindexesDir )
+    throws IOException
+  {
+    /*
+    FileSystem fs = FileSystem.get( conf );
+
+    Path dir = new Path( conf.get( "searcher.dir", "crawl") ).makeQualified( fs );
+    LOG.info( "Looking for Nutch indexes in: " + dir );
+    if ( ! fs.exists( dir ) )
+      {
+        LOG.warn( "Directory does not exist: " + dir );
+        LOG.warn( "No Nutch indexes will be found and all queries will return no results." );
+
+        return false;
+      }
+
+    Path pindexesDir = new Path( dir, "pindexes" ).makeQualified(fs);
+    */
+
+    LOG.info( "Looking for NutchWax parallel indexes in: " + pindexesDir );
+    if ( ! fs.exists( pindexesDir ) )
+      {
+        LOG.warn( "Parallel indexes directory does not exist: " + pindexesDir );
+
+        return null;
+      }
+
+    if ( ! fs.getFileStatus( pindexesDir ).isDir( ) )
+      {
+        LOG.warn( "Parallel indexes directory is not a directory: " + pindexesDir );
+
+        return null;
+      }
+
+    FileStatus[] fstats = fs.listStatus(pindexesDir, HadoopFSUtil.getPassDirectoriesFilter(fs));
+    Path[] indexDirs = HadoopFSUtil.getPaths( fstats );
+
+    if ( indexDirs.length < 1 )
+      {
+        LOG.info( "No sub-dirs found in parallel indexes directory: " + pindexesDir );
+
+        return null;
+      }
+
+    List<IndexReader> readers = new ArrayList<IndexReader>( indexDirs.length );
+
+    for ( Path indexDir : indexDirs )
+      {
+        fstats = fs.listStatus( indexDir, HadoopFSUtil.getPassDirectoriesFilter(fs) );
+        Path[] parallelDirs = HadoopFSUtil.getPaths( fstats );
+
+        if ( parallelDirs.length < 1 )
+          {
+            LOG.info( "No sub-directories, skipping: " + indexDir );
+
+            continue;
+          }
+
+        ArchiveParallelReader reader = new ArchiveParallelReader( );
+
+        // Sort the parallelDirs so that we add them in order.  Order
+        // matters to the ParallelReader.
+        Arrays.sort( parallelDirs );
+
+        for ( Path p : parallelDirs )
+          {
+            LOG.info( "Adding reader for: " + p );
+            reader.add( IndexReader.open( new FsDirectory( fs, p, false, conf ) ) );
+          }
+
+        readers.add( reader );
+      }
+
+    if ( readers.size( ) == 0 )
+      {
+        LOG.warn( "No parallel indexes in: " + pindexesDir );
+
+        return null;
+      }
+
+    MultiReader reader = new MultiReader( readers.toArray( new IndexReader[0] ) );
+
+    return reader;
+  }
+
+}
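
A note on the layout getIndexReader() expects (directory names below are
illustrative, not taken from the commit; only the pindexes/<part>/<sub-index>
nesting is implied by the code): each first-level sub-directory of pindexes
becomes one ArchiveParallelReader, its own sub-directories are sorted and
added in order, and the per-part readers are then combined into a single
MultiReader.

    crawl/
      pindexes/
        part-00000/
          main/     <- one of the parallel Lucene indexes
          dates/    <- a parallel index with matching document numbering
        part-00001/
          main/
          dates/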
Added: trunk/archive-access/projects/nutchwax/archive/src/nutch/src/java/org/apache/nutch/searcher/NutchBean.java
===================================================================
--- trunk/archive-access/projects/nutchwax/archive/src/nutch/src/java/org/apache/nutch/searcher/NutchBean.java	(rev 0)
+++ trunk/archive-access/projects/nutchwax/archive/src/nutch/src/java/org/apache/nutch/searcher/NutchBean.java	2009-10-28 22:10:42 UTC (rev 2865)
@@ -0,0 +1,507 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.searcher;
+
+import java.io.*;
+import java.net.InetSocketAddress;
+import java.util.*;
+
+import javax.servlet.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.nutch.parse.*;
+import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.util.NutchConfiguration;
+
+/**
+ * One stop shopping for search-related functionality.
+ * @version $Id: NutchBean.java,v 1.19 2005/02/07 19:10:08 cutting Exp $
+ */
+public class NutchBean
+implements SearchBean, SegmentBean, HitInlinks, Closeable {
+
+  public static final Log LOG = LogFactory.getLog(NutchBean.class);
+  public static final String KEY = "nutchBean";
+
+//  static {
+//    LogFormatter.setShowThreadIDs(true);
+//  }
+
+  private SearchBean searchBean;
+  private SegmentBean segmentBean;
+  private final HitInlinks linkDb;
+
+  /** BooleanQuery won't permit more than 32 required/prohibited clauses.  We
+   * don't want to use too many of those. */
+  private static final int MAX_PROHIBITED_TERMS = 20;
+
+  private final Configuration conf;
+
+  private final FileSystem fs;
+
+  /** Returns the cached instance in the servlet context.
+   * @see NutchBeanConstructor */
+  public static NutchBean get(ServletContext app, Configuration conf) throws IOException {
+    final NutchBean bean = (NutchBean)app.getAttribute(KEY);
+    return bean;
+  }
+
+  /**
+   * @param conf
+   * @throws IOException
+   */
+  public NutchBean(Configuration conf) throws IOException {
+    this(conf, null);
+  }
+
+  /**
+   * Construct in a named directory.
+   *
+   * @param conf
+   * @param dir
+   * @throws IOException
+   */
+  public NutchBean(Configuration conf, Path dir) throws IOException {
+    this.conf = conf;
+    this.fs = FileSystem.get(this.conf);
+    if (dir == null)
+      {
+        dir = new Path( this.conf.get( "searcher.dir", "crawl" ) ).makeQualified( fs );
+      }
+
+    LOG.info( "Looking for Nutch indexes in: " + dir );
+    if ( ! fs.exists( dir ) )
+      {
+        LOG.error( "Directory does not exist: " + dir );
+        LOG.error( "NutchBean not modified." );
+        LOG.error( "No Nutch indexes will be found and all queries will return no results." );
+      }
+
+    final Path luceneConfig  = new Path( dir, "search-servers.txt" );
+    final Path solrConfig    = new Path( dir, "solr-servers.txt" );
+    final Path segmentConfig = new Path( dir, "segment-servers.txt" );
+
+    if (fs.exists(luceneConfig) || fs.exists(solrConfig)) {
+      searchBean = new DistributedSearchBean(conf, luceneConfig, solrConfig);
+    } else {
+      final Path pindexesDir = new Path( dir, "pindexes" );
+      final Path indexDir    = new Path( dir, "index" );
+      final Path indexesDir  = new Path( dir, "indexes" );
+      searchBean = new LuceneSearchBean( conf, pindexesDir, indexDir, indexesDir );
+    }
+
+    if (fs.exists(segmentConfig)) {
+      segmentBean = new DistributedSegmentBean(conf, segmentConfig);
+    } else if (fs.exists(luceneConfig)) {
+      segmentBean = new DistributedSegmentBean(conf, luceneConfig);
+    } else {
+      segmentBean = new FetchedSegments(conf, new Path(dir, "segments"));
+    }
+
+    linkDb = new LinkDbInlinks(fs, new Path(dir, "linkdb"), conf);
+  }
+
+  public static List<InetSocketAddress> readAddresses(Path path,
+      Configuration conf) throws IOException {
+    final List<InetSocketAddress> addrs = new ArrayList<InetSocketAddress>();
+    for (final String line : readConfig(path, conf)) {
+      final StringTokenizer tokens = new StringTokenizer(line);
+      if (tokens.hasMoreTokens()) {
+        final String host = tokens.nextToken();
+        if (tokens.hasMoreTokens()) {
+          final String port = tokens.nextToken();
+          addrs.add(new InetSocketAddress(host, Integer.parseInt(port)));
+        }
+      }
+    }
+    return addrs;
+  }
+
+  public static List<String> readConfig(Path path, Configuration conf)
+    throws IOException {
+    final FileSystem fs = FileSystem.get(conf);
+    final BufferedReader reader =
+      new BufferedReader(new InputStreamReader(fs.open(path)));
+    try {
+      final ArrayList<String> addrs = new ArrayList<String>();
+      String line;
+      while ((line = reader.readLine()) != null) {
+        addrs.add(line);
+      }
+      return addrs;
+    } finally {
+      reader.close();
+    }
+  }
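+
+  // Example (illustrative; hostnames and ports are made up, not part of this
+  // commit): readAddresses() expects each line of a *-servers.txt file to be
+  // a whitespace-separated "host port" pair, e.g.
+  //
+  //   search1.example.org 9999
+  //   search2.example.org 9999
+  //
+  // A line with no port token is silently skipped.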
+
+  public String[] getSegmentNames() throws IOException {
+    return segmentBean.getSegmentNames();
+  }
+
+  public Hits search(Query query, int numHits) throws IOException {
+    return search(query, numHits, null, null, false);
+  }
+
+  public Hits search(Query query, int numHits,
+                     String dedupField, String sortField, boolean reverse)
+    throws IOException {
+
+    return searchBean.search(query, numHits, dedupField, sortField, reverse);
+  }
+
+  @SuppressWarnings("serial")
+  private class DupHits extends ArrayList<Hit> {
+    private boolean maxSizeExceeded;
+  }
+
+  /** Search for pages matching a query, eliminating excessive hits from the
+   * same site.  Hits after the first <code>maxHitsPerDup</code> from the same
+   * site are removed from results.  The remaining hits have {@link
+   * Hit#moreFromDupExcluded()} set.  <p> If maxHitsPerDup is zero then all
+   * hits are returned.
+   *
+   * @param query query
+   * @param numHits number of requested hits
+   * @param maxHitsPerDup the maximum hits returned with matching values, or zero
+   * @return Hits the matching hits
+   * @throws IOException
+   */
+  public Hits search(Query query, int numHits, int maxHitsPerDup)
+    throws IOException {
+    return search(query, numHits, maxHitsPerDup, "site", null, false);
+  }
+
+  /** Search for pages matching a query, eliminating excessive hits with
+   * matching values for a named field.  Hits after the first
+   * <code>maxHitsPerDup</code> are removed from results.  The remaining hits
+   * have {@link Hit#moreFromDupExcluded()} set.  <p> If maxHitsPerDup is zero
+   * then all hits are returned.
+   *
+   * @param query query
+   * @param numHits number of requested hits
+   * @param maxHitsPerDup the maximum hits returned with matching values, or zero
+   * @param dedupField field name to check for duplicates
+   * @return Hits the matching hits
+   * @throws IOException
+   */
+  public Hits search(Query query, int numHits,
+                     int maxHitsPerDup, String dedupField)
+    throws IOException {
+    return search(query, numHits, maxHitsPerDup, dedupField, null, false);
+  }
+
+  /** Search for pages matching a query, eliminating excessive hits with
+   * matching values for a named field.  Hits after the first
+   * <code>maxHitsPerDup</code> are removed from results.  The remaining hits
+   * have {@link Hit#moreFromDupExcluded()} set.  <p> If maxHitsPerDup is zero
+   * then all hits are returned.
+   *
+   * @param query query
+   * @param numHits number of requested hits
+   * @param maxHitsPerDup the maximum hits returned with matching values, or zero
+   * @param dedupField field name to check for duplicates
+   * @param sortField Field to sort on (or null if no sorting).
+   * @param reverse True if we are to reverse sort by <code>sortField</code>.
+   * @return Hits the matching hits
+   * @throws IOException
+   */
+  public Hits search(Query query, int numHits,
+                     int maxHitsPerDup, String dedupField,
+                     String sortField, boolean reverse)
+    throws IOException {
+    if (maxHitsPerDup <= 0)                      // disable dup checking
+      return search(query, numHits, dedupField, sortField, reverse);
+
+    final float rawHitsFactor = this.conf.getFloat("searcher.hostgrouping.rawhits.factor", 2.0f);
+    int numHitsRaw = (int)(numHits * rawHitsFactor);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("searching for "+numHitsRaw+" raw hits");
+    }
+    Hits hits = searchBean.search(query, numHitsRaw,
+                                  dedupField, sortField, reverse);
+    final long total = hits.getTotal();
+    final Map<String, DupHits> dupToHits = new HashMap<String, DupHits>();
+    final List<Hit> resultList = new ArrayList<Hit>();
+    final Set<Hit> seen = new HashSet<Hit>();
+    final List<String> excludedValues = new ArrayList<String>();
+    boolean totalIsExact = true;
+    for (int rawHitNum = 0; rawHitNum < hits.getTotal(); rawHitNum++) {
+      // get the next raw hit
+      if (rawHitNum >= hits.getLength()) {
+        // optimize query by prohibiting more matches on some excluded values
+        final Query optQuery = (Query)query.clone();
+        for (int i = 0; i < excludedValues.size(); i++) {
+          if (i == MAX_PROHIBITED_TERMS)
+            break;
+          optQuery.addProhibitedTerm(excludedValues.get(i), dedupField);
+        }
+        numHitsRaw = (int)(numHitsRaw * rawHitsFactor);
+        if (LOG.isInfoEnabled()) {
+          LOG.info("re-searching for "+numHitsRaw+" raw hits, query: "+optQuery);
+        }
+        hits = searchBean.search(optQuery, numHitsRaw,
+                                 dedupField, sortField, reverse);
+        if (LOG.isInfoEnabled()) {
+          LOG.info("found "+hits.getTotal()+" raw hits");
+        }
+        rawHitNum = -1;
+        continue;
+      }
+
+      final Hit hit = hits.getHit(rawHitNum);
+      if (seen.contains(hit))
+        continue;
+      seen.add(hit);
+
+      // get dup hits for its value
+      final String value = hit.getDedupValue();
+      DupHits dupHits = dupToHits.get(value);
+      if (dupHits == null)
+        dupToHits.put(value, dupHits = new DupHits());
+
+      // does this hit exceed maxHitsPerDup?
+      if (dupHits.size() == maxHitsPerDup) {      // yes -- ignore the hit
+        if (!dupHits.maxSizeExceeded) {
+
+          // mark prior hits with moreFromDupExcluded
+          for (int i = 0; i < dupHits.size(); i++) {
+            dupHits.get(i).setMoreFromDupExcluded(true);
+          }
+          dupHits.maxSizeExceeded = true;
+
+          excludedValues.add(value);              // exclude dup
+        }
+        totalIsExact = false;
+      } else {                                    // no -- collect the hit
+        resultList.add(hit);
+        dupHits.add(hit);
+
+        // are we done?
+        // we need to find one more than asked for, so that we can tell if
+        // there are more hits to be shown
+        if (resultList.size() > numHits)
+          break;
+      }
+    }
+
+    final Hits results =
+      new Hits(total,
+               resultList.toArray(new Hit[resultList.size()]));
+    results.setTotalIsExact(totalIsExact);
+    return results;
+  }
+
+  public String getExplanation(Query query, Hit hit) throws IOException {
+    return searchBean.getExplanation(query, hit);
+  }
+
+  public HitDetails getDetails(Hit hit) throws IOException {
+    return searchBean.getDetails(hit);
+  }
+
+  public HitDetails[] getDetails(Hit[] hits) throws IOException {
+    return searchBean.getDetails(hits);
+  }
+
+  public Summary getSummary(HitDetails hit, Query query) throws IOException {
+    return segmentBean.getSummary(hit, query);
+  }
+
+  public Summary[] getSummary(HitDetails[] hits, Query query)
+    throws IOException {
+    return segmentBean.getSummary(hits, query);
+  }
+
+  public byte[] getContent(HitDetails hit) throws IOException {
+    return segmentBean.getContent(hit);
+  }
+
+  public ParseData getParseData(HitDetails hit) throws IOException {
+    return segmentBean.getParseData(hit);
+  }
+
+  public ParseText getParseText(HitDetails hit) throws IOException {
+    return segmentBean.getParseText(hit);
+  }
+
+  public String[] getAnchors(HitDetails hit) throws IOException {
+    return linkDb.getAnchors(hit);
+  }
+
+  public Inlinks getInlinks(HitDetails hit) throws IOException {
+    return linkDb.getInlinks(hit);
+  }
+
+  public long getFetchDate(HitDetails hit) throws IOException {
+    return segmentBean.getFetchDate(hit);
+  }
+
+  public void close() throws IOException {
+    if (searchBean != null) { searchBean.close(); }
+    if (segmentBean != null) { segmentBean.close(); }
+    if (linkDb != null) { linkDb.close(); }
+    if (fs != null) { fs.close(); }
+  }
+
+  public boolean ping() {
+    return true;
+  }
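+
+  // Usage sketch for the grouped search above (variable names illustrative,
+  // not from the commit):
+  //
+  //   Hits hits = bean.search(query, 10, 2);      // at most 2 hits per site
+  //   for (int i = 0; i < hits.getLength(); i++) {
+  //     Hit hit = hits.getHit(i);
+  //     if (hit.moreFromDupExcluded()) {
+  //       // offer a "more from this site" link keyed on hit.getDedupValue()
+  //     }
+  //   }
+  //
+  // The first pass over-fetches by searcher.hostgrouping.rawhits.factor
+  // (default 2.0) and re-searches with prohibited dedup terms when grouping
+  // exhausts the raw hits.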
+
+  /** For debugging. */
+  public static void main(String[] args) throws Exception {
+
+    String usage = "NutchBean [options] query"
+      + "\n\t-h <n>   Hits per site"
+      + "\n\t-n <n>   Number of results to find"
+      + "\n\t-d <dir> Search directory"
+      + "\n";
+
+    if ( args.length == 0 )
+      {
+        System.err.println( usage );
+        System.exit( -1 );
+      }
+
+    String queryString = args[args.length - 1];
+    String searchDir   = null;
+    int    hitsPerSite = 0;
+    int    numHits     = 10;
+    for ( int i = 0 ; i < args.length - 1 ; i++ )
+      {
+        try
+          {
+            if ( "-h".equals( args[i] ) )
+              {
+                i++;
+                hitsPerSite = Integer.parseInt( args[i] );
+              }
+            if ( "-n".equals( args[i] ) )
+              {
+                i++;
+                numHits = Integer.parseInt( args[i] );
+              }
+            if ( "-d".equals( args[i] ) )
+              {
+                i++;
+                searchDir = args[i];
+              }
+          }
+        catch ( NumberFormatException nfe )
+          {
+            System.err.println( "Error: not a numeric value: " + args[i] );
+            System.err.println( usage );
+            System.exit( -1 );
+          }
+      }
+
+    final Configuration conf = NutchConfiguration.create();
+
+    if ( searchDir != null )
+      {
+        conf.set( "searcher.dir", searchDir );
+      }
+    System.out.println( "Searching in directory: " + conf.get( "searcher.dir" ) );
+    System.out.println( "Hits per site: " + hitsPerSite );
+
+    final NutchBean bean = new NutchBean(conf);
+
+    try {
+      final Query query = Query.parse( queryString, conf );
+      final Hits hits = bean.search( query, numHits, hitsPerSite );
+      System.out.println( "Total hits : " + hits.getTotal()  );
+      System.out.println( "Hits length: " + hits.getLength() );
+      final int length = (int)Math.min(hits.getTotal(), numHits);
+      final Hit[] show = hits.getHits(0, length);
+      final HitDetails[] details = bean.getDetails(show);
+      final Summary[] summaries = bean.getSummary(details, query);
+
+      for (int i = 0; i < length; i++)
+        {
+          System.out.println( " "
+                              + i
+                              + " "
+                              + java.util.Arrays.asList( details[i].getValues( "segment" ) )
+                              + " "
+                              + java.util.Arrays.asList( details[i].getValues( "url" ) )
+                              + " "
+                              + java.util.Arrays.asList( details[i].getValues( "digest" ) )
+                              + " "
+                              + java.util.Arrays.asList( details[i].getValues( "date" ) )
+                              + " "
+                              + java.util.Arrays.asList( details[i].getValues( "title" ) )
+                              + "\n"
+                              + summaries[i] );
+        }
+    } catch (Throwable t) {
+      LOG.error("Exception occurred while executing search: " + t, t);
+      System.exit(1);
+    }
+    System.exit(0);
+  }
+
+  public long getProtocolVersion(String className, long clientVersion)
+    throws IOException {
+    if (RPCSearchBean.class.getName().equals(className) &&
+        searchBean instanceof RPCSearchBean) {
+
+      final RPCSearchBean rpcBean = (RPCSearchBean)searchBean;
+      return rpcBean.getProtocolVersion(className, clientVersion);
+    } else if (RPCSegmentBean.class.getName().equals(className) &&
+               segmentBean instanceof RPCSegmentBean) {
+
+      final RPCSegmentBean rpcBean = (RPCSegmentBean)segmentBean;
+      return rpcBean.getProtocolVersion(className, clientVersion);
+    } else {
+      throw new IOException("Unknown protocol classname: " + className);
+    }
+  }
+
+  /** Responsible for constructing a NutchBean singleton instance and
+   * caching it in the servlet context.  This class should be registered
+   * in the deployment descriptor as a listener.
+   */
+  public static class NutchBeanConstructor implements ServletContextListener {
+
+    public void contextDestroyed(ServletContextEvent sce) { }
+
+    public void contextInitialized(ServletContextEvent sce) {
+      final ServletContext app = sce.getServletContext();
+      final Configuration conf = NutchConfiguration.get(app);
+
+      LOG.info("creating new bean");
+      NutchBean bean = null;
+      try {
+        bean = new NutchBean(conf);
+        app.setAttribute(KEY, bean);
+      }
+      catch (final IOException ex) {
+        LOG.error(StringUtils.stringifyException(ex));
+      }
+    }
+  }
+
+}
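
Two usage notes on NutchBean (both illustrative; the paths, host names and
numbers are made up, not part of this commit). The debugging main() can be
run along these lines, assuming the Nutch/Hadoop jars and conf directory are
on the classpath:

    java org.apache.nutch.searcher.NutchBean -d /search/crawl -n 20 -h 2 apache

And NutchBeanConstructor is the piece that belongs in the webapp deployment
descriptor (WEB-INF/web.xml), registered with the standard servlet listener
syntax; note the $ separator for the nested class:

    <listener>
      <listener-class>org.apache.nutch.searcher.NutchBean$NutchBeanConstructor</listener-class>
    </listener>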