From: <th...@us...> - 2011-03-20 22:48:59
|
Revision: 2010 http://azsmrc.svn.sourceforge.net/azsmrc/?rev=2010&view=rev Author: the8472 Date: 2011-03-20 22:48:53 +0000 (Sun, 20 Mar 2011) Log Message: ----------- improved replacement bucket handling for high-throughput scenarios Modified Paths: -------------- mldht/trunk/DHTIndexer.jar mldht/trunk/lbms/plugins/mldht/kad/DHT.java mldht/trunk/lbms/plugins/mldht/kad/KBucket.java mldht/trunk/lbms/plugins/mldht/kad/KBucketEntry.java mldht/trunk/lbms/plugins/mldht/kad/Node.java mldht/trunk/lbms/plugins/mldht/kad/tasks/Task.java Modified: mldht/trunk/DHTIndexer.jar =================================================================== (Binary files differ) Modified: mldht/trunk/lbms/plugins/mldht/kad/DHT.java =================================================================== --- mldht/trunk/lbms/plugins/mldht/kad/DHT.java 2011-03-18 22:45:27 UTC (rev 2009) +++ mldht/trunk/lbms/plugins/mldht/kad/DHT.java 2011-03-20 22:48:53 UTC (rev 2010) @@ -379,7 +379,10 @@ @Override public PingRefreshTask refreshBuckets (List<RoutingTableEntry> buckets, boolean cleanOnTimeout) { - PingRefreshTask prt = new PingRefreshTask(serverManager.getRandomActiveServer(true), node, buckets,cleanOnTimeout); + RPCServer srv = serverManager.getRandomActiveServer(true); + if(srv == null) + return null; + PingRefreshTask prt = new PingRefreshTask(srv, node, buckets,cleanOnTimeout); tman.addTask(prt, true); return prt; Modified: mldht/trunk/lbms/plugins/mldht/kad/KBucket.java =================================================================== --- mldht/trunk/lbms/plugins/mldht/kad/KBucket.java 2011-03-18 22:45:27 UTC (rev 2009) +++ mldht/trunk/lbms/plugins/mldht/kad/KBucket.java 2011-03-20 22:48:53 UTC (rev 2010) @@ -22,6 +22,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; import lbms.plugins.mldht.kad.DHT.LogLevel; import lbms.plugins.mldht.kad.messages.MessageBase; @@ -47,9 +49,10 @@ * using copy-on-write semantics for this list, referencing it is safe if you make local copy */ private volatile List<KBucketEntry> entries; - // replacements are synchronized on entries, not on replacementBucket! using a liked list since we use it as queue, not as stack - private ConcurrentLinkedQueue<KBucketEntry> replacementBucket; + private AtomicInteger currentReplacementPointer; + private AtomicReferenceArray<KBucketEntry> replacementBucket; + private Node node; private Map<Key,KBucketEntry> pendingPings; private long last_modified; @@ -57,7 +60,8 @@ public KBucket () { entries = new ArrayList<KBucketEntry>(); // using arraylist here since reading/iterating is far more common than writing. - replacementBucket = new ConcurrentLinkedQueue<KBucketEntry>(); + currentReplacementPointer = new AtomicInteger(0); + replacementBucket = new AtomicReferenceArray<KBucketEntry>(DHTConstants.MAX_ENTRIES_PER_BUCKET); // .size() is called on every insert, it's rarely used and is limited to 2... so use a cow set for high throughput pendingPings = new ConcurrentSkipListMap<Key, KBucketEntry>(); } @@ -196,7 +200,11 @@ } public int getNumReplacements() { - return replacementBucket.size(); + int c = 0; + for(int i=0;i<replacementBucket.length();i++) + if(replacementBucket.get(i) != null) + c++; + return c; } /** @@ -207,7 +215,15 @@ } public List<KBucketEntry> getReplacementEntries() { - return new ArrayList<KBucketEntry>(replacementBucket); + List<KBucketEntry> repEntries = new ArrayList<KBucketEntry>(replacementBucket.length()); + int current = currentReplacementPointer.get(); + for(int i=1;i<=replacementBucket.length();i++) + { + KBucketEntry e = replacementBucket.get((current + i) % replacementBucket.length()); + if(e != null) + repEntries.add(e); + } + return repEntries; } /** @@ -367,17 +383,16 @@ private KBucketEntry getYoungestReplacementEntry() { - // fetches the last element from the queue. since it's not double-ended we have to iterate through it - for(Iterator<KBucketEntry> it = replacementBucket.iterator();it.hasNext();) - { - KBucketEntry e = it.next(); - if(!it.hasNext()) + while(true) { + int current = currentReplacementPointer.get(); + int newValue = current--; + if(newValue < 0) + newValue = replacementBucket.length()-1; + if(currentReplacementPointer.compareAndSet(current, newValue)) { - it.remove(); - return e; + return replacementBucket.getAndSet(current, null); } } - return null; } private void insertInReplacementBucket(KBucketEntry entry) @@ -385,27 +400,30 @@ if(entry == null) return; - // start counting from 1 since we'll add 1 entry - int size = 1; - - //if it is already inserted remove it and add it to the end - for(Iterator<KBucketEntry> it = replacementBucket.iterator();it.hasNext();) + outer: + while(true) { - KBucketEntry oldEntry = it.next(); - if(oldEntry.equals(entry)) + int current = currentReplacementPointer.get(); + int next = (current+1) % replacementBucket.length(); + + KBucketEntry nextEntry = replacementBucket.get(next); + if(nextEntry == null || entry.getLastSeen() - nextEntry.getLastSeen() > 1000) { - it.remove(); - oldEntry.mergeTimestamps(entry); - replacementBucket.add(oldEntry); - return; + for(int i=0;i<replacementBucket.length();i++) + { + if(entry.equals(replacementBucket.get(i))) + break outer; + } + if(currentReplacementPointer.compareAndSet(current, next)) + { + replacementBucket.set(next, entry); + break; + } + } else + { + break; } - size++; } - - replacementBucket.add(entry); - - while(size-- > DHTConstants.MAX_ENTRIES_PER_BUCKET) - replacementBucket.poll(); //remove the least recently seen one } /** @@ -515,14 +533,19 @@ entries.addAll((Collection<KBucketEntry>)obj); obj = serialized.get("replacementBucket"); if(obj instanceof Collection<?>) - replacementBucket.addAll((Collection<KBucketEntry>)obj); + { + // we are possibly violating the last-seen ordering of the replacement bucket here + // but since we're probably deserializing old data that's hardly relevant + for(KBucketEntry e : (Collection<KBucketEntry>)obj) + insertInReplacementBucket(e); + } + obj = serialized.get("lastModifiedTime"); if(obj instanceof Long) last_modified = (Long)obj; obj = serialized.get("prefix"); entries.removeAll(Collections.singleton(null)); - replacementBucket.removeAll(Collections.singleton(null)); Collections.sort(entries, KBucketEntry.AGE_ORDER); //Collections.sort(replacementBucket,KBucketEntry.LAST_SEEN_ORDER); } @@ -531,7 +554,7 @@ Map<String,Object> serialized = new HashMap<String, Object>(); // put entries as any type of collection, will convert them on deserialisation serialized.put("mainBucket", entries); - serialized.put("replacementBucket", replacementBucket); + serialized.put("replacementBucket", getReplacementEntries()); serialized.put("lastModifiedTime", last_modified); out.writeObject(serialized); Modified: mldht/trunk/lbms/plugins/mldht/kad/KBucketEntry.java =================================================================== --- mldht/trunk/lbms/plugins/mldht/kad/KBucketEntry.java 2011-03-18 22:45:27 UTC (rev 2009) +++ mldht/trunk/lbms/plugins/mldht/kad/KBucketEntry.java 2011-03-20 22:48:53 UTC (rev 2010) @@ -184,6 +184,8 @@ * violating the equals contract (specifically: the transitivity requirement) here, use with care */ public boolean equals (KBucketEntry other) { + if(other == null) + return false; return nodeID.equals(other.nodeID) || addr.getAddress().equals(other.addr.getAddress()); } Modified: mldht/trunk/lbms/plugins/mldht/kad/Node.java =================================================================== --- mldht/trunk/lbms/plugins/mldht/kad/Node.java 2011-03-18 22:45:27 UTC (rev 2009) +++ mldht/trunk/lbms/plugins/mldht/kad/Node.java 2011-03-20 22:48:53 UTC (rev 2010) @@ -125,7 +125,7 @@ public void insertEntry (KBucketEntry entry, boolean internalInsert) { - if(usedIDs.containsKey(entry.getID()) || AddressUtils.isBogon(entry.getAddress()) || !dht.getType().PREFERRED_ADDRESS_TYPE.isInstance(entry.getAddress().getAddress())) + if(entry == null || usedIDs.containsKey(entry.getID()) || AddressUtils.isBogon(entry.getAddress()) || !dht.getType().PREFERRED_ADDRESS_TYPE.isInstance(entry.getAddress().getAddress())) return; Key nodeID = entry.getID(); Modified: mldht/trunk/lbms/plugins/mldht/kad/tasks/Task.java =================================================================== --- mldht/trunk/lbms/plugins/mldht/kad/tasks/Task.java 2011-03-18 22:45:27 UTC (rev 2009) +++ mldht/trunk/lbms/plugins/mldht/kad/tasks/Task.java 2011-03-20 22:48:53 UTC (rev 2010) @@ -66,6 +66,8 @@ * @param node The node */ Task (Key target, RPCServer rpc, Node node) { + if(rpc == null) + throw new IllegalArgumentException("RPC must not be null"); this.targetKey = target; this.rpc = rpc; this.node = node; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |