From: <bra...@us...> - 2008-06-24 22:58:45
|
Revision: 2307
http://archive-access.svn.sourceforge.net/archive-access/?rev=2307&view=rev
Author: bradtofel
Date: 2008-06-24 15:58:51 -0700 (Tue, 24 Jun 2008)

Log Message:
-----------
INITIAL REV: classes which:
* monitor a ResourceFileLocationDB's log
* create events when new files are noticed
* add those new files to a queue of files needing indexing
* monitor the index queue, performing indexing when needed
* push new index data to a local or remote ResourceIndex

Added Paths:
-----------
trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/indexer/
trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/indexer/DirectoryIndexQueue.java
trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/indexer/IndexQueue.java
trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/indexer/IndexQueueUpdater.java
trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/indexer/IndexWorker.java

Added: trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/indexer/DirectoryIndexQueue.java
===================================================================
--- trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/indexer/DirectoryIndexQueue.java (rev 0)
+++ trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/indexer/DirectoryIndexQueue.java 2008-06-24 22:58:51 UTC (rev 2307)
@@ -0,0 +1,95 @@
+/* DirectoryIndexQueue
+ *
+ * $Id$
+ *
+ * Created on 2:29:10 PM Jun 23, 2008.
+ *
+ * Copyright (C) 2008 Internet Archive.
+ *
+ * This file is part of wayback.
+ *
+ * wayback is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or
+ * any later version.
+ *
+ * wayback is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser Public License
+ * along with wayback; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+package org.archive.wayback.resourcestore.indexer;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.archive.wayback.util.DirMaker;
+
+/**
+ * Simple queue implementation, which uses a directory containing empty files
+ * to indicate the presence of items in a queue (set in this case...)
+ *
+ * Each queued name is represented by an empty regular file of that name in
+ * the queue directory, so enqueuing a name twice stores it once, and
+ * dequeue order follows File.list(), which guarantees no ordering.
+ *
+ * @author brad
+ * @version $Date$, $Revision$
+ */
+public class DirectoryIndexQueue implements IndexQueue {
+	/** Queue directory; null until setPath() is called. */
+	private File path = null;
+
+	/**
+	 * Removes one name from the queue by deleting its marker file.
+	 *
+	 * @return a queued name, or null if the directory holds no regular files
+	 * @throws IOException if the queue directory cannot be listed, or a
+	 *         marker file still exists after a failed delete
+	 */
+	public String dequeue() throws IOException {
+		String[] names = path.list();
+		if(names == null) {
+			// File.list() reports an I/O error or a non-directory as null.
+			throw new IOException("Unable to list queue directory (" +
+					path.getAbsolutePath() + ")");
+		}
+		for(String name : names) {
+			File tmp = new File(path,name);
+			if(!tmp.isFile()) {
+				continue;
+			}
+			if(tmp.delete()) {
+				return name;
+			}
+			// delete() also fails when another consumer removed this entry
+			// first; that is not an error, so move on to the next one.
+			if(tmp.exists()) {
+				throw new IOException("Unable to dequeue/delete (" +
+						tmp.getAbsolutePath() + ")");
+			}
+		}
+		return null;
+	}
+
+	/**
+	 * Adds a name to the queue by creating an empty marker file for it.
+	 * Enqueuing a name that is already queued has no effect.
+	 *
+	 * @param resourceFileName name to enqueue; used directly as a file name
+	 *        inside the queue directory
+	 * @throws IOException if File.createNewFile() fails with an I/O error
+	 */
+	public void enqueue(String resourceFileName) throws IOException {
+		File tmp = new File(path,resourceFileName);
+		if(!tmp.isFile()) {
+			tmp.createNewFile();
+		}
+	}
+
+	/**
+	 * @return absolute path of the queue directory, or null if not yet set
+	 */
+	public String getPath() {
+		if(path != null) {
+			return path.getAbsolutePath();
+		}
+		return null;
+	}
+
+	/**
+	 * @param path the queue directory; passed to DirMaker.ensureDir()
+	 * @throws IOException propagated from DirMaker.ensureDir()
+	 */
+	public void setPath(String path) throws IOException {
+		this.path = DirMaker.ensureDir(path);
+	}
+
+	/**
+	 * No-op: this implementation keeps no per-item status, so a dequeued
+	 * name is forgotten whatever outcome is reported here.
+	 */
+	public void recordStatus(String resourceFileName, int status) {
+		// intentionally empty
+	}
+}
Added: trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/indexer/IndexQueue.java
===================================================================
--- trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/indexer/IndexQueue.java (rev 0)
+++ trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/indexer/IndexQueue.java 2008-06-24 22:58:51 UTC (rev 2307)
@@ -0,0 +1,42 @@
+/* IndexQueue
+ *
+ * $Id$
+ *
+ * Created on 2:05:12 PM Jun 23, 2008.
+ *
+ * Copyright (C) 2008 Internet Archive.
+ *
+ * This file is part of wayback.
+ *
+ * wayback is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or
+ * any later version.
+ *
+ * wayback is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser Public License
+ * along with wayback; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+package org.archive.wayback.resourcestore.indexer;
+
+import java.io.IOException;
+
+/**
+ * A queue (or set) of resource file names that are waiting to be indexed.
+ *
+ * @author brad
+ * @version $Date$, $Revision$
+ */
+public interface IndexQueue {
+	/** Status code for {@link #recordStatus}: item completed. */
+	int STATUS_DONE = 0;
+	/** Status code for {@link #recordStatus}: item failed. */
+	int STATUS_FAIL = 1;
+	/** Status code for {@link #recordStatus}: item should be retried. */
+	int STATUS_RETRY = 2;
+
+	/**
+	 * Adds a resource file name to the queue.
+	 *
+	 * @param resourceFileName the name to add
+	 * @throws IOException if the item cannot be stored
+	 */
+	void enqueue(String resourceFileName) throws IOException;
+
+	/**
+	 * Removes one name from the queue.
+	 *
+	 * @return a queued name, or null when the queue is empty
+	 * @throws IOException if the queue cannot be read or updated
+	 */
+	String dequeue() throws IOException;
+
+	/**
+	 * Reports the outcome of processing a previously dequeued name.
+	 *
+	 * @param resourceFileName the name that was processed
+	 * @param status one of the STATUS_* constants above
+	 */
+	void recordStatus(String resourceFileName, int status);
+}
Added: trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/indexer/IndexQueueUpdater.java
===================================================================
--- trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/indexer/IndexQueueUpdater.java (rev 0)
+++ trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/indexer/IndexQueueUpdater.java 2008-06-24 22:58:51 UTC (rev 2307)
@@ -0,0 +1,221 @@
+/* IndexQueueUpdater
+ *
+ * $Id$
+ *
+ * Created on 2:02:54 PM Jun 23, 2008.
+ *
+ * Copyright (C) 2008 Internet Archive.
+ *
+ * This file is part of wayback.
+ *
+ * wayback is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or
+ * any later version.
+ *
+ * wayback is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser Public License
+ * along with wayback; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+package org.archive.wayback.resourcestore.indexer;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.archive.wayback.resourcestore.locationdb.ResourceFileLocationDB;
+import org.archive.wayback.util.CloseableIterator;
+import org.archive.wayback.util.DirMaker;
+
+/**
+ * This class polls a ResourceFileLocationDB repeatedly, to notice new files
+ * arriving in the DB. Whenever new files are noticed, they are added to the
+ * Index Queue.
+ *
+ * It uses a local file to store the last known "mark" of the location DB.
+ *
+ * init() starts a background polling thread when interval is positive.
+ * After a poll that adds nothing, the wait before the next poll grows by
+ * interval. It resets to interval after a poll that adds names or fails.
+ *
+ * @author brad
+ * @version $Date$, $Revision$
+ */
+public class IndexQueueUpdater {
+
+	private static final Logger LOGGER =
+		Logger.getLogger(IndexQueueUpdater.class.getName());
+
+	private ResourceFileLocationDB db = null;
+	private IndexQueue queue = null;
+	private UpdateThread thread = null;
+	private MarkMemoryFile lastMark = null;
+	private long interval = 120000;
+
+	/**
+	 * Starts the background polling thread if interval is positive.
+	 */
+	public void init() {
+		if(interval > 0) {
+			thread = new UpdateThread(this,interval);
+			thread.start();
+		}
+	}
+
+	/**
+	 * Interrupts the polling thread, if one was started, and waits up to one
+	 * second for it to exit.
+	 */
+	public void shutdown() {
+		if(thread != null) {
+			thread.interrupt();
+			try {
+				thread.join(1000);
+			} catch (InterruptedException e) {
+				// Preserve the caller's interrupt status instead of dropping it.
+				Thread.currentThread().interrupt();
+			}
+		}
+	}
+
+	/**
+	 * Enqueues every name the DB reports between the stored mark and the DB's
+	 * current mark, then stores the current mark.
+	 *
+	 * @return number of names enqueued (0 if the DB mark has not advanced)
+	 * @throws IOException if the mark file, the DB, or the queue fails
+	 */
+	public int updateQueue() throws IOException {
+		int added = 0;
+		long lastMarkPoint = lastMark.getLastMark();
+		long currentMarkPoint = db.getCurrentMark();
+		if(currentMarkPoint > lastMarkPoint) {
+			// TODO: touchy touchy... need transactions here to not have
+			// state sync problems if something goes badly in this block..
+			// for example, it would be possible to constantly enqueue the
+			// same files forever..
+			CloseableIterator<String> newNames =
+				db.getNamesBetweenMarks(lastMarkPoint, currentMarkPoint);
+			try {
+				while(newNames.hasNext()) {
+					queue.enqueue(newNames.next());
+					added++;
+				}
+			} finally {
+				// Release the DB iterator even when an enqueue fails.
+				newNames.close();
+			}
+			// Advance the mark only after every name was enqueued, so a
+			// failure above makes the next poll retry the same range.
+			lastMark.setLastMark(currentMarkPoint);
+		}
+		return added;
+	}
+
+	/**
+	 * Stores a single long "mark" as the first line of a local text file.
+	 */
+	private static class MarkMemoryFile {
+		private File file = null;
+		public MarkMemoryFile(File file) {
+			this.file = file;
+		}
+
+		/**
+		 * @return the stored mark, or 0 if the file is missing or empty
+		 * @throws IOException if the file cannot be read or its first line
+		 *         is not a number
+		 */
+		public long getLastMark() throws IOException {
+			long mark = 0;
+			if(file.isFile() && file.length() > 0) {
+				BufferedReader ir = new BufferedReader(new FileReader(file));
+				try {
+					String line = ir.readLine();
+					if(line != null) {
+						mark = parseMark(line.trim());
+					}
+				} finally {
+					ir.close();
+				}
+			}
+			return mark;
+		}
+
+		// Turns a corrupt mark into an IOException, which UpdateThread
+		// handles, instead of a NumberFormatException that would kill it.
+		private long parseMark(String text) throws IOException {
+			try {
+				return Long.parseLong(text);
+			} catch (NumberFormatException e) {
+				IOException ioe = new IOException("Bad mark (" + text +
+						") in " + file.getAbsolutePath());
+				ioe.initCause(e);
+				throw ioe;
+			}
+		}
+
+		/**
+		 * Overwrites the file with the given mark.
+		 *
+		 * @throws IOException if the file cannot be opened or written
+		 */
+		public void setLastMark(long mark) throws IOException {
+			PrintWriter pw = new PrintWriter(file);
+			try {
+				pw.println(mark);
+			} finally {
+				pw.close();
+			}
+			// PrintWriter swallows write/close errors; surface them.
+			if(pw.checkError()) {
+				throw new IOException("Unable to write mark to " +
+						file.getAbsolutePath());
+			}
+		}
+		public String getAbsolutePath() {
+			return file.getAbsolutePath();
+		}
+	}
+
+	/**
+	 * Background thread that calls updateQueue() until interrupted.
+	 */
+	private static class UpdateThread extends Thread {
+		private long runInterval = 120000;
+		private IndexQueueUpdater updater = null;
+
+		public UpdateThread(IndexQueueUpdater updater,
+				long runInterval) {
+
+			this.updater = updater;
+			this.runInterval = runInterval;
+		}
+
+		public void run() {
+			LOGGER.info("alive");
+			long sleepInterval = runInterval;
+			while (!isInterrupted()) {
+				try {
+					int updated = updater.updateQueue();
+
+					if(updated > 0) {
+						LOGGER.info("Updated " + updated + " files..");
+						sleepInterval = runInterval;
+					} else {
+						LOGGER.info("Updated ZERO files..");
+						sleepInterval += runInterval;
+					}
+				} catch (IOException e) {
+					LOGGER.log(Level.WARNING, "Failed to update index queue", e);
+					// Still wait before retrying; don't spin on a persistent error.
+					sleepInterval = runInterval;
+				}
+				try {
+					sleep(sleepInterval);
+				} catch (InterruptedException e) {
+					// shutdown() interrupts this thread: stop polling.
+					break;
+				}
+			}
+			LOGGER.info("exiting");
+		}
+	}
+	/**
+	 * @return the location DB that is polled for new names
+	 */
+	public ResourceFileLocationDB getDb() {
+		return db;
+	}
+
+	/**
+	 * @param db the location DB to poll for new names
+	 */
+	public void setDb(ResourceFileLocationDB db) {
+		this.db = db;
+	}
+
+	/**
+	 * @return the queue that receives new names
+	 */
+	public IndexQueue getQueue() {
+		return queue;
+	}
+
+	/**
+	 * @param queue the queue that receives new names
+	 */
+	public void setQueue(IndexQueue queue) {
+		this.queue = queue;
+	}
+
+	/**
+	 * @return absolute path of the file holding the last processed mark, or
+	 *         null if setLastMark() has not been called
+	 */
+	public String getLastMark() {
+		if(lastMark != null) {
+			return lastMark.getAbsolutePath();
+		}
+		return null;
+	}
+
+	/**
+	 * @param path file in which to store the last processed mark; its
+	 *        parent directory is passed to DirMaker.ensureDir()
+	 * @throws IOException propagated from DirMaker.ensureDir()
+	 */
+	public void setLastMark(String path) throws IOException {
+		// Resolve first: a bare relative name has a null getParentFile().
+		File tmp = new File(path).getAbsoluteFile();
+		DirMaker.ensureDir(tmp.getParentFile().getAbsolutePath());
+		lastMark = new MarkMemoryFile(tmp);
+	}
+
+	/**
+	 * @return the base polling interval in milliseconds
+	 */
+	public long getInterval() {
+		return interval;
+	}
+
+	/**
+	 * @param interval base polling interval in milliseconds; a value
+	 *        &lt;= 0 means init() starts no thread
+	 */
+	public void setInterval(long interval) {
+		this.interval = interval;
+	}
+}
Added: trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/indexer/IndexWorker.java
===================================================================
--- trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/indexer/IndexWorker.java (rev 0)
+++ trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/indexer/IndexWorker.java 2008-06-24 22:58:51 UTC (rev 2307)
@@ -0,0 +1,234 @@
+/* IndexWorker
+ *
+ * $Id$
+ *
+ * Created on 2:58:51 PM Jun 23, 2008.
+ *
+ * Copyright (C) 2008 Internet Archive.
+ *
+ * This file is part of wayback.
+ *
+ * wayback is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or
+ * any later version.
+ *
+ * wayback is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser Public License
+ * along with wayback; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+package org.archive.wayback.resourcestore.indexer;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.archive.wayback.UrlCanonicalizer;
+import org.archive.wayback.core.SearchResult;
+import org.archive.wayback.resourceindex.indexer.IndexClient;
+import org.archive.wayback.resourcestore.ArcIndexer;
+import org.archive.wayback.resourcestore.WarcIndexer;
+import org.archive.wayback.resourcestore.locationdb.ResourceFileLocationDB;
+import org.archive.wayback.util.CloseableIterator;
+//import org.archive.wayback.util.url.AggressiveUrlCanonicalizer;
+import org.archive.wayback.util.url.IdentityUrlCanonicalizer;
+
+/**
+ * Simple worker, which gets tasks from an IndexQueue, in the case, the name
+ * of ARC/WARC files to be indexed, retrieves the ARC/WARC location from a
+ * ResourceFileLocationDB, creates the index, which is serialized into a file,
+ * and then hands that file off to a ResourceIndex for merging, using an
+ * IndexClient.
+ *
+ * Each known location of a name is tried in turn until one is indexed and
+ * uploaded. The outcome is reported through IndexQueue.recordStatus().
+ *
+ * @author brad
+ * @version $Date$, $Revision$
+ */
+public class IndexWorker {
+	private static final Logger LOGGER =
+		Logger.getLogger(IndexWorker.class.getName());
+
+	public final static String ARC_EXTENSION = ".arc";
+	public final static String ARC_GZ_EXTENSION = ".arc.gz";
+	public final static String WARC_EXTENSION = ".warc";
+	public final static String WARC_GZ_EXTENSION = ".warc.gz";
+
+	private ArcIndexer arcIndexer = new ArcIndexer();
+	private WarcIndexer warcIndexer = new WarcIndexer();
+
+	private UrlCanonicalizer canonicalizer = new IdentityUrlCanonicalizer();
+//	private UrlCanonicalizer canonicalizer = new AggressiveUrlCanonicalizer();
+
+	private long interval = 120000;
+	private IndexQueue queue = null;
+	private ResourceFileLocationDB db = null;
+	private IndexClient target = null;
+	private WorkerThread thread = null;
+
+	/**
+	 * Applies the configured canonicalizer to both indexers and, if interval
+	 * is positive, starts the background worker thread.
+	 */
+	public void init() {
+		arcIndexer.setCanonicalizer(canonicalizer);
+		warcIndexer.setCanonicalizer(canonicalizer);
+		if(interval > 0) {
+			thread = new WorkerThread(this,interval);
+			thread.start();
+		}
+	}
+
+	/**
+	 * Interrupts the worker thread, if one was started, and waits up to one
+	 * second for it to exit.
+	 */
+	public void shutdown() {
+		if(thread != null) {
+			thread.interrupt();
+			try {
+				thread.join(1000);
+			} catch (InterruptedException e) {
+				// Preserve the caller's interrupt status instead of dropping it.
+				Thread.currentThread().interrupt();
+			}
+		}
+	}
+
+	/**
+	 * Processes at most one queued name: looks up its locations, indexes the
+	 * first one that succeeds, and hands the results to the target.
+	 *
+	 * @return true if a name was dequeued and its lookup succeeded (the
+	 *         worker thread then polls again immediately); false if the
+	 *         queue was empty or the lookup failed
+	 * @throws IOException if dequeuing fails
+	 */
+	public boolean doWork() throws IOException {
+		String name = queue.dequeue();
+		if(name == null) {
+			return false;
+		}
+		String[] pathsOrUrls = null;
+		try {
+			pathsOrUrls = db.nameToUrls(name);
+		} catch(IOException e) {
+			LOGGER.severe("FAILED TO LOOKUP(" + name + ")" +
+					e.getLocalizedMessage());
+			queue.recordStatus(name, IndexQueue.STATUS_FAIL);
+			return false;
+		}
+		boolean indexed = false;
+		if(pathsOrUrls != null) {
+			// Fall back to the next location if one cannot be indexed.
+			for(String pathOrUrl : pathsOrUrls) {
+				if(indexLocation(name, pathOrUrl)) {
+					indexed = true;
+					break;
+				}
+			}
+		}
+		if(!indexed) {
+			LOGGER.severe("FAILED to index or upload (" + name + ")");
+		}
+		queue.recordStatus(name, indexed ? IndexQueue.STATUS_DONE
+				: IndexQueue.STATUS_FAIL);
+		return true;
+	}
+
+	/**
+	 * Indexes one location of a resource file and sends the results to the
+	 * target IndexClient, always closing the result iterator.
+	 *
+	 * @return true on success; false (after logging) if the file type is not
+	 *         recognized or indexing/uploading failed
+	 */
+	private boolean indexLocation(String name, String pathOrUrl) {
+		try {
+			CloseableIterator<SearchResult> itr = indexFile(pathOrUrl);
+			if(itr == null) {
+				LOGGER.warning("Unrecognized resource file type (" +
+						pathOrUrl + ")");
+				return false;
+			}
+			try {
+				target.addSearchResults(name, itr);
+			} finally {
+				itr.close();
+			}
+			return true;
+		} catch(IOException e) {
+			LOGGER.log(Level.WARNING, "FAILED to index or upload (" + name +
+					") from (" + pathOrUrl + ")", e);
+			return false;
+		}
+	}
+
+	/**
+	 * Opens an index iterator for an ARC or WARC file, chosen by extension.
+	 *
+	 * @param pathOrUrl location of the resource file
+	 * @return an iterator over the file's SearchResults, or null if the
+	 *         extension is not one of the four recognized ones
+	 * @throws IOException if the indexer cannot open the file
+	 */
+	public CloseableIterator<SearchResult> indexFile(String pathOrUrl)
+		throws IOException {
+
+		if(pathOrUrl.endsWith(ARC_EXTENSION)
+				|| pathOrUrl.endsWith(ARC_GZ_EXTENSION)) {
+			return arcIndexer.iterator(pathOrUrl);
+		}
+		if(pathOrUrl.endsWith(WARC_EXTENSION)
+				|| pathOrUrl.endsWith(WARC_GZ_EXTENSION)) {
+			return warcIndexer.iterator(pathOrUrl);
+		}
+		return null;
+	}
+
+
+	/**
+	 * Background thread that calls doWork() until interrupted. It does not
+	 * sleep after useful work, and backs off linearly while idle.
+	 */
+	private static class WorkerThread extends Thread {
+		private long runInterval = 120000;
+		private IndexWorker worker = null;
+
+		public WorkerThread(IndexWorker worker, long runInterval) {
+			this.worker = worker;
+			this.runInterval = runInterval;
+		}
+
+		public void run() {
+			LOGGER.info("alive.");
+			long sleepInterval = runInterval;
+			while (!isInterrupted()) {
+				try {
+					boolean worked = worker.doWork();
+
+					if(worked) {
+						LOGGER.info("Did work, no sleep..");
+						sleepInterval = 0;
+					} else {
+						LOGGER.info("No Work to do - sleeping..");
+						sleepInterval += runInterval;
+					}
+				} catch (IOException e) {
+					LOGGER.log(Level.WARNING, "Index work failed", e);
+					// Still wait before retrying; don't spin on a persistent error.
+					sleepInterval = runInterval;
+				}
+				if(sleepInterval > 0) {
+					try {
+						sleep(sleepInterval);
+					} catch (InterruptedException e) {
+						// shutdown() interrupts this thread: stop working.
+						break;
+					}
+				}
+			}
+			LOGGER.info("exiting.");
+		}
+	}
+
+
+	/**
+	 * @return the base idle interval in milliseconds
+	 */
+	public long getInterval() {
+		return interval;
+	}
+	/**
+	 * @param interval base idle interval in milliseconds; a value &lt;= 0
+	 *        means init() starts no thread
+	 */
+	public void setInterval(long interval) {
+		this.interval = interval;
+	}
+	/**
+	 * @return the queue work is taken from
+	 */
+	public IndexQueue getQueue() {
+		return queue;
+	}
+	/**
+	 * @param queue the queue work is taken from
+	 */
+	public void setQueue(IndexQueue queue) {
+		this.queue = queue;
+	}
+	/**
+	 * @return the DB used to map names to locations
+	 */
+	public ResourceFileLocationDB getDb() {
+		return db;
+	}
+	/**
+	 * @param db the DB used to map names to locations
+	 */
+	public void setDb(ResourceFileLocationDB db) {
+		this.db = db;
+	}
+	/**
+	 * @return the IndexClient that receives index results
+	 */
+	public IndexClient getTarget() {
+		return target;
+	}
+	/**
+	 * @param target the IndexClient that receives index results
+	 */
+	public void setTarget(IndexClient target) {
+		this.target = target;
+	}
+	/**
+	 * @return the canonicalizer applied to the indexers by init()
+	 */
+	public UrlCanonicalizer getCanonicalizer() {
+		return canonicalizer;
+	}
+	/**
+	 * @param canonicalizer canonicalizer to apply; takes effect at init()
+	 */
+	public void setCanonicalizer(UrlCanonicalizer canonicalizer) {
+		this.canonicalizer = canonicalizer;
+	}
+}


This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|