From: <th...@us...> - 2010-04-23 05:00:01
|
Revision: 1827 http://azsmrc.svn.sourceforge.net/azsmrc/?rev=1827&view=rev Author: the8472 Date: 2010-04-23 04:59:54 +0000 (Fri, 23 Apr 2010) Log Message: ----------- concurrency fixes Modified Paths: -------------- mldht/trunk/DHTIndexer.jar mldht/trunk/lbms/plugins/mldht/kad/AnnounceNodeCache.java mldht/trunk/lbms/plugins/mldht/kad/KBucket.java mldht/trunk/lbms/plugins/mldht/kad/Node.java Modified: mldht/trunk/DHTIndexer.jar =================================================================== (Binary files differ) Modified: mldht/trunk/lbms/plugins/mldht/kad/AnnounceNodeCache.java =================================================================== --- mldht/trunk/lbms/plugins/mldht/kad/AnnounceNodeCache.java 2010-04-22 17:52:02 UTC (rev 1826) +++ mldht/trunk/lbms/plugins/mldht/kad/AnnounceNodeCache.java 2010-04-23 04:59:54 UTC (rev 1827) @@ -3,24 +3,45 @@ import java.util.*; public class AnnounceNodeCache { - - private class CacheEntry { + /* + private static final int HIGH_WATERMARK = 1000; + private static final int LOW_WATERMARK = 500; + */ + private class CacheEntry /*implements Comparable<CacheEntry>*/ { public CacheEntry(Collection<KBucketEntry> entries) { this.entries = entries; } long now = System.currentTimeMillis(); Collection<KBucketEntry> entries; + + + /* + // youngest entry = highest timestamp = last + public int compareTo(CacheEntry o) { + return Long.signum(now - o.now); + }*/ + + } NavigableMap<Key, CacheEntry> cache = new TreeMap<Key, CacheEntry>(); - public void add(Key target, Collection<KBucketEntry> nodes) + public synchronized void add(Key target, Collection<KBucketEntry> nodes) { + if(nodes.isEmpty()) + return; + /* + if(cache.size() > HIGH_WATERMARK) + { + List<CacheEntry> entries = new ArrayList<CacheEntry>(cache.values()); + Collections.sort(entries); + cache.values().removeAll(entries.subList(0, entries.size()-LOW_WATERMARK)); + }*/ cache.put(target, new CacheEntry(nodes)); } - public Collection<KBucketEntry> get(Key target) + public synchronized Collection<KBucketEntry> get(Key target) { Map.Entry<Key, CacheEntry> entry = null; Map.Entry<Key, CacheEntry> entry1 = cache.ceilingEntry(target); @@ -38,7 +59,7 @@ return entry == null ? Collections.EMPTY_LIST : entry.getValue().entries; } - public void cleanup(long now) + public synchronized void cleanup(long now) { for(Iterator<Map.Entry<Key, CacheEntry>> it = cache.entrySet().iterator();it.hasNext();) if(now - it.next().getValue().now > DHTConstants.ANNOUNCE_CACHE_MAX_AGE) Modified: mldht/trunk/lbms/plugins/mldht/kad/KBucket.java =================================================================== --- mldht/trunk/lbms/plugins/mldht/kad/KBucket.java 2010-04-22 17:52:02 UTC (rev 1826) +++ mldht/trunk/lbms/plugins/mldht/kad/KBucket.java 2010-04-23 04:59:54 UTC (rev 1827) @@ -399,13 +399,14 @@ } - public boolean checkForIDChangeAndNotifyOfResponse(MessageBase msg) + public boolean checkForIDChange(MessageBase msg) { synchronized (entries) { // check if node changed its ID - for (KBucketEntry entry : entries) + for (int i=0, n = entries.size();i<n;i++) { + KBucketEntry entry = entries.get(i); // node ID change detected, reassign node to the appropriate bucket if (entry.getAddress().equals(msg.getOrigin()) && !entry.getID().equals(msg.getID())) { @@ -425,6 +426,21 @@ } return false; } + + public void notifyOfResponse(MessageBase msg) + { + for (int i=0, n = entries.size();i<n;i++) + { + KBucketEntry entry = entries.get(i); + + // update last responded. insert will be invoked soon, thus we don't have to do the move-to-end stuff + if(msg.getType() == Type.RSP_MSG && entry.getID().equals(msg.getID())) + { + entry.signalResponse(); + return; + } + } + } /** Modified: mldht/trunk/lbms/plugins/mldht/kad/Node.java =================================================================== --- mldht/trunk/lbms/plugins/mldht/kad/Node.java 2010-04-22 17:52:02 UTC (rev 1826) +++ mldht/trunk/lbms/plugins/mldht/kad/Node.java 2010-04-23 04:59:54 UTC (rev 1827) @@ -7,6 +7,7 @@ import lbms.plugins.mldht.DHTConfiguration; import lbms.plugins.mldht.kad.DHT.LogLevel; import lbms.plugins.mldht.kad.messages.MessageBase; +import lbms.plugins.mldht.kad.messages.MessageBase.Type; import lbms.plugins.mldht.kad.tasks.NodeLookup; import lbms.plugins.mldht.kad.tasks.PingRefreshTask; import lbms.plugins.mldht.kad.tasks.Task; @@ -78,13 +79,26 @@ boolean nodeIDchanged = false; + // to avoid scanning all buckets on each incoming packet we use a cache of Addresses we have in our buckets + // this is inaccurate, but good enough to catch most node ID changes RoutingTableEntry cachedEntry = knownNodes.get(newEntry.getAddress()); if(cachedEntry != null) - nodeIDchanged = cachedEntry.bucket.checkForIDChangeAndNotifyOfResponse(msg); + nodeIDchanged = cachedEntry.bucket.checkForIDChange(msg); if(!nodeIDchanged) + { + if(msg.getType() == Type.RSP_MSG) + { + RoutingTableEntry entry = findBucketForId(msg.getID()); + entry.bucket.notifyOfResponse(msg); + } + insertEntry(newEntry,false); + } + + + num_receives++; updateEntryCount(); @@ -312,8 +326,8 @@ } - Map<InetSocketAddress, RoutingTableEntry> newKnownMap = new HashMap<InetSocketAddress, RoutingTableEntry>(num_entries); + for (RoutingTableEntry e : routingTable) { KBucket b = e.bucket; @@ -330,7 +344,7 @@ b.removeEntry(entry, true); allBad &= entry.isBad(); - newKnownMap.put(entry.getAddress(), e); + } // clean out buckets full of bad nodes. merge operations will do the rest @@ -363,8 +377,21 @@ } + rebuildAddressCache(); + } + + private void rebuildAddressCache() { + Map<InetSocketAddress, RoutingTableEntry> newKnownMap = new HashMap<InetSocketAddress, RoutingTableEntry>(num_entries); + List<RoutingTableEntry> table = routingTable; + for(int i=0,n=table.size();i<n;i++) + { + RoutingTableEntry entry = table.get(i); + List<KBucketEntry> entries = entry.bucket.getEntries(); + for(int j=0,m=entries.size();j<m;j++) + newKnownMap.put(entries.get(j).getAddress(), entry); + } + knownNodes = newKnownMap; - } /** @@ -499,6 +526,7 @@ } updateEntryCount(); + rebuildAddressCache(); if (entriesLoaded > 0) { runDeferred = true; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |