From: <bi...@us...> - 2009-10-26 23:02:14
Revision: 2833
          http://archive-access.svn.sourceforge.net/archive-access/?rev=2833&view=rev
Author:   binzino
Date:     2009-10-26 23:01:57 +0000 (Mon, 26 Oct 2009)

Log Message:
-----------
Added NutchWAX version of Indexer that doesn't need/use crawldb nor linkdb. Also has a special check for non-null values of required metadata fields: segment, digest.

Added Paths:
-----------
    tags/nutchwax-0_12_9/archive/src/java/org/archive/nutchwax/Indexer.java

Added: tags/nutchwax-0_12_9/archive/src/java/org/archive/nutchwax/Indexer.java
===================================================================
--- tags/nutchwax-0_12_9/archive/src/java/org/archive/nutchwax/Indexer.java    (rev 0)
+++ tags/nutchwax-0_12_9/archive/src/java/org/archive/nutchwax/Indexer.java    2009-10-26 23:01:57 UTC (rev 2833)
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.archive.nutchwax;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.*;
+import org.apache.nutch.parse.*;
+import org.apache.nutch.analysis.*;
+
+import org.apache.nutch.indexer.IndexingFilters;
+import org.apache.nutch.indexer.IndexingException;
+import org.apache.nutch.indexer.NutchSimilarity;
+
+import org.apache.nutch.util.LogUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.crawl.LinkDb;
+import org.apache.nutch.crawl.NutchWritable;
+
+import org.apache.lucene.index.*;
+import org.apache.lucene.document.*;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.metadata.Nutch;
+
+/** Create indexes for segments. */
+public class Indexer extends Configured implements Tool, Reducer<Text, NutchWritable, Text, Writable>, Mapper<Text, Writable, Text, NutchWritable> {
+
+  public static final String DONE_NAME = "index.done";
+
+  public static final Log LOG = LogFactory.getLog(Indexer.class);
+
+  /** A utility class used to pass a lucene document from Indexer.reduce
+   * to Indexer.OutputFormat.
+   * Note: Despite its name, it can't properly wrap a lucene document - it
+   * doesn't know how to serialize/deserialize a lucene document.
+   */
+  public static class LuceneDocumentWrapper implements Writable {
+    private Document doc;
+
+    public LuceneDocumentWrapper(Document doc) {
+      this.doc = doc;
+    }
+
+    public Document get() {
+      return doc;
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      // intentionally left blank
+    }
+
+    public void write(DataOutput out) throws IOException {
+      // intentionally left blank
+    }
+
+  }
+
+  /** Unwrap Lucene Documents created by reduce and add them to an index. */
+  public static class OutputFormat
+    extends org.apache.hadoop.mapred.FileOutputFormat<WritableComparable, LuceneDocumentWrapper> {
+    public RecordWriter<WritableComparable, LuceneDocumentWrapper> getRecordWriter(final FileSystem fs, JobConf job,
+                                        String name, final Progressable progress) throws IOException {
+      final Path perm = new Path(FileOutputFormat.getOutputPath(job), name);
+      final Path temp =
+        job.getLocalPath("index/_"+Integer.toString(new Random().nextInt()));
+
+      int maxTokens = job.getInt("indexer.max.tokens", 10000);
+      if (maxTokens < 0) maxTokens = Integer.MAX_VALUE;
+
+      fs.delete(perm, true); // delete old, if any
+
+      final AnalyzerFactory factory = new AnalyzerFactory(job);
+      final IndexWriter writer =                  // build locally first
+        new IndexWriter(fs.startLocalOutput(perm, temp).toString(),
+                        new NutchDocumentAnalyzer(job), true);
+
+      writer.setMergeFactor(job.getInt("indexer.mergeFactor", 10));
+      writer.setMaxBufferedDocs(job.getInt("indexer.minMergeDocs", 100));
+      writer.setMaxMergeDocs(job.getInt("indexer.maxMergeDocs", Integer.MAX_VALUE));
+      writer.setTermIndexInterval
+        (job.getInt("indexer.termIndexInterval", 128));
+      writer.setMaxFieldLength(maxTokens);
+      writer.setInfoStream(LogUtil.getInfoStream(LOG));
+      writer.setUseCompoundFile(false);
+      writer.setSimilarity(new NutchSimilarity());
+
+      return new RecordWriter<WritableComparable, LuceneDocumentWrapper>() {
+          boolean closed;
+
+          public void write(WritableComparable key, LuceneDocumentWrapper value)
+            throws IOException {                  // unwrap & index doc
+            Document doc = value.get();
+            NutchAnalyzer analyzer = factory.get(doc.get("lang"));
+            if (LOG.isInfoEnabled()) {
+              LOG.info(" Indexing [" + doc.getField("url").stringValue() + "]" +
+                       " with analyzer " + analyzer +
+                       " (" + doc.get("lang") + ")");
+            }
+            writer.addDocument(doc, analyzer);
+            progress.progress();
+          }
+
+          public void close(final Reporter reporter) throws IOException {
+            // spawn a thread to give progress heartbeats
+            Thread prog = new Thread() {
+                public void run() {
+                  while (!closed) {
+                    try {
+                      reporter.setStatus("closing");
+                      Thread.sleep(1000);
+                    } catch (InterruptedException e) { continue; }
+                      catch (Throwable e) { return; }
+                  }
+                }
+              };
+
+            try {
+              prog.start();
+              if (LOG.isInfoEnabled()) { LOG.info("Optimizing index."); }
+              // optimize & close index
+              writer.optimize();
+              writer.close();
+              fs.completeLocalOutput(perm, temp); // copy to dfs
+              fs.createNewFile(new Path(perm, DONE_NAME));
+            } finally {
+              closed = true;
+            }
+          }
+        };
+    }
+  }
+
+  private IndexingFilters filters;
+
+  public Indexer() {
+
+  }
+
+  public Indexer(Configuration conf) {
+    setConf(conf);
+  }
+
+  public void configure(JobConf job) {
+    setConf(job);
+    this.filters = new IndexingFilters(getConf());
+  }
+
+  public void close() {}
+
+  public void reduce(Text key, Iterator<NutchWritable> values,
+                     OutputCollector<Text, Writable> output, Reporter reporter)
+    throws IOException {
+    ParseData parseData = null;
+    ParseText parseText = null;
+    while (values.hasNext()) {
+      Writable value = values.next().get(); // unwrap
+
+      if (value instanceof ParseData) {
+        parseData = (ParseData)value;
+      } else if (value instanceof ParseText) {
+        parseText = (ParseText)value;
+      } else if (LOG.isWarnEnabled()) {
+        LOG.warn("Unrecognized type: "+value.getClass());
+      }
+    }
+
+    if ( parseText == null || parseData == null) {
+      return;                                     // only have inlinks
+    }
+
+    Document doc = new Document();
+    Metadata metadata = parseData.getContentMeta();
+
+    if ( metadata.get(Nutch.SEGMENT_NAME_KEY) == null ||
+         metadata.get(Nutch.SIGNATURE_KEY) == null )
+      {
+        LOG.warn( "Skipping document, insufficient metadata: key=" + key + " metadata=" + metadata );
+        return ;
+      }
+
+    // add segment, used to map from merged index back to segment files
+    doc.add(new Field("segment", metadata.get(Nutch.SEGMENT_NAME_KEY),
+            Field.Store.YES, Field.Index.NO));
+
+    // add digest, used by dedup
+    doc.add(new Field("digest", metadata.get(Nutch.SIGNATURE_KEY),
+            Field.Store.YES, Field.Index.NO));
+
+    Parse parse = new ParseImpl(parseText, parseData);
+    try {
+      doc = this.filters.filter(doc, parse, key, /*fetchDatum*/ null, /*inlinks*/ null);
+    } catch (IndexingException e) {
+      if (LOG.isWarnEnabled()) { LOG.warn("Error indexing "+key+": "+e); }
+      return;
+    }
+
+    // skip documents discarded by indexing filters
+    if (doc == null) return;
+
+    output.collect(key, new LuceneDocumentWrapper(doc));
+  }
+
+  public void index(Path indexDir, Path crawlDb, Path linkDb, Path[] segments)
+    throws IOException {
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Indexer: starting");
+    }
+
+    JobConf job = new NutchJob(getConf());
+    job.setJobName("index " + indexDir);
+
+    for (int i = 0; i < segments.length; i++) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Indexer: adding segment: " + segments[i]);
+      }
+      FileInputFormat.addInputPath(job, new Path(segments[i], ParseData.DIR_NAME));
+      FileInputFormat.addInputPath(job, new Path(segments[i], ParseText.DIR_NAME));
+    }
+
+    job.setInputFormat(SequenceFileInputFormat.class);
+
+    job.setMapperClass(Indexer.class);
+    job.setReducerClass(Indexer.class);
+
+    FileOutputFormat.setOutputPath(job, indexDir);
+    job.setOutputFormat(OutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(NutchWritable.class);
+
+    JobClient.runJob(job);
+    if (LOG.isInfoEnabled()) { LOG.info("Indexer: done"); }
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new Indexer(), args);
+    System.exit(res);
+  }
+
+  public int run(String[] args) throws Exception {
+
+    if (args.length < 2) {
+      System.err.println("Usage: <index> <segment> ...");
+      return -1;
+    }
+
+    Path[] segments = new Path[args.length-1];
+    for (int i = 1; i < args.length; i++) {
+      segments[i-1] = new Path(args[i]);
+    }
+
+    try {
+      index(new Path(args[0]), null, null, segments);
+      return 0;
+    } catch (Exception e) {
+      LOG.fatal("Indexer: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+
+  public void map(Text key, Writable value,
+    OutputCollector<Text, NutchWritable> output, Reporter reporter) throws IOException {
+    output.collect(key, new NutchWritable(value));
+  }
+
+}
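For reference, a minimal sketch of how this Indexer might be driven programmatically, mirroring the main() and run() methods in the file above (usage: <index> <segment> ...). The wrapper class name and the index/segment paths below are hypothetical placeholders; no crawldb or linkdb arguments are passed, since this NutchWAX variant does not use them.

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.nutch.util.NutchConfiguration;
    import org.archive.nutchwax.Indexer;

    public class IndexerUsageSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical paths: the first argument is the output index
        // directory, the remaining arguments are segment directories.
        String[] indexerArgs = {
          "crawl/index",
          "crawl/segments/20091026230157"
        };

        // Same entry point as Indexer.main(): ToolRunner applies generic
        // Hadoop options from the configuration, then calls Indexer.run().
        int res = ToolRunner.run(NutchConfiguration.create(), new Indexer(), indexerArgs);
        System.exit(res);
      }
    }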