From: <th...@us...> - 2010-09-09 15:09:54
|
Revision: 1941 http://azsmrc.svn.sourceforge.net/azsmrc/?rev=1941&view=rev Author: the8472 Date: 2010-09-09 15:09:45 +0000 (Thu, 09 Sep 2010) Log Message: ----------- limit incoming connections Modified Paths: -------------- mldht/branches/indexer/DHTIndexer.jar mldht/branches/indexer/lbms/plugins/mldht/indexer/MetaDataConnectionServer.java mldht/branches/indexer/lbms/plugins/mldht/indexer/MetaDataGatherer.java Modified: mldht/branches/indexer/DHTIndexer.jar =================================================================== (Binary files differ) Modified: mldht/branches/indexer/lbms/plugins/mldht/indexer/MetaDataConnectionServer.java =================================================================== --- mldht/branches/indexer/lbms/plugins/mldht/indexer/MetaDataConnectionServer.java 2010-09-08 16:16:20 UTC (rev 1940) +++ mldht/branches/indexer/lbms/plugins/mldht/indexer/MetaDataConnectionServer.java 2010-09-09 15:09:45 UTC (rev 1941) @@ -18,6 +18,8 @@ public void acceptedConnection(SocketChannel chan); + public boolean canAccept(); + } public static final int DEFAULT_PORT = 49002; Modified: mldht/branches/indexer/lbms/plugins/mldht/indexer/MetaDataGatherer.java =================================================================== --- mldht/branches/indexer/lbms/plugins/mldht/indexer/MetaDataGatherer.java 2010-09-08 16:16:20 UTC (rev 1940) +++ mldht/branches/indexer/lbms/plugins/mldht/indexer/MetaDataGatherer.java 2010-09-09 15:09:45 UTC (rev 1941) @@ -66,7 +66,7 @@ ArrayList<Key> pivotPoints = new ArrayList<Key>(); AtomicInteger activeLookups = new AtomicInteger(); - AtomicInteger activeConnections = new AtomicInteger(); + AtomicInteger activeOutgoingConnections = new AtomicInteger(); ConcurrentLinkedQueue<FetchTask> fetchTasks = new ConcurrentLinkedQueue<FetchTask>(); Queue<SessionRunnable> toFinish = new ConcurrentLinkedQueue<MetaDataGatherer.SessionRunnable>(); @@ -76,6 +76,7 @@ private static final boolean LOGGING = false; public RotatingBloomFilter failedHashes = new RotatingBloomFilter(1000000, 8*1024*1024); + AtomicInteger activeIncomingConnections = new AtomicInteger(); MetaDataConnectionServer v4srv; MetaDataConnectionServer v6srv; @@ -181,10 +182,16 @@ private void initListeningService() { IncomingConnectionHandler handler = new IncomingConnectionHandler() { + public boolean canAccept() { + return activeIncomingConnections.get() < numVirtualNodes * MAX_CONCURRENT_METADATA_CONNECTIONS_PER_NODE; + } + public void acceptedConnection(SocketChannel chan) { final PullMetaDataConnection conn = new PullMetaDataConnection(chan); conn.terminationHandler = new Runnable() { public void run() { + activeIncomingConnections.decrementAndGet(); + if(conn.isState(PullMetaDataConnection.STATE_METADATA_VERIFIED)) { Session s = HibernateUtil.getSessionFactory().openSession(); @@ -208,6 +215,7 @@ } }; + activeIncomingConnections.incrementAndGet(); conn.register(); } }; @@ -240,7 +248,7 @@ updatePivots(); - int currentPool = activeConnections.get() + fetchTasks.size(); + int currentPool = activeOutgoingConnections.get() + fetchTasks.size(); int targetPool = numVirtualNodes * MAX_CONCURRENT_METADATA_CONNECTIONS_PER_NODE; int currentLookups = activeLookups.get(); int maxLookups = numVirtualNodes * LOOKUPS_PER_VIRTUAL_NODE; @@ -413,7 +421,7 @@ private void digestTaskQueue() { - while(activeConnections.get() < numVirtualNodes * MAX_CONCURRENT_METADATA_CONNECTIONS_PER_NODE) + while(activeOutgoingConnections.get() < numVirtualNodes * MAX_CONCURRENT_METADATA_CONNECTIONS_PER_NODE) { FetchTask task = fetchTasks.poll(); if(task == null) @@ -491,12 +499,12 @@ } } - activeConnections.decrementAndGet(); + activeOutgoingConnections.decrementAndGet(); } }; // connect after registering the handler in case we get an immediate terminate event - activeConnections.incrementAndGet(); + activeOutgoingConnections.incrementAndGet(); conn.register(); log("starting metadata connection for "+task.hash); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |