From: <bra...@us...> - 2011-06-16 17:26:38
|
Revision: 3478 http://archive-access.svn.sourceforge.net/archive-access/?rev=3478&view=rev Author: bradtofel Date: 2011-06-16 17:26:31 +0000 (Thu, 16 Jun 2011) Log Message: ----------- FEATURE: abstracted out fetching of byte chunks from local/remote files; moved current code into Http11BlockLoader, which now uses a multithreaded HTTP connection manager to reuse connections; implemented an HDFS BlockLoader. Modified Paths: -------------- trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourceindex/ziplines/ZiplinedBlock.java trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourceindex/ziplines/ZiplinesSearchResultSource.java Added Paths: ----------- trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourceindex/ziplines/BlockLoader.java trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourceindex/ziplines/HDFSBlockLoader.java trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourceindex/ziplines/Http11BlockLoader.java trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourceindex/ziplines/RemoteHttp11BlockLoader.java Added: trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourceindex/ziplines/BlockLoader.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourceindex/ziplines/BlockLoader.java (rev 0) +++ trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourceindex/ziplines/BlockLoader.java 2011-06-16 17:26:31 UTC (rev 3478) @@ -0,0 +1,19 @@ +package org.archive.wayback.resourceindex.ziplines; + +import java.io.IOException; + +public interface BlockLoader { + /** + * Fetch a range of bytes from a particular URL.
Note that the bytes are + * read into memory all at once, so care should be taken with the length + * argument. + * + * @param url String URL to fetch + * @param offset byte start offset of the desired range + * @param length number of octets to fetch + * @return a new byte[] containing the octets fetched + * @throws IOException on Network and protocol failures, as well as Timeouts + */ + public byte[] getBlock(String url, long offset, int length) + throws IOException; +} Added: trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourceindex/ziplines/HDFSBlockLoader.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourceindex/ziplines/HDFSBlockLoader.java (rev 0) +++ trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourceindex/ziplines/HDFSBlockLoader.java 2011-06-16 17:26:31 UTC (rev 3478) @@ -0,0 +1,46 @@ +package org.archive.wayback.resourceindex.ziplines; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public class HDFSBlockLoader implements BlockLoader { + FileSystem fs = null; + String defaultFSURI = null; + public HDFSBlockLoader(String defaultFSURI) { + this.defaultFSURI = defaultFSURI; + } + public void init() throws IOException, URISyntaxException { + Configuration c = new Configuration(); + c.set("fs.default.name",defaultFSURI); + fs = FileSystem.get(new URI(defaultFSURI),c); + } + + public byte[] getBlock(String url, long offset, int length) + throws IOException { + Path path = new Path(url); + FSDataInputStream s = fs.open(path); + byte buffer[] = new byte[length]; + s.readFully(offset, buffer); + return buffer; + } + + /** + * @return the defaultFSURI + 
*/ + public String getDefaultFSURI() { + return defaultFSURI; + } + + /** + * @param defaultFSURI the defaultFSURI to set + */ + public void setDefaultFSURI(String defaultFSURI) { + this.defaultFSURI = defaultFSURI; + } +} Added: trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourceindex/ziplines/Http11BlockLoader.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourceindex/ziplines/Http11BlockLoader.java (rev 0) +++ trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourceindex/ziplines/Http11BlockLoader.java 2011-06-16 17:26:31 UTC (rev 3478) @@ -0,0 +1,164 @@ +package org.archive.wayback.resourceindex.ziplines; + +import java.io.IOException; +import java.io.InputStream; +import java.util.logging.Logger; +import org.apache.commons.httpclient.HostConfiguration; +import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.HttpMethod; +import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; +import org.apache.commons.httpclient.methods.GetMethod; +import org.apache.commons.httpclient.params.HttpClientParams; +import org.archive.wayback.webapp.PerformanceLogger; + +import com.google.common.io.ByteStreams; + +/** + * Class which wraps most of the complexity of an apache commons httpclient + * MultiThreaderHttpConnectionManager, exposing common configuration elements + * to Spring configuration. + * + * This class is a near direct copy of RemoteLiveWebCache: refactoring needed. 
+ * + * @author brad + * + */ +public class Http11BlockLoader implements BlockLoader { + private static final Logger LOGGER = Logger.getLogger( + Http11BlockLoader.class.getName()); + + private MultiThreadedHttpConnectionManager connectionManager = null; + private HostConfiguration hostConfiguration = null; + private HttpClient http = null; + + /** + * + */ + public Http11BlockLoader() { + connectionManager = new MultiThreadedHttpConnectionManager(); + hostConfiguration = new HostConfiguration(); + HttpClientParams params = new HttpClientParams(); +// params.setParameter(HttpClientParams.RETRY_HANDLER, new NoRetryHandler()); + http = new HttpClient(params,connectionManager); + http.setHostConfiguration(hostConfiguration); + } + + /** + * Fetch a range of bytes from a particular URL. Note that the bytes are + * read into memory all at once, so care should be taken with the length + * argument. + * + * @param url String URL to fetch + * @param offset byte start offset of the desired range + * @param length number of octets to fetch + * @return a new byte[] containing the octets fetched + * @throws IOException on HTTP and Socket failures, as well as Timeouts + */ + public byte[] getBlock(String url, long offset, int length) + throws IOException { + + HttpMethod method = null; + try { + method = new GetMethod(url); + } catch(IllegalArgumentException e) { + LOGGER.warning("Bad URL for live web fetch:" + url); + throw new IOException("Url:" + url + " does not look like an URL?"); + } + StringBuilder sb = new StringBuilder(16); + sb.append(ZiplinedBlock.BYTES_HEADER).append(offset); + sb.append(ZiplinedBlock.BYTES_MINUS).append((offset + length)-1); + String rangeHeader = sb.toString(); + method.addRequestHeader(ZiplinedBlock.RANGE_HEADER, rangeHeader); + //uc.setRequestProperty(RANGE_HEADER, sb.toString()); + long start = System.currentTimeMillis(); + try { + LOGGER.fine("Reading block:" + url + "("+rangeHeader+")"); + int status = http.executeMethod(method); + 
if((status == 200) || (status == 206)) { + InputStream is = method.getResponseBodyAsStream(); + byte[] block = new byte[length]; + ByteStreams.readFully(is, block); + long elapsed = System.currentTimeMillis() - start; + PerformanceLogger.noteElapsed("CDXBlockLoad",elapsed,url); + return block; + + } else { + throw new IOException("Bad status for " + url); + } + } finally { + method.releaseConnection(); + } + } + + /** + * @param hostPort to proxy requests through - ex. "localhost:3128" + */ + public void setProxyHostPort(String hostPort) { + int colonIdx = hostPort.indexOf(':'); + if(colonIdx > 0) { + String host = hostPort.substring(0,colonIdx); + int port = Integer.valueOf(hostPort.substring(colonIdx+1)); + + hostConfiguration.setProxy(host, port); + } + } + + /** + * @param maxTotalConnections the HttpConnectionManagerParams config + */ + public void setMaxTotalConnections(int maxTotalConnections) { + connectionManager.getParams(). + setMaxTotalConnections(maxTotalConnections); + } + + /** + * @return the HttpConnectionManagerParams maxTotalConnections config + */ + public int getMaxTotalConnections() { + return connectionManager.getParams().getMaxTotalConnections(); + } + + /** + * @param maxHostConnections the HttpConnectionManagerParams config + */ + public void setMaxHostConnections(int maxHostConnections) { + connectionManager.getParams(). + setMaxConnectionsPerHost(hostConfiguration, maxHostConnections); + } + + /** + * @return the HttpConnectionManagerParams maxHostConnections config + */ + public int getMaxHostConnections() { + return connectionManager.getParams(). 
+ getMaxConnectionsPerHost(hostConfiguration); + } + + /** + * @return the connectionTimeoutMS + */ + public int getConnectionTimeoutMS() { + return connectionManager.getParams().getConnectionTimeout(); + } + + /** + * @param connectionTimeoutMS the connectionTimeoutMS to set + */ + public void setConnectionTimeoutMS(int connectionTimeoutMS) { + connectionManager.getParams().setConnectionTimeout(connectionTimeoutMS); + } + + /** + * @return the socketTimeoutMS + */ + public int getSocketTimeoutMS() { + return connectionManager.getParams().getSoTimeout(); + } + + /** + * @param socketTimeoutMS the socketTimeoutMS to set + */ + public void setSocketTimeoutMS(int socketTimeoutMS) { + connectionManager.getParams().setSoTimeout(socketTimeoutMS); + } +} Added: trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourceindex/ziplines/RemoteHttp11BlockLoader.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourceindex/ziplines/RemoteHttp11BlockLoader.java (rev 0) +++ trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourceindex/ziplines/RemoteHttp11BlockLoader.java 2011-06-16 17:26:31 UTC (rev 3478) @@ -0,0 +1,9 @@ +package org.archive.wayback.resourceindex.ziplines; + +/** + * @author brad + * @deprecated use Http11BlockLoader + */ +public class RemoteHttp11BlockLoader extends Http11BlockLoader { + +} Modified: trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourceindex/ziplines/ZiplinedBlock.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourceindex/ziplines/ZiplinedBlock.java 2011-06-16 17:23:08 UTC (rev 3477) +++ 
trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourceindex/ziplines/ZiplinedBlock.java 2011-06-16 17:26:31 UTC (rev 3478) @@ -20,6 +20,7 @@ package org.archive.wayback.resourceindex.ziplines; import java.io.BufferedReader; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.net.URL; @@ -37,13 +38,14 @@ private static final Logger LOGGER = Logger.getLogger( ZiplinedBlock.class.getName()); + BlockLoader loader = null; String urlOrPath = null; long offset = -1; int count = 0; public final static int BLOCK_SIZE = 128 * 1024; - private final static String RANGE_HEADER = "Range"; - private final static String BYTES_HEADER = "bytes="; - private final static String BYTES_MINUS = "-"; + public final static String RANGE_HEADER = "Range"; + public final static String BYTES_HEADER = "bytes="; + public final static String BYTES_MINUS = "-"; /** * @param urlOrPath URL where this file can be downloaded * @param offset start of 128K block boundary. 
@@ -62,10 +64,29 @@ this.count = count; } /** + * @param loader the RemoteHttp11BlockLoader to use when fetching this block + */ + public void setLoader(BlockLoader loader) { + this.loader = loader; + } + /** * @return a BufferedReader of the underlying compressed data in this block * @throws IOException for usual reasons */ public BufferedReader readBlock() throws IOException { + if(loader != null) { + return readBlockEfficiently(loader); + } + return readBlockInefficiently(); + } + private BufferedReader readBlockEfficiently(BlockLoader remote) + throws IOException { + byte bytes[] = remote.getBlock(urlOrPath, offset, BLOCK_SIZE); + return new BufferedReader(new InputStreamReader( + new GZIPInputStream(new ByteArrayInputStream(bytes)), + ByteOp.UTF8)); + } + private BufferedReader readBlockInefficiently() throws IOException { StringBuilder sb = new StringBuilder(16); sb.append(BYTES_HEADER).append(offset).append(BYTES_MINUS); sb.append((offset + BLOCK_SIZE)-1); Modified: trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourceindex/ziplines/ZiplinesSearchResultSource.java =================================================================== --- trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourceindex/ziplines/ZiplinesSearchResultSource.java 2011-06-16 17:23:08 UTC (rev 3477) +++ trunk/archive-access/projects/wayback/wayback-core/src/main/java/org/archive/wayback/resourceindex/ziplines/ZiplinesSearchResultSource.java 2011-06-16 17:26:31 UTC (rev 3478) @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.PrintWriter; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -78,6 +79,7 @@ private HashMap<String,String> chunkMap = null; private CDXFormat format = null; private int maxBlocks = 1000; + private BlockLoader blockLoader = null; public ZiplinesSearchResultSource() { } @@ -165,7 +167,9 @@ String url = 
chunkMap.get(parts[1]); long offset = Long.parseLong(parts[2]); LOGGER.info("Adding block source(" + parts[1] + "):" + offset); - blocks.add(new ZiplinedBlock(url, offset)); + ZiplinedBlock block = new ZiplinedBlock(url, offset); + block.setLoader(blockLoader); + blocks.add(block); } } finally { if(itr != null) { @@ -245,8 +249,22 @@ */ public void setMaxBlocks(int maxBlocks) { this.maxBlocks = maxBlocks; - } + } + /** + * @return the blockLoader + */ + public BlockLoader getBlockLoader() { + return blockLoader; + } + + /** + * @param blockLoader the blockLoader to set + */ + public void setBlockLoader(BlockLoader blockLoader) { + this.blockLoader = blockLoader; + } + private static void USAGE() { System.err.println("USAGE:"); System.err.println(""); @@ -267,6 +285,7 @@ // String cdxSpec = CDXFormatIndex.CDX_HEADER_MAGIC; String cdxSpec = " CDX N b a m s k r V g"; CDXFormat format = null; + BlockLoader blockLoader = new Http11BlockLoader(); try { format = new CDXFormat(cdxSpec); } catch (CDXFormatException e1) { @@ -291,6 +310,23 @@ } } else if(args[idx].equals("-blockDump")) { blockDump = true; + } else if(args[idx].equals("-hdfs")) { + idx++; + if(idx >= args.length) { + USAGE(); + } + blockLoader = new HDFSBlockLoader(args[idx]); + try { + ((HDFSBlockLoader)blockLoader).init(); + } catch (IOException e) { + e.printStackTrace(); + USAGE(); + System.exit(1); + } catch (URISyntaxException e) { + e.printStackTrace(); + USAGE(); + System.exit(1); + } } else if(args[idx].equals("-max")) { idx++; if(idx >= args.length) { @@ -319,6 +355,7 @@ USAGE(); } // first is summary path, then location path, then search key: + zl.setBlockLoader(blockLoader); zl.setChunkIndexPath(args[idx++]); zl.setChunkMapPath(args[idx++]); String key = args[idx++]; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |