From: <bra...@us...> - 2007-11-28 03:12:59
|
Revision: 2090 http://archive-access.svn.sourceforge.net/archive-access/?rev=2090&view=rev Author: bradtofel Date: 2007-11-27 19:13:01 -0800 (Tue, 27 Nov 2007) Log Message: ----------- INITIAL REV: new LocalResourceStore implementation that allows compressed or uncompressed ARCs and WARCs Added Paths: ----------- trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/AutoIndexThread.java trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/LocalResourceStore.java Added: trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/AutoIndexThread.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/AutoIndexThread.java (rev 0) +++ trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/AutoIndexThread.java 2007-11-28 03:13:01 UTC (rev 2090) @@ -0,0 +1,216 @@ +package org.archive.wayback.resourcestore; + +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.logging.Logger; + +import org.archive.wayback.core.SearchResult; +import org.archive.wayback.resourceindex.indexer.IndexClient; +import org.archive.wayback.util.DirMaker; + +/** + * Thread that repeatedly notices new files in the LocalResourceStore, indexes + * those files, and hands them off to a ResourceIndex via an IndexClient + * + * @author brad + * @version $Date$, $Revision$ + */ +public class AutoIndexThread extends Thread { + private static final Logger LOGGER = + Logger.getLogger(AutoIndexThread.class.getName()); + + private final static int DEFAULT_RUN_INTERVAL_MS = 10000; + private LocalResourceStore store = null; + private File workDir = null; + private File queuedDir = null; + private int runInterval = 
DEFAULT_RUN_INTERVAL_MS; + private IndexClient indexClient = null; + + /** + * Creates the daemon thread (named AutoARCIndexThread); the store, + * directories and index client are supplied afterwards via the setters. + */ + public AutoIndexThread() { + super("AutoARCIndexThread"); + super.setDaemon(true); + } + + public void run() { + LOGGER.info("AutoIndexThread is alive."); + int sleepInterval = runInterval; + if(store == null) { + throw new RuntimeException("No LocalResourceStore set"); + } + while (true) { + try { + int numIndexed = indexNewArcs(); + if (numIndexed == 0) { + sleep(sleepInterval); + sleepInterval += runInterval; /* idle back-off: every empty pass lengthens the next sleep */ + } else { + sleepInterval = runInterval; + } + } catch (InterruptedException e) { + e.printStackTrace(); /* NOTE(review): the interrupt is only printed; the loop keeps running */ + } + } + } + + /** + * Scan for new ARC files, and index any new files discovered. + * + * There are 3 main steps, which could be broken into separate threads: + * 1) detect new ARCs + * 2) create CDX files for each new ARC + * 3) upload CDX files to target (or rename to local "incoming" directory) + * + * for now these are sequential; exceptions are printed and not rethrown. + * + * @return number of ARC files indexed (at most 10 per call) + */ + public int indexNewArcs() { + int numIndexed = 0; + try { + queueNewArcsForIndex(); + } catch (IOException e) { + e.printStackTrace(); + } + try { + numIndexed = indexArcs(10); + } catch (MalformedURLException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return numIndexed; + } + /** + * Find any new ARC files and queue them for indexing. 
+ * @throws IOException if listing the store or creating a flag file fails + */ + public void queueNewArcsForIndex() throws IOException { + + // build a HashMap of what has been queued already: + HashMap<String,String> queued = new HashMap<String, String>(); + String entries[] = queuedDir.list(); + if(entries != null) { + for (int i = 0; i < entries.length; i++) { + queued.put(entries[i], "i"); + } + } + // now scan the file names offered by the store, and make flag files (in + // workDir and queuedDir) for anything that was not already there: + Iterator<String> files = store.fileNamesIterator(); + if(files != null) { + while(files.hasNext()) { + String fileName = files.next(); + if(!queued.containsKey(fileName)) { + File newQueuedFile = new File(queuedDir,fileName); + File newToBeIndexedFile = new File(workDir,fileName); + newToBeIndexedFile.createNewFile(); + newQueuedFile.createNewFile(); + } + } + } + } + + private String fileNameToBase(final String fileName) { + return fileName; + } + + /** + * Index up to 'max' ARC/WARC files queued for indexing, handing the + * resulting SearchResults to the configured IndexClient. 
+ * A file that fails with an IOException is logged and the loop moves on. + * + * @param max maximum number to index in this method call, 0 for unlimited + * @return int number of ARC/WARC files indexed + * @throws MalformedURLException + * @throws IOException + */ + public int indexArcs(int max) + throws MalformedURLException, IOException { + + int numIndexed = 0; + String toBeIndexed[] = workDir.list(); + + if (toBeIndexed != null) { + for (int i = 0; i < toBeIndexed.length; i++) { + String fileName = toBeIndexed[i]; + File file = store.getLocalFile(fileName); + if(file != null) { + File workFlagFile = new File(workDir,fileName); + String cdxBase = fileNameToBase(fileName); + + try { + + LOGGER.info("Indexing " + file.getAbsolutePath()); + Iterator<SearchResult> itr = store.indexFile(file); + + if(indexClient.addSearchResults(cdxBase, itr)) { + if (!workFlagFile.delete()) { + throw new IOException("Unable to delete " + + workFlagFile.getAbsolutePath()); + } + } + numIndexed++; /* incremented even when addSearchResults returned false */ + } catch (IOException e) { + LOGGER.severe("FAILED index: " + file.getAbsolutePath() + + " cause: " + e.getLocalizedMessage()); + } + if(max > 0 && (numIndexed >= max)) { + break; + } + } + } + } + return numIndexed; + } + + + + public LocalResourceStore getStore() { + return store; + } + + public void setStore(LocalResourceStore store) { + this.store = store; + } + + public String getWorkDir() { + return workDir == null ? null : workDir.getAbsolutePath(); + } + + public void setWorkDir(String workDir) throws IOException { + this.workDir = DirMaker.ensureDir(workDir); + } + + public String getQueuedDir() { + return queuedDir == null ? 
null : queuedDir.getAbsolutePath(); + } + + public void setQueuedDir(String queuedDir) throws IOException { + this.queuedDir = DirMaker.ensureDir(queuedDir); + } + + public int getRunInterval() { + return runInterval; + } + + public void setRunInterval(int runInterval) { /* value is used by run() as a sleep time in milliseconds */ + this.runInterval = runInterval; + } + + public IndexClient getIndexClient() { + return indexClient; + } + + public void setIndexClient(IndexClient indexClient) { + this.indexClient = indexClient; + } +} Added: trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/LocalResourceStore.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/LocalResourceStore.java (rev 0) +++ trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourcestore/LocalResourceStore.java 2007-11-28 03:13:01 UTC (rev 2090) @@ -0,0 +1,142 @@ +package org.archive.wayback.resourcestore; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import org.archive.wayback.ResourceStore; +import org.archive.wayback.WaybackConstants; +import org.archive.wayback.core.Resource; +import org.archive.wayback.core.SearchResult; +import org.archive.wayback.exception.ConfigurationException; +import org.archive.wayback.exception.ResourceNotAvailableException; + +/** + * ResourceStore backed by a local directory of .arc, .arc.gz, .warc and + * .warc.gz files, with an optional automatic indexing thread + * + * @author brad + * @version $Date$, $Revision$ + */ +public class LocalResourceStore implements ResourceStore { + + private File dataDir = null; + private AutoIndexThread indexThread = null; + + private ArcIndexer arcIndexer = new ArcIndexer(); + private WarcIndexer warcIndexer = new WarcIndexer(); + public final static String ARC_EXTENSION = 
".arc"; + public final static String ARC_GZ_EXTENSION = ".arc.gz"; + public final static String WARC_EXTENSION = ".warc"; + public final static String WARC_GZ_EXTENSION = ".warc.gz"; + public final static String OPEN_EXTENSION = ".open"; + private final static String[] SUFFIXES = { + "", ARC_EXTENSION, ARC_GZ_EXTENSION, WARC_EXTENSION, WARC_GZ_EXTENSION + }; + private FilenameFilter filter = new ArcWarcFilenameFilter(); + + public void init() throws ConfigurationException { /* hands this store to the optional index thread and starts it */ + if(indexThread != null) { + indexThread.setStore(this); + indexThread.start(); + } + } + protected String resultToFileName(SearchResult result) { + return result.get(WaybackConstants.RESULT_ARC_FILE); + } + + protected long resultToOffset(SearchResult result) { + return Long.parseLong(result.get(WaybackConstants.RESULT_OFFSET)); + } + + public File getLocalFile(String fileName) { + // try each suffix under dataDir: the empty suffix comes first, so an exact name wins + File file = null; + for(String suffix : SUFFIXES) { + file = new File(dataDir,fileName + suffix); + if(file.exists() && file.canRead()) { + return file; + } + } + // this might work if the full path is in the index... + file = new File(fileName); + if(file.exists() && file.canRead()) { + return file; + } + // doh: no readable file under any candidate name. + return null; + } + + public Resource retrieveResource(SearchResult result) throws IOException, + ResourceNotAvailableException { + String fileName = resultToFileName(result); + long offset = resultToOffset(result); + File file = getLocalFile(fileName); + if (file == null) { + + // TODO: this needs to be prettied up for end user consumption.. 
+ throw new ResourceNotAvailableException("Cannot find ARC file (" + + fileName + ")"); + } else { + + Resource r = ResourceFactory.getResource(file, offset); + return r; + } + } + + public Iterator<SearchResult> indexFile(File dataFile) throws IOException { + Iterator<SearchResult> itr = null; + + String name = dataFile.getName(); + if(name.endsWith(ARC_EXTENSION)) { + itr = arcIndexer.iterator(dataFile); + } else if(name.endsWith(ARC_GZ_EXTENSION)) { + itr = arcIndexer.iterator(dataFile); + } else if(name.endsWith(WARC_EXTENSION)) { + itr = warcIndexer.iterator(dataFile); + } else if(name.endsWith(WARC_GZ_EXTENSION)) { + itr = warcIndexer.iterator(dataFile); + } + return itr; /* stays null when the name has none of the four extensions */ + } + + public Iterator<String> fileNamesIterator() throws IOException { + if(dataDir != null) { + String[] files = dataDir.list(filter); /* NOTE(review): File.list returns null when dataDir cannot be listed as a directory, and Arrays.asList would then throw; confirm callers guarantee a directory */ + List<String> l = Arrays.asList(files); + return l.iterator(); + } + return null; /* no dataDir configured */ + } + + public File getDataDir() { + return dataDir; + } + + public void setDataDir(File dataDir) { + this.dataDir = dataDir; + } + + private class ArcWarcFilenameFilter implements FilenameFilter { /* accepts readable regular files whose name ends in one of the ARC/WARC extensions */ + public boolean accept(File dir, String name) { + File tmp = new File(dir,name); + if(tmp.isFile() && tmp.canRead()) { + return name.endsWith(ARC_EXTENSION) || + name.endsWith(ARC_GZ_EXTENSION) || + name.endsWith(WARC_GZ_EXTENSION) || + name.endsWith(WARC_EXTENSION); + } + return false; + } + } + + public AutoIndexThread getIndexThread() { + return indexThread; + } + public void setIndexThread(AutoIndexThread indexThread) { + this.indexThread = indexThread; + } +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |