From: <mar...@us...> - 2013-12-11 15:45:17
Revision: 7633
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7633&view=rev
Author:   martyncutcher
Date:     2013-12-11 15:45:11 +0000 (Wed, 11 Dec 2013)

Log Message:
-----------
Refactor to use TokenDrain class to encapsulate token processing

Modified Paths:
--------------
    branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java

Modified: branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java
===================================================================
--- branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java	2013-12-11 14:32:56 UTC (rev 7632)
+++ branches/PIPELINE_RESYNC/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java	2013-12-11 15:45:11 UTC (rev 7633)
@@ -1026,6 +1026,82 @@
 
     } // call.
 
+    class TokenDrain {
+        final byte[] token;
+        final byte[] tokenBuffer;
+        final ByteBuffer tokenBB;
+        final Client client;
+
+        int tokenIndex = 0;
+        int ntokenreads = 0;
+        int ntokenbytematches = 0;
+
+        TokenDrain(final byte[] token, final Client client) {
+            this.token = token;
+            this.tokenBuffer = token == null ? null : new byte[token.length];
+            this.tokenBB = token == null ? null : ByteBuffer.wrap(tokenBuffer);
+            this.client = client;
+
+            if (log.isDebugEnabled())
+                log.debug("Receive token: " + BytesUtil.toHexString(token));
+
+        }
+
+        boolean foundToken() throws IOException {
+
+            if (log.isDebugEnabled())
+                log.debug("Looking for token, reads: " + ntokenreads);
+            // Note that the logic for finding the token bytes depends on the
+            // first byte in the token being unique!
+            // We have to be a bit clever to be sure we do not read beyond the
+            // token and therefore complicate the reading into the localBuffer.
+            // This is optimized for the normal case where the key token is read
+            // as the next bytes from the stream. In the worst case scenario this
+            // could read large amounts of data only a few bytes at a time, however
+            // this is not in reality a significant overhead.
+            while (tokenIndex < token.length ) {
+                final int remtok = token.length - tokenIndex;
+                tokenBB.limit(remtok);
+                tokenBB.position(0);
+
+                final int rdLen = client.client.read(tokenBB);
+                for (int i = 0; i < rdLen; i++) {
+                    if (tokenBuffer[i] != token[tokenIndex]) {
+                        if (ntokenreads < 2)
+                            log.warn("TOKEN MISMATCH");
+                        tokenIndex = 0;
+                        if (tokenBuffer[i] == token[tokenIndex]) {
+                            tokenIndex++;
+                        }
+                    } else {
+                        tokenIndex++;
+                        ntokenbytematches++;
+                    }
+                }
+
+                ntokenreads++;
+                if (ntokenreads % 10000 == 0) {
+                    if (log.isDebugEnabled())
+                        log.debug("...still looking, reads: " + ntokenreads);
+                }
+
+            }
+
+            if (tokenIndex != token.length) { // not sufficient data ready
+                if (log.isDebugEnabled())
+                    log.debug("Not found token yet!");
+                return false;
+            } else {
+                if (log.isDebugEnabled())
+                    log.debug("Found token after " + ntokenreads + " token reads and " + ntokenbytematches + " byte matches");
+
+                return true;
+            }
+
+        }
+
+    }
+
     private void doReceiveAndReplicate(final Client client)
             throws Exception {
 
@@ -1047,19 +1123,10 @@
             // for debug retain number of low level reads
            int reads = 0;
 
-            // setup token values to search for any provided token prefix
-            final byte[] token = message.getToken();
+            final TokenDrain tokenDrain = new TokenDrain(message.getToken(), client);
 
-            boolean foundStart = token == null; // if null then not able to check
-            int tokenIndex = 0;
-            final byte[] tokenBuffer = token == null ? null : new byte[token.length];
-            final ByteBuffer tokenBB = token == null ? null : ByteBuffer.wrap(tokenBuffer);
-
-            if (log.isDebugEnabled())
-                log.debug("Receive token: " + BytesUtil.toHexString(token));
+            while (rem > 0 && !EOS) {
 
-            while (rem > 0 && !EOS) {
-
                 // block up to the timeout.
                 final int nkeys = client.clientSelector
                         .select(selectorTimeout/* ms */);
@@ -1115,62 +1182,13 @@
 
                 final Iterator<SelectionKey> iter = keys.iterator();
 
-                int ntokenreads = 0;
-                int ntokenbytematches = 0;
-
                 while (iter.hasNext()) {
 
                     iter.next();
                     iter.remove();
 
-                    if (!foundStart) {
-                        if (log.isDebugEnabled())
-                            log.debug("Looking for token, reads: " + ntokenreads);
-                        // Note that the logic for finding the token bytes depends on the
-                        // first byte in the token being unique!
-                        // We have to be a bit clever to be sure we do not read beyond the
-                        // token and therefore complicate the reading into the localBuffer.
-                        // This is optimized for the normal case where the key token is read
-                        // as the next bytes from the stream. In the worst case scenario this
-                        // could read large amounts of data only a few bytes at a time, however
-                        // this is not in reality a significant overhead.
-                        while (tokenIndex < token.length ) {
-                            final int remtok = token.length - tokenIndex;
-                            tokenBB.limit(remtok);
-                            tokenBB.position(0);
-
-                            final int rdLen = client.client.read(tokenBB);
-                            for (int i = 0; i < rdLen; i++) {
-                                if (tokenBuffer[i] != token[tokenIndex]) {
-                                    if (ntokenreads < 2)
-                                        log.warn("TOKEN MISMATCH");
-                                    tokenIndex = 0;
-                                    if (tokenBuffer[i] == token[tokenIndex]) {
-                                        tokenIndex++;
-                                    }
-                                } else {
-                                    tokenIndex++;
-                                    ntokenbytematches++;
-                                }
-                            }
-
-                            ntokenreads++;
-                            if (ntokenreads % 10000 == 0) {
-                                if (log.isDebugEnabled())
-                                    log.debug("...still looking, reads: " + ntokenreads);
-                            }
-
-                            foundStart = tokenIndex == token.length;
-                        }
-
-                        if (!foundStart) { // not sufficient data ready
-                            // if (log.isDebugEnabled())
-                            log.warn("Not found token yet!");
-                            continue;
-                        } else {
-                            if (log.isDebugEnabled())
-                                log.debug("Found token after " + ntokenreads + " token reads and " + ntokenbytematches + " byte matches");
-                        }
+                    if (!tokenDrain.foundToken()) {
+                        continue;
                     }
 
                     final int rdlen = client.client.read(localBuffer);
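A note on the scanning approach that the new TokenDrain class encapsulates: each read is capped at the number of token bytes still outstanding, so the scan never consumes payload bytes beyond the token, and the match index is reset on a mismatch, which relies on the first token byte being unique (as the comment in the patch says). The sketch below illustrates that idea outside the bigdata classes; it is a minimal, hypothetical example (the TokenScanSketch class and its drain() method are invented for illustration, and it reads from a plain java.nio ReadableByteChannel rather than the Client wrapper used by HAReceiveService).

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

/** Hypothetical, standalone illustration of the token-prefix scan. */
public class TokenScanSketch {

    private final byte[] token;
    private final byte[] buf;   // scratch buffer, at most token.length bytes per read
    private final ByteBuffer bb;
    private int matched = 0;    // number of token bytes matched so far

    public TokenScanSketch(final byte[] token) {
        this.token = token;
        this.buf = new byte[token.length];
        this.bb = ByteBuffer.wrap(buf);
    }

    /**
     * Try to consume the token prefix from the channel. Each read is limited
     * to the outstanding token bytes so the scan never reads past the token.
     * Returns false if no more data is available before the token is
     * complete; call again when more data arrives.
     */
    public boolean drain(final ReadableByteChannel ch) throws IOException {
        while (matched < token.length) {
            bb.clear();
            bb.limit(token.length - matched); // never read beyond the token
            final int n = ch.read(bb);
            if (n <= 0)
                return false; // nothing available yet (or end of stream)
            for (int i = 0; i < n; i++) {
                if (buf[i] == token[matched]) {
                    matched++;
                } else {
                    // Restart the match; assumes the first token byte is
                    // unique in the stream, as the patch comment notes.
                    matched = (buf[i] == token[0]) ? 1 : 0;
                }
            }
        }
        return true;
    }

    public static void main(final String[] args) throws IOException {
        final byte[] token = "TOKEN".getBytes();
        final ReadableByteChannel ch = Channels.newChannel(
                new ByteArrayInputStream("xxTOKENpayload".getBytes()));
        System.out.println("found = " + new TokenScanSketch(token).drain(ch));
        // The channel is now positioned at "payload": the scan did not over-read.
    }
}

In the toy run in main(), the channel is left positioned at the payload bytes once the token is found, which is the property the real code needs so that the subsequent read into localBuffer starts at the payload.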
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.