From: <mi...@us...> - 2008-07-26 15:33:40
Revision: 2499
          http://archive-access.svn.sourceforge.net/archive-access/?rev=2499&view=rev
Author:   miklosh
Date:     2008-07-26 15:33:48 +0000 (Sat, 26 Jul 2008)

Log Message:
-----------
Added DocIndexer for correctly indexing image digest information.

Added Paths:
-----------
    trunk/archive-access/projects/nutchwax/imagesearch/src/java/org/archive/nutchwax/imagesearch/DocIndexer.java

Added: trunk/archive-access/projects/nutchwax/imagesearch/src/java/org/archive/nutchwax/imagesearch/DocIndexer.java
===================================================================
--- trunk/archive-access/projects/nutchwax/imagesearch/src/java/org/archive/nutchwax/imagesearch/DocIndexer.java	(rev 0)
+++ trunk/archive-access/projects/nutchwax/imagesearch/src/java/org/archive/nutchwax/imagesearch/DocIndexer.java	2008-07-26 15:33:48 UTC (rev 2499)
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.archive.nutchwax.imagesearch;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.*;
+import org.apache.nutch.parse.*;
+import org.apache.nutch.analysis.*;
+
+import org.apache.nutch.scoring.ScoringFilterException;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.LogUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.crawl.LinkDb;
+import org.apache.nutch.crawl.NutchWritable;
+
+import org.apache.lucene.index.*;
+import org.apache.lucene.document.*;
+import org.apache.nutch.indexer.IndexingException;
+import org.apache.nutch.indexer.IndexingFilters;
+import org.apache.nutch.indexer.NutchSimilarity;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.protocol.Content;
+
+/** Create indexes for segments suitable for image search. */
+public class DocIndexer extends Configured implements Tool,
+    Reducer<Text, NutchWritable, Text, Writable>,
+    Mapper<Text, Writable, Text, NutchWritable> {
+
+  public static final String DONE_NAME = "index.done";
+  public static final Log LOG = LogFactory.getLog(DocIndexer.class);
+
+  /** A utility class used to pass a lucene document from Indexer.reduce
+   * to Indexer.OutputFormat.
+   * Note: Despite its name, it can't properly wrap a lucene document - it
+   * doesn't know how to serialize/deserialize a lucene document.
+   */
+  private static class LuceneDocumentWrapper implements Writable {
+
+    private Document doc;
+
+    public LuceneDocumentWrapper(Document doc) {
+      this.doc = doc;
+    }
+
+    public Document get() {
+      return doc;
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      // intentionally left blank
+    }
+
+    public void write(DataOutput out) throws IOException {
+      // intentionally left blank
+    }
+  }
+
+  /** Unwrap Lucene Documents created by reduce and add them to an index. */
+  public static class OutputFormat
+      extends org.apache.hadoop.mapred.OutputFormatBase<WritableComparable, LuceneDocumentWrapper> {
+
+    public RecordWriter<WritableComparable, LuceneDocumentWrapper> getRecordWriter(final FileSystem fs, JobConf job,
+        String name, final Progressable progress) throws IOException {
+      final Path perm = new Path(job.getOutputPath(), name);
+      final Path temp =
+          job.getLocalPath("index/_" + Integer.toString(new Random().nextInt()));
+
+      fs.delete(perm); // delete old, if any
+
+      final AnalyzerFactory factory = new AnalyzerFactory(job);
+      final IndexWriter writer = // build locally first
+          new IndexWriter(fs.startLocalOutput(perm, temp).toString(),
+              new NutchDocumentAnalyzer(job), true);
+
+      writer.setMergeFactor(job.getInt("indexer.mergeFactor", 10));
+      writer.setMaxBufferedDocs(job.getInt("indexer.minMergeDocs", 100));
+      writer.setMaxMergeDocs(job.getInt("indexer.maxMergeDocs", Integer.MAX_VALUE));
+      writer.setTermIndexInterval(job.getInt("indexer.termIndexInterval", 128));
+      writer.setMaxFieldLength(job.getInt("indexer.max.tokens", 10000));
+      writer.setInfoStream(LogUtil.getInfoStream(LOG));
+      writer.setUseCompoundFile(false);
+      writer.setSimilarity(new NutchSimilarity());
+
+      return new RecordWriter<WritableComparable, LuceneDocumentWrapper>() {
+
+        boolean closed;
+
+        public void write(WritableComparable key, LuceneDocumentWrapper value)
+            throws IOException { // unwrap & index doc
+          Document doc = value.get();
+          NutchAnalyzer analyzer = factory.get(doc.get("lang"));
+          if (LOG.isInfoEnabled()) {
+            LOG.info(" Indexing [" + doc.getField("url").stringValue() + "]" +
+                " with analyzer " + analyzer +
+                " (" + doc.get("lang") + ")");
+          }
+          writer.addDocument(doc, analyzer);
+          progress.progress();
+        }
+
+        public void close(final Reporter reporter) throws IOException {
+          // spawn a thread to give progress heartbeats
+          Thread prog = new Thread() {
+
+            public void run() {
+              while (!closed) {
+                try {
+                  reporter.setStatus("closing");
+                  Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                  continue;
+                } catch (Throwable e) {
+                  return;
+                }
+              }
+            }
+          };
+
+          try {
+            prog.start();
+            if (LOG.isInfoEnabled()) {
+              LOG.info("Optimizing index.");
+            }
+            // optimize & close index
+            writer.optimize();
+            writer.close();
+            fs.completeLocalOutput(perm, temp); // copy to dfs
+            fs.createNewFile(new Path(perm, DONE_NAME));
+          } finally {
+            closed = true;
+          }
+        }
+      };
+    }
+  }
+
+  private IndexingFilters filters;
+  private ScoringFilters scfilters;
+
+  public DocIndexer() {
+
+  }
+
+  public DocIndexer(Configuration conf) {
+    setConf(conf);
+  }
+
+  public void configure(JobConf job) {
+    setConf(job);
+    this.filters = new IndexingFilters(getConf());
+    this.scfilters = new ScoringFilters(getConf());
+  }
+
+  public void close() {
+  }
+
+  public void reduce(Text key, Iterator<NutchWritable> values,
+      OutputCollector<Text, Writable> output, Reporter reporter)
+      throws IOException {
+    Inlinks inlinks = null;
+    CrawlDatum dbDatum = null;
+    CrawlDatum fetchDatum = null;
+    ParseData parseData = null;
+    ParseText parseText = null;
+    Metadata imageUrlMapping = new Metadata();
+    while (values.hasNext()) {
+      Writable value = values.next().get(); // unwrap
+      if (value instanceof Inlinks) {
+        inlinks = (Inlinks) value;
+      } else if (value instanceof CrawlDatum) {
+        CrawlDatum datum = (CrawlDatum) value;
+        if (CrawlDatum.hasDbStatus(datum)) {
+          dbDatum = datum;
+        } else if (CrawlDatum.hasFetchStatus(datum)) {
+          // don't index unmodified (empty) pages
+          if (datum.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED) {
+            fetchDatum = datum;
+          }
+        } else if (CrawlDatum.STATUS_LINKED == datum.getStatus() ||
+            CrawlDatum.STATUS_SIGNATURE == datum.getStatus()) {
+          continue;
+        } else {
+          throw new RuntimeException("Unexpected status: " + datum.getStatus());
+        }
+      } else if (value instanceof ParseData) {
+        parseData = (ParseData) value;
+      } else if (value instanceof ParseText) {
+        parseText = (ParseText) value;
+      } else if (value instanceof Metadata) {
+        // Add image URL->digest mapping
+        Metadata mapping = (Metadata) value;
+        String[] imageUrls = mapping.names();
+        for (String imageUrl : imageUrls) {
+          if (imageUrlMapping.get(imageUrl) == null) {
+            imageUrlMapping.add(imageUrl, mapping.get(imageUrl));
+          }
+        }
+      } else if (LOG.isWarnEnabled()) {
+        LOG.warn("Unrecognized type: " + value.getClass());
+      }
+    }
+
+    if (fetchDatum == null || dbDatum == null || parseText == null || parseData == null) {
+      return; // only have inlinks
+    }
+
+    if (!parseData.getStatus().isSuccess() ||
+        fetchDatum.getStatus() != CrawlDatum.STATUS_FETCH_SUCCESS) {
+      return;
+    }
+
+    Document doc = new Document();
+    Metadata metadata = parseData.getContentMeta();
+
+    // replace image ids with proper ones
+    if (imageUrlMapping.size() > 0) {
+      Metadata parseMeta = parseData.getParseMeta();
+      parseMeta.remove(ImageSearch.IMAGE_IDS_KEY);
+      String[] imageUrls = parseMeta.getValues(ImageSearch.IMAGE_URLS_KEY);
+      for (String imageUrl : imageUrls) {
+        String mappedTo = imageUrlMapping.get(imageUrl);
+        if (mappedTo == null) {
+          if (LOG.isInfoEnabled()) {
+            LOG.info("No digest information for " + imageUrl);
+          }
+          parseMeta.add(ImageSearch.IMAGE_IDS_KEY, "-");
+          continue;
+        }
+        parseMeta.add(ImageSearch.IMAGE_IDS_KEY, mappedTo);
+      }
+    }
+
+    // add segment, used to map from merged index back to segment files
+    doc.add(new Field("segment", metadata.get(Nutch.SEGMENT_NAME_KEY),
+        Field.Store.YES, Field.Index.NO));
+
+    // add digest, used by dedup
+    doc.add(new Field("digest", metadata.get(Nutch.SIGNATURE_KEY),
+        Field.Store.YES, Field.Index.NO));
+
+    Parse parse = new ParseImpl(parseText, parseData);
+    try {
+      // extract information from dbDatum and pass it to
+      // fetchDatum so that indexing filters can use it
+      Text url = (Text) dbDatum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY);
+      if (url != null) {
+        fetchDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY, url);
+      }
+      // run indexing filters
+      doc = this.filters.filter(doc, parse, key, fetchDatum, inlinks);
+    } catch (IndexingException e) {
+      if (LOG.isWarnEnabled()) {
+        LOG.warn("Error indexing " + key + ": " + e);
+      }
+      return;
+    }
+
+    // skip documents discarded by indexing filters
+    if (doc == null) {
+      return;
+    }
+
+    float boost = 1.0f;
+    // run scoring filters
+    try {
+      boost = this.scfilters.indexerScore((Text) key, doc, dbDatum,
+          fetchDatum, parse, inlinks, boost);
+    } catch (ScoringFilterException e) {
+      if (LOG.isWarnEnabled()) {
+        LOG.warn("Error calculating score " + key + ": " + e);
+      }
+      return;
+    }
+    // apply boost to all indexed fields.
+    doc.setBoost(boost);
+    // store boost for use by explain and dedup
+    doc.add(new Field("boost", Float.toString(boost),
+        Field.Store.YES, Field.Index.NO));
+
+    output.collect(key, new LuceneDocumentWrapper(doc));
+  }
+
+  /**
+   * Emits image URLs as keys and their URL+digest as values.
+   */
+  public static class ImageUrlEmitter
+      implements Mapper<Text, Writable, Text, Text>,
+      Reducer<Text, Text, Text, Metadata> {
+
+    public void map(Text key, Writable value,
+        OutputCollector<Text, Text> output, Reporter reporter)
+        throws IOException {
+
+      if (value instanceof ParseData) {
+        ParseData parseData = (ParseData) value;
+        Metadata parseMeta = parseData.getParseMeta();
+        String[] imageUrls = parseMeta.getValues(ImageSearch.IMAGE_URLS_KEY);
+        if (imageUrls.length > 0) {
+          for (String url : imageUrls) {
+            output.collect(new Text(url), key);
+          }
+        }
+      } else if (value instanceof Content) {
+        Content content = (Content) value;
+        if (content.getContentType().contains("image/")) {
+          Metadata meta = content.getMetadata();
+          // Using NutchWax.DIGEST_KEY here
+          String digest = meta.get("digest");
+          if (digest == null) {
+            digest = meta.get(Metadata.SIGNATURE_KEY);
+          }
+          output.collect(new Text(content.getUrl()), new Text(digest));
+        }
+      }
+    }
+
+    public void reduce(Text key, Iterator<Text> values,
+        OutputCollector<Text, Metadata> output, Reporter reporter)
+        throws IOException {
+
+      Vector<Text> parents = new Vector<Text>();
+      String imageUrl = key.toString();
+      String imageDigest = null;
+      while (values.hasNext()) {
+        Text data = values.next();
+        String value = data.toString();
+        // Determine type of value
+        if (value.contains("/")) {
+          // This value is a parent's key
+          parents.add(data);
+        } else {
+          // This value is a digest
+          imageDigest = value.toString();
+        }
+      }
+      if (imageDigest != null) {
+        Metadata meta = new Metadata();
+        meta.add(imageUrl, imageDigest);
+        Iterator<Text> it = parents.iterator();
+        while (it.hasNext()) {
+          Text parentKey = it.next();
+          output.collect(parentKey, meta);
+        }
+      }
+    }
+
+    public void configure(JobConf job) {}
+    public void close() {}
+  }
+
+  public void index(Path indexDir, Path crawlDb, Path linkDb, Path[] segments)
+      throws IOException {
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("DocIndexer: starting");
+      LOG.info("DocIndexer: linkdb: " + linkDb);
+    }
+
+    /*
+     * First phase: determining image keys
+     */
+    Path outDir = new Path("imgkeys-" +
+        Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+    JobConf job = new NutchJob(getConf());
+    job.setJobName("imagekeys " + indexDir);
+    for (int i = 0; i < segments.length; i++) {
+      job.addInputPath(new Path(segments[i], ParseData.DIR_NAME));
+      job.addInputPath(new Path(segments[i], Content.DIR_NAME));
+    }
+
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setMapperClass(ImageUrlEmitter.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setReducerClass(ImageUrlEmitter.class);
+
+    job.setOutputPath(outDir);
+    job.setOutputFormat(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Metadata.class);
+
+    JobClient.runJob(job);
+
+    /*
+     * Second phase: creating Lucene index
+     */
+    job = new NutchJob(getConf());
+    job.setJobName("index " + indexDir);
+
+    for (int i = 0; i < segments.length; i++) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("DocIndexer: adding segment: " + segments[i]);
+      }
+      job.addInputPath(new Path(segments[i], CrawlDatum.FETCH_DIR_NAME));
+      job.addInputPath(new Path(segments[i], CrawlDatum.PARSE_DIR_NAME));
+      job.addInputPath(new Path(segments[i], ParseData.DIR_NAME));
+      job.addInputPath(new Path(segments[i], ParseText.DIR_NAME));
+    }
+
+    job.addInputPath(outDir);
+    job.addInputPath(new Path(crawlDb, CrawlDb.CURRENT_NAME));
+    job.addInputPath(new Path(linkDb, LinkDb.CURRENT_NAME));
+    job.setInputFormat(SequenceFileInputFormat.class);
+
+    job.setMapperClass(DocIndexer.class);
+    job.setReducerClass(DocIndexer.class);
+
+    job.setOutputPath(indexDir);
+    job.setOutputFormat(OutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(NutchWritable.class);
+
+    JobClient.runJob(job);
+
+    FileSystem fs = FileSystem.get(getConf());
+    fs.delete(outDir);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("DocIndexer: done");
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new DocIndexer(), args);
+    System.exit(res);
+  }
+
+  public int run(String[] args) throws Exception {
+
+    if (args.length < 4) {
+      System.err.println("Usage: <index> <crawldb> <linkdb> <segment> ...");
+      return -1;
+    }
+
+    Path[] segments = new Path[args.length - 3];
+    for (int i = 3; i < args.length; i++) {
+      segments[i - 3] = new Path(args[i]);
+    }
+
+    try {
+      index(new Path(args[0]), new Path(args[1]), new Path(args[2]),
+          segments);
+      return 0;
+    } catch (Exception e) {
+      LOG.fatal("DocIndexer: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+
+  public void map(Text key, Writable value,
+      OutputCollector<Text, NutchWritable> output, Reporter reporter) throws IOException {
+    output.collect(key, new NutchWritable(value));
+  }
+}
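Editor's note (not part of the commit): the tool is driven through Hadoop's ToolRunner, as in the main() method above, with the argument order given by the Usage string in run(). Below is a minimal sketch of a programmatic invocation; the RunDocIndexer class name and the crawl paths are hypothetical examples, assuming a standard Nutch crawl directory layout.

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.nutch.util.NutchConfiguration;
    import org.archive.nutchwax.imagesearch.DocIndexer;

    // Hypothetical driver: runs DocIndexer exactly as its main() does.
    public class RunDocIndexer {
      public static void main(String[] args) throws Exception {
        // Argument order per run(): <index> <crawldb> <linkdb> <segment> ...
        String[] indexerArgs = {
          "crawl/index-images",              // output index directory (hypothetical)
          "crawl/crawldb",                   // crawldb path (hypothetical)
          "crawl/linkdb",                    // linkdb path (hypothetical)
          "crawl/segments/20080726000000"    // one or more segment directories
        };
        int res = ToolRunner.run(NutchConfiguration.create(), new DocIndexer(), indexerArgs);
        System.exit(res);
      }
    }

This runs the two MapReduce phases defined in index(): first the ImageUrlEmitter job that maps image URLs to digests, then the indexing job that writes the Lucene index with the corrected image ids.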