From: <th...@us...> - 2011-06-11 22:04:30
|
Revision: 2049 http://azsmrc.svn.sourceforge.net/azsmrc/?rev=2049&view=rev Author: the8472 Date: 2011-06-11 22:04:23 +0000 (Sat, 11 Jun 2011) Log Message: ----------- -reduce amount of locking further -async writes Modified Paths: -------------- mldht/trunk/DHTIndexer.jar mldht/trunk/lbms/plugins/mldht/indexer/MetaDataConnectionServer.java mldht/trunk/lbms/plugins/mldht/indexer/PullMetaDataConnection.java mldht/trunk/lbms/plugins/mldht/kad/RPCServer.java mldht/trunk/lbms/plugins/mldht/utils/NIOConnectionManager.java Added Paths: ----------- mldht/trunk/lbms/plugins/mldht/utils/Selectable.java Removed Paths: ------------- mldht/trunk/lbms/plugins/mldht/indexer/Selectable.java Modified: mldht/trunk/DHTIndexer.jar =================================================================== (Binary files differ) Modified: mldht/trunk/lbms/plugins/mldht/indexer/MetaDataConnectionServer.java =================================================================== --- mldht/trunk/lbms/plugins/mldht/indexer/MetaDataConnectionServer.java 2011-06-11 10:59:42 UTC (rev 2048) +++ mldht/trunk/lbms/plugins/mldht/indexer/MetaDataConnectionServer.java 2011-06-11 22:04:23 UTC (rev 2049) @@ -12,6 +12,7 @@ import lbms.plugins.mldht.kad.DHT.LogLevel; import lbms.plugins.mldht.kad.utils.AddressUtils; import lbms.plugins.mldht.utils.NIOConnectionManager; +import lbms.plugins.mldht.utils.Selectable; public class MetaDataConnectionServer implements Selectable { @@ -42,7 +43,7 @@ return channel; } - public void registrationEvent(NIOConnectionManager manager) throws IOException { + public void registrationEvent(NIOConnectionManager manager, SelectionKey key) throws IOException { conHandler = manager; channel.socket().bind(new InetSocketAddress(addr, port), 100); conHandler.setSelection(this, SelectionKey.OP_ACCEPT, true); Modified: mldht/trunk/lbms/plugins/mldht/indexer/PullMetaDataConnection.java =================================================================== --- mldht/trunk/lbms/plugins/mldht/indexer/PullMetaDataConnection.java 2011-06-11 10:59:42 UTC (rev 2048) +++ mldht/trunk/lbms/plugins/mldht/indexer/PullMetaDataConnection.java 2011-06-11 22:04:23 UTC (rev 2049) @@ -36,6 +36,7 @@ import lbms.plugins.mldht.kad.DHT.LogLevel; import lbms.plugins.mldht.kad.utils.ThreadLocalUtils; import lbms.plugins.mldht.utils.NIOConnectionManager; +import lbms.plugins.mldht.utils.Selectable; public class PullMetaDataConnection implements Selectable { @@ -191,7 +192,7 @@ return channel; } - public void registrationEvent(NIOConnectionManager manager) throws IOException { + public void registrationEvent(NIOConnectionManager manager, SelectionKey key) throws IOException { connManager = manager; lastReceivedTime = System.currentTimeMillis(); Deleted: mldht/trunk/lbms/plugins/mldht/indexer/Selectable.java =================================================================== --- mldht/trunk/lbms/plugins/mldht/indexer/Selectable.java 2011-06-11 10:59:42 UTC (rev 2048) +++ mldht/trunk/lbms/plugins/mldht/indexer/Selectable.java 2011-06-11 22:04:23 UTC (rev 2049) @@ -1,33 +0,0 @@ -/* - * This file is part of mlDHT. - * - * mlDHT is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 2 of the License, or - * (at your option) any later version. - * - * mlDHT is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with mlDHT. If not, see <http://www.gnu.org/licenses/>. - */ -package lbms.plugins.mldht.indexer; - -import java.io.IOException; -import java.nio.channels.SelectableChannel; -import java.nio.channels.SelectionKey; - -import lbms.plugins.mldht.utils.NIOConnectionManager; - -public interface Selectable { - public SelectableChannel getChannel(); - - public void registrationEvent(NIOConnectionManager manager) throws IOException; - - public void selectionEvent(SelectionKey key) throws IOException; - - public void doStateChecks(long now) throws IOException; -} \ No newline at end of file Modified: mldht/trunk/lbms/plugins/mldht/kad/RPCServer.java =================================================================== --- mldht/trunk/lbms/plugins/mldht/kad/RPCServer.java 2011-06-11 10:59:42 UTC (rev 2048) +++ mldht/trunk/lbms/plugins/mldht/kad/RPCServer.java 2011-06-11 22:04:23 UTC (rev 2049) @@ -26,10 +26,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import lbms.plugins.mldht.indexer.Selectable; import lbms.plugins.mldht.kad.DHT.LogLevel; import lbms.plugins.mldht.kad.messages.*; import lbms.plugins.mldht.kad.messages.ErrorMessage.ErrorCode; @@ -39,6 +39,7 @@ import lbms.plugins.mldht.kad.utils.ResponseTimeoutFilter; import lbms.plugins.mldht.kad.utils.ThreadLocalUtils; import lbms.plugins.mldht.utils.NIOConnectionManager; +import lbms.plugins.mldht.utils.Selectable; /** * @author The_8472, Damokles @@ -373,7 +374,7 @@ private void fillPipe(EnqueuedSend es) { pipeline.add(es); - sel.updateSelection(); + sel.writeEvent(false); } @@ -448,6 +449,7 @@ private class SocketHandler implements Selectable { DatagramChannel channel; + SelectionKey key; { @@ -475,18 +477,18 @@ if(key.isValid() && key.isReadable()) readEvent(); if(key.isValid() && key.isWritable()) - writeEvent(channel); + writeEvent(true); } private void readEvent() throws IOException { - final ConcurrentLinkedQueue<EnqueedRead> toProcess = new ConcurrentLinkedQueue<RPCServer.EnqueedRead>(); + final ConcurrentLinkedQueue<EnqueuedRead> toProcess = new ConcurrentLinkedQueue<RPCServer.EnqueuedRead>(); final AtomicBoolean processorRunning = new AtomicBoolean(false); Runnable readProcessor = new Runnable() { public void run() { - EnqueedRead r; + EnqueuedRead r; while((r = toProcess.poll()) != null) { try { @@ -501,7 +503,7 @@ while(true) { - EnqueedRead read = new EnqueedRead(); + EnqueuedRead read = new EnqueuedRead(); read.buf = ByteBuffer.allocate(DHTConstants.RECEIVE_BUFFER_SIZE); read.soa = channel.receive(read.buf); if(read.soa == null) @@ -516,53 +518,90 @@ numReceived++; stats.addReceivedBytes(read.buf.limit() + dh_table.getType().HEADER_LENGTH); } + + // make sure to cleanup everything + if(toProcess.peek() != null) + DHT.getScheduler().execute(readProcessor); } - private void writeEvent(DatagramChannel chan) + private static final int WRITE_STATE_IDLE = 0; + private static final int WRITE_STATE_WRITING = 2; + private static final int WRITE_STATE_AWAITING_NIO_NOTIFICATION = 3; + + private AtomicInteger writeState = new AtomicInteger(); + + public void writeEvent(boolean onSelectorThread) { - EnqueuedSend es; - while(true) + // simply assume nobody else is writing when we're on a non-selector thread and attempt to do it + int currentState = WRITE_STATE_IDLE; + + if(onSelectorThread) + { // get the real state on the selector thread and act accordingly + currentState = writeState.get(); + // someone else is doing the work for us, yay + if(currentState == WRITE_STATE_WRITING) + return; + } + + if(writeState.compareAndSet(currentState, WRITE_STATE_WRITING)) { - es = pipeline.poll(); - if(es == null) - break; - try + // we are now the exclusive writer for this socket + + while(true) { - ByteBuffer buf = es.getBuffer(); - - if(chan.send(buf, es.toSend.getDestination()) == 0) + EnqueuedSend es = pipeline.poll(); + if(es == null) + break; + try { - pipeline.add(es); + ByteBuffer buf = es.getBuffer(); + + if(channel.send(buf, es.toSend.getDestination()) == 0) + { + pipeline.add(es); + // socket is full + updateSelection(); + + writeState.set(WRITE_STATE_AWAITING_NIO_NOTIFICATION); + return; + } + + if(es.associatedCall != null) + es.associatedCall.sent(); + + stats.addSentMessageToCount(es.toSend); + stats.addSentBytes(buf.limit() + dh_table.getType().HEADER_LENGTH); + if(DHT.isLogLevelEnabled(LogLevel.Debug)) + DHT.logDebug("RPC send Message: [" + es.toSend.getDestination().getAddress().getHostAddress() + "] "+ es.toSend.toString()); + } catch (IOException e) + { + DHT.log(new IOException(addr+" -> "+es.toSend.getDestination(), e), LogLevel.Error); + if(es.associatedCall != null) + { // need to notify listeners + es.associatedCall.sendFailed(); + } break; - } + } - if(es.associatedCall != null) - es.associatedCall.sent(); - - stats.addSentMessageToCount(es.toSend); - stats.addSentBytes(buf.limit() + dh_table.getType().HEADER_LENGTH); - if(DHT.isLogLevelEnabled(LogLevel.Debug)) - DHT.logDebug("RPC send Message: [" + es.toSend.getDestination().getAddress().getHostAddress() + "] "+ es.toSend.toString()); - } catch (IOException e) - { - DHT.log(new IOException(addr+" -> "+es.toSend.getDestination(), e), LogLevel.Error); - if(es.associatedCall != null) - { // need to notify listeners - es.associatedCall.sendFailed(); - } - //pipeline.add(es); - break; + numSent++; } + // release claim on the socket + writeState.set(WRITE_STATE_IDLE); - numSent++; - } + // check if we might have to pick it up again due to races + if(pipeline.peek() != null) + writeEvent(onSelectorThread); + + } + } @Override - public void registrationEvent(NIOConnectionManager manager) throws IOException { + public void registrationEvent(NIOConnectionManager manager, SelectionKey key) throws IOException { connectionManager = manager; + this.key = key; updateSelection(); } @@ -591,7 +630,7 @@ int newSel = SelectionKey.OP_READ; if(pipeline.peek() != null) newSel |= SelectionKey.OP_WRITE; - connectionManager.asyncSetSelection(this, newSel); + connectionManager.asyncSetSelection(key, newSel); } } @@ -613,7 +652,7 @@ } } - private static class EnqueedRead { + private static class EnqueuedRead { SocketAddress soa; ByteBuffer buf; } Modified: mldht/trunk/lbms/plugins/mldht/utils/NIOConnectionManager.java =================================================================== --- mldht/trunk/lbms/plugins/mldht/utils/NIOConnectionManager.java 2011-06-11 10:59:42 UTC (rev 2048) +++ mldht/trunk/lbms/plugins/mldht/utils/NIOConnectionManager.java 2011-06-11 22:04:23 UTC (rev 2049) @@ -28,7 +28,6 @@ import sun.net.www.content.audio.wav; -import lbms.plugins.mldht.indexer.Selectable; import lbms.plugins.mldht.kad.DHT; import lbms.plugins.mldht.kad.DHT.LogLevel; @@ -86,8 +85,7 @@ while((toRegister = registrations.poll()) != null) { connections.add(toRegister); - toRegister.getChannel().register(selector, 0,toRegister); - toRegister.registrationEvent(NIOConnectionManager.this); + toRegister.registrationEvent(NIOConnectionManager.this,toRegister.getChannel().register(selector, 0,toRegister)); } } catch (Exception e) @@ -147,9 +145,9 @@ selector.wakeup(); } - public void asyncSetSelection(Selectable connection, int mask) + public void asyncSetSelection(SelectionKey key, int mask) { - connection.getChannel().keyFor(selector).interestOps(mask); + key.interestOps(mask); if(isSelecting) { isSelecting = false; @@ -157,6 +155,10 @@ } } + + public Selector getSelector() { + return selector; + } /** * note: this method is not thread-safe Copied: mldht/trunk/lbms/plugins/mldht/utils/Selectable.java (from rev 2039, mldht/trunk/lbms/plugins/mldht/indexer/Selectable.java) =================================================================== --- mldht/trunk/lbms/plugins/mldht/utils/Selectable.java (rev 0) +++ mldht/trunk/lbms/plugins/mldht/utils/Selectable.java 2011-06-11 22:04:23 UTC (rev 2049) @@ -0,0 +1,32 @@ +/* + * This file is part of mlDHT. + * + * mlDHT is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * mlDHT is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with mlDHT. If not, see <http://www.gnu.org/licenses/>. + */ +package lbms.plugins.mldht.utils; + +import java.io.IOException; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; + + +public interface Selectable { + public SelectableChannel getChannel(); + + public void registrationEvent(NIOConnectionManager manager, SelectionKey key) throws IOException; + + public void selectionEvent(SelectionKey key) throws IOException; + + public void doStateChecks(long now) throws IOException; +} \ No newline at end of file Property changes on: mldht/trunk/lbms/plugins/mldht/utils/Selectable.java ___________________________________________________________________ Added: svn:mime-type + text/plain This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |