From: Doug C. <cu...@us...> - 2005-09-01 18:45:38
Update of /cvsroot/archive-access/archive-access/projects/nutch/src/java/org/archive/access/nutch
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv24577/src/java/org/archive/access/nutch

Added Files:
      Tag: mapred
	ImportArcs.java IndexArcs.java
Removed Files:
      Tag: mapred
	Arc2Segment.java
Log Message:
Add indexArcs command.

--- NEW FILE: ImportArcs.java ---

/*
 * $Id: ImportArcs.java,v 1.1.2.1 2005/09/01 18:45:29 cutting Exp $
 *
 * Copyright (C) 2003 Internet Archive.
 *
 * This file is part of the archive-access tools project
 * (http://sourceforge.net/projects/archive-access).
 *
 * The archive-access tools are free software; you can redistribute them and/or
 * modify them under the terms of the GNU Lesser Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or any
 * later version.
 *
 * The archive-access tools are distributed in the hope that they will be
 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
 * Public License for more details.
 *
 * You should have received a copy of the GNU Lesser Public License along with
 * the archive-access tools; if not, write to the Free Software Foundation,
 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
package org.archive.access.nutch;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.commons.httpclient.Header;
import org.apache.nutch.io.Writable;
import org.apache.nutch.io.WritableComparable;
import org.apache.nutch.io.UTF8;
import org.apache.nutch.io.MD5Hash;
import org.apache.nutch.protocol.Content;
import org.apache.nutch.util.NutchConf;
import org.apache.nutch.util.NutchConfigured;
import org.apache.nutch.util.mime.MimeType;
import org.apache.nutch.util.mime.MimeTypes;
import org.apache.nutch.mapred.JobConf;
import org.apache.nutch.mapred.JobClient;
import org.apache.nutch.mapred.Mapper;
import org.apache.nutch.mapred.OutputCollector;
import org.apache.nutch.mapred.Reporter;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.Fetcher;
import org.apache.nutch.crawl.FetcherOutput;
import org.apache.nutch.crawl.FetcherOutputFormat;
import org.apache.nutch.parse.Parse;
import org.apache.nutch.parse.ParseStatus;
import org.apache.nutch.parse.Parser;
import org.apache.nutch.parse.ParserFactory;
import org.apache.nutch.parse.ParseImpl;

import org.archive.io.arc.ARCReader;
import org.archive.io.arc.ARCReaderFactory;
import org.archive.io.arc.ARCRecord;
import org.archive.io.arc.ARCRecordMetaData;
import org.archive.util.ArchiveUtils;
import org.archive.util.TextUtils;
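/**
 * Nutch mapred job that imports ARC files.  Each map input value names an
 * ARC location; every record fetched with HTTP status 200 is parsed and
 * collected as FetcherOutput, keyed by URL, ready for link inversion and
 * indexing.
 */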
public class ImportArcs extends NutchConfigured implements Mapper {
  private static final Logger LOG =
    Logger.getLogger(ImportArcs.class.getName());

  private static final String WHITESPACE = "\\s+";

  public static final String ARCFILENAME_KEY = "arcname";
  public static final String ARCFILEOFFSET_KEY = "arcoffset";
  public static final String ARCCOLLECTION_KEY = "collection";
  private static final String CONTENT_TYPE_KEY = "content-type";

  private static final String TEXT_TYPE = "text/";
  private static final String APPLICATION_TYPE = "application/";

  private boolean indexAll;
  private int contentLimit;
  private MimeTypes mimeTypes;
  private String collectionName;
  private String segmentName;

  public ImportArcs() {
    super(null);
  }

  public ImportArcs(NutchConf conf) {
    super(conf);
  }

  public void configure(JobConf job) {
    setConf(job);
    this.indexAll = job.getBoolean("archive.index.all", false);
    this.contentLimit = job.getInt("http.content.limit", 100000);
    this.mimeTypes = MimeTypes.get(job.get("mime.types.file"));
    this.collectionName = job.get("archive.collection", "web");
    this.segmentName = job.get(Fetcher.SEGMENT_NAME_KEY);
    if (job.getBoolean("arc2segment.verbose", false)) {
      LOG.setLevel(Level.FINE);
    }
    // Let java.net.URL find protocol handlers under org.archive.net.
    System.setProperty("java.protocol.handler.pkgs", "org.archive.net");
  }

  public void map(WritableComparable key, Writable value,
      OutputCollector output, Reporter reporter)
    throws IOException {
    String arcLocation = ((UTF8)value).toString();
    LOG.info("opening " + arcLocation);
    ARCReader arc = null;
    String arcName = null;
    try {
      arc = ARCReaderFactory.get(arcLocation);
    } catch (Throwable e) {
      LOG.log(Level.WARNING, "Error opening: " + arcLocation, e);
      return;
    }

    // Don't run the digester.  Digest is unused and it costs CPU.
    arc.setDigest(false);

    try {
      for (Iterator i = arc.iterator(); i.hasNext();) {
        ARCRecord rec = (ARCRecord)i.next();
        if (arcName == null) {
          // first entry has arc name
          String arcPath = new URI(rec.getMetaData().getUrl()).getPath();
          arcName = new File(arcPath).getName();
          if (arcName.endsWith(".arc")) {
            arcName = arcName.substring(0, arcName.indexOf(".arc"));
          }
          reporter.setStatus(arcName);
        }
        if (rec.getStatusCode() != 200) {
          continue;
        }
        try {
          processRecord(arcName, rec, output);
        } catch (Throwable e) {
          LOG.log(Level.WARNING, "Error processing: " + arcLocation, e);
        }
      }
    } catch (Throwable e) {
      // problem parsing arc file
      LOG.log(Level.WARNING, "Error parsing: " + arcLocation, e);
    }
  }
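  /**
   * Convert one ARC record into Nutch fetcher output.  Skips all but
   * text/* and application/* types unless archive.index.all is set, copies
   * the HTTP headers plus collection name, arc file name and offset into
   * the metadata, reads up to http.content.limit bytes of the body, parses
   * it, and collects a FetcherOutput keyed by the record's URL.
   */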
  private void processRecord(final String arcName, final ARCRecord rec,
      OutputCollector output)
    throws IOException {
    ARCRecordMetaData arcData = rec.getMetaData();
    String url = arcData.getUrl();
    String mimetype = arcData.getMimetype();
    if (mimetype != null && mimetype.length() > 0) {
      mimetype = mimetype.toLowerCase();
    } else {
      MimeType mt = mimeTypes.getMimeType(url);
      if (mt != null) {
        mimetype = mt.getName();
      }
    }
    if (!indexAll) {
      if ((mimetype == null) ||
          (!mimetype.startsWith(TEXT_TYPE) &&
           !mimetype.startsWith(APPLICATION_TYPE))) {
        // Skip any but basic types.
        return;
      }
    }
    String noSpacesMimetype = TextUtils.replaceAll(WHITESPACE, mimetype, "-");
    // LOG.info("adding " + Long.toString(arcData.getLength())
    //   + " bytes of mimetype " + noSpacesMimetype + " " + url);

    // copy http headers to nutch metadata
    Properties metaData = new Properties();
    Header[] headers = rec.getHttpHeaders();
    for (int j = 0; j < headers.length; j++) {
      Header header = headers[j];
      metaData.put(header.getName(), header.getValue());
    }

    // Add the collection name, the arcfile name, and the offset.
    // Also add mimetype.  Needed by the ia indexers.
    metaData.put(ARCCOLLECTION_KEY, this.collectionName);
    metaData.put(ARCFILENAME_KEY, arcName);
    metaData.put(ARCFILEOFFSET_KEY, Long.toString(arcData.getOffset()));
    metaData.put(CONTENT_TYPE_KEY, mimetype);

    // Collect content bytes
    // TODO: Skip if unindexable type.
    rec.skipHttpHeader();
    ByteArrayOutputStream contentBuffer = new ByteArrayOutputStream();
    byte[] buf = new byte[1024 * 4];
    int total = 0;
    int len = rec.read(buf, 0, buf.length);
    while (len != -1 && total < this.contentLimit) {
      total += len;
      contentBuffer.write(buf, 0, len);
      len = rec.read(buf, 0, buf.length);
    }
    // System.out.println("--------------");
    // System.out.write(contentBuffer.toByteArray());
    // System.out.println("--------------");
    byte[] contentBytes = contentBuffer.toByteArray();
    Content content = new Content(url, url, contentBytes, mimetype, metaData);
    metaData.put(Fetcher.DIGEST_KEY, MD5Hash.digest(contentBytes).toString());
    metaData.put(Fetcher.SEGMENT_NAME_KEY, segmentName);

    CrawlDatum datum = new CrawlDatum();
    datum.setStatus(CrawlDatum.STATUS_FETCH_SUCCESS);

    long date = 0;
    try {
      date = ArchiveUtils.parse14DigitDate(arcData.getDate()).getTime();
    } catch (java.text.ParseException e) {
      LOG.severe("Failed parse of date: " + arcData.getDate());
    }
    datum.setFetchTime(date);

    Parse parse = null;
    ParseStatus parseStatus;
    try {
      Parser parser =
        ParserFactory.getParser(content.getContentType(), content.getBaseUrl());
      parse = parser.getParse(content);
      parseStatus = parse.getData().getStatus();
    } catch (Exception e) {
      parseStatus = new ParseStatus(e);
    }
    if (!parseStatus.isSuccess()) {
      LOG.warning("Error parsing: " + url + ": " + parseStatus);
      parse = null;
    }

    output.collect(new UTF8(url),
        new FetcherOutput(datum, null,
            parse != null ? new ParseImpl(parse) : null));
  }

  public void importArcs(File arcUrlsDir, File segment) throws IOException {
    LOG.info("ImportArcs: starting");
    LOG.info("ImportArcs: arcUrlsDir: " + arcUrlsDir);
    LOG.info("ImportArcs: segment: " + segment);

    JobConf job = new JobConf(getConf());
    job.setJar("build/nutchwax.job.jar");
    job.set(Fetcher.SEGMENT_NAME_KEY, segment.getName());
    job.setInputDir(arcUrlsDir);
    job.setMapperClass(ImportArcs.class);
    job.setOutputDir(segment);
    job.setOutputFormat(FetcherOutputFormat.class);
    job.setOutputKeyClass(UTF8.class);
    job.setOutputValueClass(FetcherOutput.class);
    JobClient.runJob(job);

    LOG.info("ImportArcs: done");
  }

  public static void main(String[] args) throws Exception {
    // parse command line options
    String usage = "Usage: ImportArcs arcUrlsDir segmentDir";
    if (args.length != 2) {
      System.err.println(usage);
      System.exit(-1);
    }
    File arcUrlsDir = new File(args[0]);
    File segmentDir = new File(args[1]);
    new ImportArcs(NutchConf.get()).importArcs(arcUrlsDir, segmentDir);
  }
}
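Note that importArcs() leaves the job's input format at the mapred default, so arcUrlsDir appears to be expected to hold plain-text files listing one ARC location per line, each line arriving in map() as the UTF8 value. A minimal sketch of producing such a listing; the helper class, directory and ARC names below are illustrative, not part of this commit:

// Hypothetical helper, not part of this commit: writes a listing of ARC
// locations under a local directory for ImportArcs to consume.
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;

public class WriteArcUrls {
  public static void main(String[] args) throws IOException {
    File urlsDir = new File("arc-urls");  // illustrative directory name
    urlsDir.mkdirs();
    PrintWriter out =
      new PrintWriter(new FileWriter(new File(urlsDir, "arcs.txt")));
    try {
      // Each line becomes one UTF8 value handed to ImportArcs.map().
      out.println("/data/arcs/sample-20050901000000.arc.gz");
      out.println("http://archive.example.org/arcs/sample-20050902000000.arc.gz");
    } finally {
      out.close();
    }
  }
}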
--- NEW FILE: IndexArcs.java ---

/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.archive.access.nutch;

import java.io.*;
import java.net.*;
import java.util.*;
import java.text.*;
import java.util.logging.*;

import org.apache.nutch.io.*;
import org.apache.nutch.fs.*;
import org.apache.nutch.util.*;
import org.apache.nutch.mapred.*;
import org.apache.nutch.crawl.*;

public class IndexArcs {
  public static final Logger LOG =
    LogFormatter.getLogger("org.archive.access.nutch.IndexArcs");

  private static String getDate() {
    return new SimpleDateFormat("yyyyMMddHHmmss").format
      (new Date(System.currentTimeMillis()));
  }

  /* Import and index a set of arc files. */
  public static void main(String args[]) throws Exception {
    if (args.length < 1) {
      System.out.println("Usage: IndexArcs <arcsDir> [-dir d]");
      return;
    }

    JobConf conf = new JobConf(NutchConf.get());

    File arcsDir = null;
    File dir = new File("crawl-" + getDate());

    for (int i = 0; i < args.length; i++) {
      if ("-dir".equals(args[i])) {
        dir = new File(args[i + 1]);
        i++;
      } else if (args[i] != null) {
        arcsDir = new File(args[i]);
      }
    }

    NutchFileSystem fs = NutchFileSystem.get(conf);
    if (fs.exists(dir)) {
      throw new RuntimeException(dir + " already exists.");
    }

    LOG.info("IndexArcs started in: " + dir);
    LOG.info("arcsDir = " + arcsDir);

    File linkDb = new File(dir + "/linkdb");
    File index = new File(dir + "/indexes");
    File segments = new File(dir + "/segments");
    File segment = new File(segments, getDate());

    // import arcs
    new ImportArcs(conf).importArcs(arcsDir, segment);

    // invert links
    new LinkDb(conf).invert(linkDb, segments);

    // index everything
    new Indexer(conf).index(index, linkDb, fs.listFiles(segments));

    LOG.info("IndexArcs finished: " + dir);
  }
}

--- Arc2Segment.java DELETED ---
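For reference, the new command can also be driven programmatically. A minimal sketch that simply delegates to the IndexArcs.main() entry point above; the arc-urls and crawl-test names are illustrative:

// Hypothetical driver, not part of this commit.  Equivalent to running:
//   IndexArcs arc-urls -dir crawl-test
import org.archive.access.nutch.IndexArcs;

public class RunIndexArcs {
  public static void main(String[] args) throws Exception {
    IndexArcs.main(new String[] { "arc-urls", "-dir", "crawl-test" });
  }
}

Note that the -dir target must not already exist: IndexArcs throws a RuntimeException rather than overwrite a previous crawl directory.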