From: <bi...@us...> - 2009-10-28 23:07:12
Revision: 2866
          http://archive-access.svn.sourceforge.net/archive-access/?rev=2866&view=rev
Author:   binzino
Date:     2009-10-28 23:07:02 +0000 (Wed, 28 Oct 2009)

Log Message:
-----------
Initial patch with NutchWAX modifications. Needs much more testing,
especially with perCollectionSegments and in master/slave mode.

Modified Paths:
--------------
    trunk/archive-access/projects/nutchwax/archive/src/nutch/src/java/org/apache/nutch/searcher/FetchedSegments.java

Modified: trunk/archive-access/projects/nutchwax/archive/src/nutch/src/java/org/apache/nutch/searcher/FetchedSegments.java
===================================================================
--- trunk/archive-access/projects/nutchwax/archive/src/nutch/src/java/org/apache/nutch/searcher/FetchedSegments.java	2009-10-28 22:10:42 UTC (rev 2865)
+++ trunk/archive-access/projects/nutchwax/archive/src/nutch/src/java/org/apache/nutch/searcher/FetchedSegments.java	2009-10-28 23:07:02 UTC (rev 2866)
@@ -23,14 +23,20 @@
 import java.io.InputStreamReader;
 import java.io.BufferedReader;
 
-import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
 import org.apache.nutch.protocol.*;
@@ -43,22 +49,91 @@
 /** Implements {@link HitSummarizer} and {@link HitContent} for a set of
  * fetched segments. */
-public class FetchedSegments implements HitSummarizer, HitContent
-{
+public class FetchedSegments implements RPCSegmentBean {
+
   public static final Log LOG = LogFactory.getLog(FetchedSegments.class);
 
-  private static class Segment implements Closeable {
-
-    private static final Partitioner PARTITIONER = new HashPartitioner();
+  public static final long VERSION = 1L;
 
-    private FileSystem fs;
-    private Path segmentDir;
+  private static final ExecutorService executor =
+    Executors.newCachedThreadPool();
 
+  private class SummaryTask implements Callable<Summary> {
+    private final HitDetails details;
+    private final Query query;
+
+    public SummaryTask(HitDetails details, Query query) {
+      this.details = details;
+      this.query = query;
+    }
+
+    public Summary call() throws Exception {
+      return getSummary(details, query);
+    }
+  }
+
+  /*
+  private class SegmentUpdater extends Thread {
+
+    @Override
+    public void run() {
+      while (true) {
+        try {
+          final FileStatus[] fstats = fs.listStatus(segmentsDir,
+              HadoopFSUtil.getPassDirectoriesFilter(fs));
+          final Path[] segmentDirs = HadoopFSUtil.getPaths(fstats);
+          final Iterator<Map.Entry<String, Segment>> i = segments.entrySet().iterator();
+          while (i.hasNext()) {
+            final Map.Entry<String, Segment> entry = i.next();
+            final Segment seg = entry.getValue();
+            if (!fs.exists(seg.segmentDir)) {
+              try {
+                seg.close();
+              } catch (final Exception e) {
+                / * A segment may fail to close
+                 * since it may already be deleted from
+                 * file system. So we just ignore the
+                 * exception and remove the mapping from
+                 * 'segments'.
+                 * /
+              } finally {
+                i.remove();
+              }
+            }
+          }
+
+          if (segmentDirs != null) {
+            for (final Path segmentDir : segmentDirs) {
+              segments.putIfAbsent(segmentDir.getName(),
+                  new Segment(fs, segmentDir, conf));
+            }
+          }
+
+          Thread.sleep(60000);
+        } catch (final InterruptedException e) {
+          // ignore
+        } catch (final IOException e) {
+          // ignore
+        }
+      }
+    }
+
+  }
+  */
+
+  private static class Segment implements java.io.Closeable {
+
+    private static final Partitioner<Text, Writable> PARTITIONER =
+      new HashPartitioner<Text, Writable>();
+
+    private final FileSystem fs;
+    private final Path segmentDir;
+
     private MapFile.Reader[] content;
     private MapFile.Reader[] parseText;
     private MapFile.Reader[] parseData;
     private MapFile.Reader[] crawl;
-    private Configuration conf;
+    private final Configuration conf;
 
     public Segment(FileSystem fs, Path segmentDir, Configuration conf) throws IOException {
       this.fs = fs;
@@ -73,7 +148,7 @@
       }
       return (CrawlDatum)getEntry(crawl, url, new CrawlDatum());
     }
-    
+
     public byte[] getContent(Text url) throws IOException {
       synchronized (this) {
         if (content == null)
@@ -97,7 +172,7 @@
       }
       return (ParseText)getEntry(parseText, url, new ParseText());
     }
-    
+
     private MapFile.Reader[] getReaders(String subDir) throws IOException {
       return MapFileOutputFormat.getReaders(fs, new Path(segmentDir, subDir), this.conf);
     }
@@ -122,16 +197,27 @@
   }
 
-  private HashMap segments = new HashMap( );
-  private boolean perCollection = false;
-  private Summarizer summarizer;
+  //private final ConcurrentMap<String, Segment> segments = new ConcurrentHashMap<String, Segment>();
+  private final ConcurrentMap segments = new ConcurrentHashMap();
+  private boolean perCollection = false;
+  private final FileSystem fs;
+  private final Configuration conf;
+  private final Path segmentsDir;
+  //private final SegmentUpdater segUpdater;
+  private final Summarizer summarizer;
 
   /** Construct given a directory containing fetcher output. */
-  public FetchedSegments(FileSystem fs, String segmentsDir, Configuration conf) throws IOException
-  {
-    this.summarizer = new SummarizerFactory(conf).getSummarizer();
+  public FetchedSegments(Configuration conf, Path segmentsDir)
+  throws IOException {
+    this.conf = conf;
+    this.fs = FileSystem.get(this.conf);
+    final FileStatus[] fstats = fs.listStatus(segmentsDir,
+        HadoopFSUtil.getPassDirectoriesFilter(fs));
+    final Path[] segmentDirs = HadoopFSUtil.getPaths(fstats);
+    this.summarizer = new SummarizerFactory(this.conf).getSummarizer();
+    this.segmentsDir = segmentsDir;
+    //this.segUpdater = new SegmentUpdater();
 
-    Path[] segmentDirs = HadoopFSUtil.getPaths( fs.listStatus(new Path(segmentsDir), HadoopFSUtil.getPassDirectoriesFilter(fs)) );
     if ( segmentDirs == null )
       {
        LOG.warn( "No segment directories: " + segmentsDir );
@@ -161,7 +247,7 @@
         Map perCollectionSegments = (Map) this.segments.get( collectionDir.getName( ) );
         if ( perCollectionSegments == null )
           {
-            perCollectionSegments = new HashMap( );
+            perCollectionSegments = new ConcurrentHashMap( );
             this.segments.put( collectionDir.getName( ), perCollectionSegments );
           }
@@ -179,18 +265,11 @@
         else
          {
            Path segmentDir = segmentDirs[i];
 
-            segments.put(segmentDir.getName(), new Segment(fs, segmentDir, conf));
+            segments.put(segmentDir.getName(), new Segment(this.fs, segmentDir, this.conf));
          }
      }
-
-    // If we not-doing perCollection segments, process a single
-    // "remap" file for the "segments" dir.
-    if ( ! this.perCollection )
-      {
-        addRemaps( fs, new Path(segmentsDir), (Map<String,Segment>) segments );
-      }
-
-    LOG.info( "segments: " + segments );
+
+    // this.segUpdater.start();
   }
 
   protected void addRemaps( FileSystem fs, Path segmentDir, Map<String,Segment> segments )
@@ -235,100 +314,73 @@
       }
   }
 
-
   public String[] getSegmentNames() {
     return (String[])segments.keySet().toArray(new String[segments.size()]);
   }
 
   public byte[] getContent(HitDetails details) throws IOException {
-    return getSegment(details).getContent(getUrl(details));
+    return getSegment(details).getContent(getKey(details));
   }
 
   public ParseData getParseData(HitDetails details) throws IOException {
-    return getSegment(details).getParseData(getUrl(details));
+    return getSegment(details).getParseData(getKey(details));
   }
 
   public long getFetchDate(HitDetails details) throws IOException {
-    return getSegment(details).getCrawlDatum(getUrl(details))
+    return getSegment(details).getCrawlDatum(getKey(details))
       .getFetchTime();
   }
 
   public ParseText getParseText(HitDetails details) throws IOException {
-    return getSegment(details).getParseText(getUrl(details));
+    return getSegment(details).getParseText(getKey(details));
   }
 
   public Summary getSummary(HitDetails details, Query query)
     throws IOException {
-    
+
     if (this.summarizer == null) {
       return new Summary();
     }
-    
-    String text = "";
-    Segment segment = getSegment(details);
-    if ( segment != null )
-      {
-        try
-          {
-            ParseText parseText = segment.getParseText(getUrl(details));
-            text = (parseText != null) ? parseText.getText() : "";
-          }
-        catch ( Exception e )
-          {
-            LOG.error( "segment = " + segment.segmentDir, e );
-          }
-      }
-    else
-      {
-        LOG.warn( "No segment for: " + details );
-      }
+
+    final Segment segment = getSegment(details);
+    final ParseText parseText = segment.getParseText(getKey(details));
+    final String text = (parseText != null) ? parseText.getText() : "";
 
     return this.summarizer.getSummary(text, query);
   }
-  
-  private class SummaryThread extends Thread {
-    private HitDetails details;
-    private Query query;
-    private Summary summary;
-    private Throwable throwable;
+  public long getProtocolVersion(String protocol, long clientVersion)
+  throws IOException {
+    return VERSION;
+  }
 
-    public SummaryThread(HitDetails details, Query query) {
-      this.details = details;
-      this.query = query;
+  public Summary[] getSummary(HitDetails[] details, Query query)
+    throws IOException {
+    final List<Callable<Summary>> tasks =
+      new ArrayList<Callable<Summary>>(details.length);
+    for (int i = 0; i < details.length; i++) {
+      tasks.add(new SummaryTask(details[i], query));
    }
 
-    public void run() {
-      try {
-        this.summary = getSummary(details, query);
-      } catch (Throwable throwable) {
-        this.throwable = throwable;
-      }
+    List<Future<Summary>> summaries;
+    try {
+      summaries = executor.invokeAll(tasks);
+    } catch (final InterruptedException e) {
+      throw new RuntimeException(e);
    }
-  }
-
-  public Summary[] getSummary(HitDetails[] details, Query query)
-    throws IOException {
-    SummaryThread[] threads = new SummaryThread[details.length];
-    for (int i = 0; i < threads.length; i++) {
-      threads[i] = new SummaryThread(details[i], query);
-      threads[i].start();
-    }
-
-    Summary[] results = new Summary[details.length];
-    for (int i = 0; i < threads.length; i++) {
+    final Summary[] results = new Summary[details.length];
+    for (int i = 0; i < details.length; i++) {
+      final Future<Summary> f = summaries.get(i);
+      Summary summary;
       try {
-        threads[i].join();
-      } catch (InterruptedException e) {
+        summary = f.get();
+      } catch (final Exception e) {
+        if (e.getCause() instanceof IOException) {
+          throw (IOException) e.getCause();
+        }
        throw new RuntimeException(e);
      }
-      if (threads[i].throwable instanceof IOException) {
-        throw (IOException)threads[i].throwable;
-      } else if (threads[i].throwable != null) {
-        throw new RuntimeException(threads[i].throwable);
-      }
-      results[i] = threads[i].summary;
+      results[i] = summary;
    }
    return results;
  }
@@ -380,19 +432,18 @@
       }
   }
 
-  private Text getUrl(HitDetails details) {
-    String url = details.getValue("orig");
-    if (StringUtils.isBlank(url)) {
-      url = details.getValue("url");
-    }
+  private Text getKey(HitDetails details)
+  {
+    String url = details.getValue("url") + " " + details.getValue("digest");
+
     return new Text(url);
  }
 
  public void close() throws IOException {
-    Iterator iterator = segments.values().iterator();
+    final Iterator<Segment> iterator = segments.values().iterator();
    while (iterator.hasNext()) {
-      ((Segment) iterator.next()).close();
+      iterator.next().close();
    }
  }
-  
+
 }
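
The central concurrency change in this patch swaps Nutch's old one-thread-per-hit
SummaryThread for a shared cached thread pool: each hit's summary request becomes
a Callable, the whole batch is run with ExecutorService.invokeAll(), and an
IOException raised by a task is unwrapped from its Future and re-thrown to the
caller. Below is a minimal, self-contained sketch of that pattern; it is not
NutchWAX code, and summarizeOne() plus the String result type merely stand in
for getSummary(HitDetails, Query) and Summary:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelSummarySketch {

  // Shared pool, as in the patch: idle threads are reused across requests.
  private static final ExecutorService EXECUTOR =
      Executors.newCachedThreadPool();

  // Stand-in for FetchedSegments.getSummary(HitDetails, Query).
  private static String summarizeOne(String docKey) throws IOException {
    return "summary of " + docKey;
  }

  public static String[] summarizeAll(String[] docKeys) throws IOException {
    final List<Callable<String>> tasks =
        new ArrayList<Callable<String>>(docKeys.length);
    for (final String key : docKeys) {
      tasks.add(new Callable<String>() {
        public String call() throws Exception {
          return summarizeOne(key);
        }
      });
    }

    final String[] results = new String[docKeys.length];
    try {
      // invokeAll() blocks until every task has completed.
      final List<Future<String>> futures = EXECUTOR.invokeAll(tasks);
      for (int i = 0; i < results.length; i++) {
        try {
          results[i] = futures.get(i).get();
        } catch (final ExecutionException e) {
          // Re-throw a worker's IOException directly, as the patch does;
          // wrap anything else in a RuntimeException.
          if (e.getCause() instanceof IOException) {
            throw (IOException) e.getCause();
          }
          throw new RuntimeException(e.getCause());
        }
      }
    } catch (final InterruptedException e) {
      throw new RuntimeException(e);
    }
    return results;
  }

  public static void main(String[] args) throws IOException {
    for (final String s : summarizeAll(new String[] { "doc1", "doc2" })) {
      System.out.println(s);
    }
  }
}

Compared with starting and joining one Thread per hit, the cached pool avoids
per-request thread creation; the trade-off is that invokeAll() blocks until the
slowest task in the batch finishes.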
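The other behavioral change worth noting is the lookup key: getUrl(), which fell
back from the "orig" field to "url", is replaced by getKey(), which joins the
hit's "url" and "digest" fields with a space, so two captures of the same URL
with different content resolve to distinct segment records. A tiny illustration
follows; the digest value is invented for the example:

import org.apache.hadoop.io.Text;

public class KeySketch {

  // Mirrors the patch's getKey(): the segment MapFile key is the hit's
  // "url" field, a space, then its "digest" field.
  static Text getKey(String url, String digest) {
    return new Text(url + " " + digest);
  }

  public static void main(String[] args) {
    // The digest below is a made-up placeholder, not a real hash.
    System.out.println(getKey("http://example.org/page.html",
                              "sha1:EXAMPLEDIGEST2345"));
    // prints: http://example.org/page.html sha1:EXAMPLEDIGEST2345
  }
}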