From: <th...@us...> - 2010-07-26 22:59:17
|
Revision: 1911 http://azsmrc.svn.sourceforge.net/azsmrc/?rev=1911&view=rev Author: the8472 Date: 2010-07-26 22:59:07 +0000 (Mon, 26 Jul 2010) Log Message: ----------- -oops, cache pruning didn't work properly -fast, optional termination strategy for get_peers lookups for the indexer -better keyspace slicing Modified Paths: -------------- mldht/branches/indexer/DHTIndexer.jar mldht/branches/indexer/lbms/plugins/mldht/azureus/Tracker.java mldht/branches/indexer/lbms/plugins/mldht/indexer/MetaDataGatherer.java mldht/branches/indexer/lbms/plugins/mldht/kad/AnnounceNodeCache.java mldht/branches/indexer/lbms/plugins/mldht/kad/DHT.java mldht/branches/indexer/lbms/plugins/mldht/kad/DHTBase.java mldht/branches/indexer/lbms/plugins/mldht/kad/tasks/PeerLookupTask.java Modified: mldht/branches/indexer/DHTIndexer.jar =================================================================== (Binary files differ) Modified: mldht/branches/indexer/lbms/plugins/mldht/azureus/Tracker.java =================================================================== --- mldht/branches/indexer/lbms/plugins/mldht/azureus/Tracker.java 2010-07-25 03:10:40 UTC (rev 1910) +++ mldht/branches/indexer/lbms/plugins/mldht/azureus/Tracker.java 2010-07-26 22:59:07 UTC (rev 1911) @@ -166,8 +166,8 @@ for(DHTtype type : DHTtype.values()) { - PeerLookupTask lookupTask = plugin.getDHT(type).lookupPeers( - dl.getTorrent().getHash()); + DHT dht = plugin.getDHT(type); + PeerLookupTask lookupTask = dht.createPeerLookup(dl.getTorrent().getHash()); if (lookupTask != null) { pendingCount.incrementAndGet(); lookupTask.setScrapeHandler(handler); @@ -175,6 +175,7 @@ lookupTask.addListener(this); lookupTask.setInfo(dl.getName()); lookupTask.setNoSeeds(dl.isComplete(true)); + dht.getTaskManager().addTask(lookupTask); } } Modified: mldht/branches/indexer/lbms/plugins/mldht/indexer/MetaDataGatherer.java =================================================================== --- mldht/branches/indexer/lbms/plugins/mldht/indexer/MetaDataGatherer.java 2010-07-25 03:10:40 UTC (rev 1910) +++ mldht/branches/indexer/lbms/plugins/mldht/indexer/MetaDataGatherer.java 2010-07-26 22:59:07 UTC (rev 1911) @@ -54,7 +54,8 @@ public static final int MAX_CONCURRENT_METADATA_CONNECTIONS = 90; public static final int PREFETCH_HIGH_WATERMARK = 30; - public static final int LOOKUPS_PER_DHT = 3; + public static final int LOOKUPS_PER_VIRTUAL_NODE = 3; + public static final int PIVOT_EVERY_N_VIRTUAL_NODES = 3; ArrayList<Key> pivotPoints = new ArrayList<Key>(); AtomicInteger activeLookups = new AtomicInteger(); @@ -87,7 +88,7 @@ } - private int maxConcurrentLookups = 0; + private int numVirtualNodes = 0; public MetaDataGatherer() { @@ -106,7 +107,7 @@ { updatePivots(); - if(fetchTasks.size() < PREFETCH_HIGH_WATERMARK && activeLookups.get() < maxConcurrentLookups) + if(fetchTasks.size() < PREFETCH_HIGH_WATERMARK && activeLookups.get() < numVirtualNodes * LOOKUPS_PER_VIRTUAL_NODE) dhtLookup(); } catch (Exception e) { @@ -130,7 +131,7 @@ dhtServers = Math.min(dhtServers, dhtMax); } - maxConcurrentLookups = dhtServers * LOOKUPS_PER_DHT; + numVirtualNodes = dhtServers; } catch (Exception e) { DHT.log(e, LogLevel.Error); @@ -152,8 +153,7 @@ } private void updatePivots() { - int dhts = maxConcurrentLookups / LOOKUPS_PER_DHT; - int numPivots = Math.max(2, dhts / 3); + int numPivots = Math.max(2, numVirtualNodes / PIVOT_EVERY_N_VIRTUAL_NODES); Key k = Key.createRandomKey(); int i = 0; while(pivotPoints.size() < numPivots) @@ -191,13 +191,8 @@ Transaction tx = session.beginTransaction(); try { - /* - DetachedCriteria minAttempt = DetachedCriteria.forClass(TorrentDBEntry.class,"sub1"); - minAttempt.setProjection(Projections.min("sub1.fetchAttemptCount")); - minAttempt.add(Restrictions.eq("sub1.status", 0)); - */ - - int wants = maxConcurrentLookups - activeLookups.get(); + + int wants = Math.min(numVirtualNodes * LOOKUPS_PER_VIRTUAL_NODE - activeLookups.get(), LOOKUPS_PER_VIRTUAL_NODE * PIVOT_EVERY_N_VIRTUAL_NODES); if(wants < 1) { tx.commit(); @@ -301,12 +296,15 @@ for(DHTtype type : DHTtype.values()) { - PeerLookupTask lookupTask = DHT.getDHT(type).lookupPeers(hash); + DHT dht = DHT.getDHT(type); + PeerLookupTask lookupTask = dht.createPeerLookup(hash); pendingLookups.incrementAndGet(); + lookupTask.setFastLookup(true); lookupTask.setScrapeOnly(true); lookupTask.addListener(lookupListener); lookupTask.setInfo("Grabbing .torrent for "+entry.info_hash); lookupTask.setNoSeeds(false); + dht.getTaskManager().addTask(lookupTask); } } Modified: mldht/branches/indexer/lbms/plugins/mldht/kad/AnnounceNodeCache.java =================================================================== --- mldht/branches/indexer/lbms/plugins/mldht/kad/AnnounceNodeCache.java 2010-07-25 03:10:40 UTC (rev 1910) +++ mldht/branches/indexer/lbms/plugins/mldht/kad/AnnounceNodeCache.java 2010-07-26 22:59:07 UTC (rev 1911) @@ -72,9 +72,13 @@ if(targetEntry == null || !targetEntry.getValue().prefix.isPrefixOf(nodeId)) return; - if(targetEntry.getValue().entries.remove(nodeId)) - targetEntry.getValue().numEntries.decrementAndGet(); - + for(Iterator<KBucketEntry> it = targetEntry.getValue().entries.iterator();it.hasNext();) + if(it.next().getID().equals(nodeId)) + { + it.remove(); + targetEntry.getValue().numEntries.decrementAndGet(); + break; + } } } @@ -88,7 +92,7 @@ Entry<Key, CacheBucket> targetEntry = cache.floorEntry(target); if(targetEntry == null || !targetEntry.getValue().prefix.isPrefixOf(target)) - { // split operation ongoing, retry + { // split/merge operation ongoing, retry Thread.yield(); continue; } Modified: mldht/branches/indexer/lbms/plugins/mldht/kad/DHT.java =================================================================== --- mldht/branches/indexer/lbms/plugins/mldht/kad/DHT.java 2010-07-25 03:10:40 UTC (rev 1910) +++ mldht/branches/indexer/lbms/plugins/mldht/kad/DHT.java 2010-07-26 22:59:07 UTC (rev 1911) @@ -360,14 +360,13 @@ * * @see lbms.plugins.mldht.kad.DHTBase#announce(byte[], int) */ - public PeerLookupTask lookupPeers (byte[] info_hash) { + public PeerLookupTask createPeerLookup (byte[] info_hash) { if (!running) { return null; } Key id = new Key(info_hash); PeerLookupTask lookupTask = new PeerLookupTask(getRandomServer(), node, id); - tman.addTask(lookupTask); return lookupTask; } Modified: mldht/branches/indexer/lbms/plugins/mldht/kad/DHTBase.java =================================================================== --- mldht/branches/indexer/lbms/plugins/mldht/kad/DHTBase.java 2010-07-25 03:10:40 UTC (rev 1910) +++ mldht/branches/indexer/lbms/plugins/mldht/kad/DHTBase.java 2010-07-26 22:59:07 UTC (rev 1911) @@ -63,7 +63,7 @@ * @param info_hash The info_hash * @return The task which handles this */ - public PeerLookupTask lookupPeers(byte[] info_hash); + public PeerLookupTask createPeerLookup(byte[] info_hash); /** Modified: mldht/branches/indexer/lbms/plugins/mldht/kad/tasks/PeerLookupTask.java =================================================================== --- mldht/branches/indexer/lbms/plugins/mldht/kad/tasks/PeerLookupTask.java 2010-07-25 03:10:40 UTC (rev 1910) +++ mldht/branches/indexer/lbms/plugins/mldht/kad/tasks/PeerLookupTask.java 2010-07-26 22:59:07 UTC (rev 1911) @@ -37,6 +37,7 @@ private boolean scrapeOnly; private boolean noSeeds; + private boolean fastLookup; // nodes which have answered with tokens private List<KBucketEntryAndToken> announceCanidates; @@ -71,6 +72,12 @@ public void setNoSeeds(boolean avoidSeeds) { noSeeds = avoidSeeds; } + + public void setFastLookup(boolean isFastLookup) { + if(!isQueued()) + throw new IllegalStateException("cannot change lookup mode after startup"); + fastLookup = isFastLookup; + } public void setScrapeOnly(boolean scrapeOnly) { this.scrapeOnly = scrapeOnly; @@ -227,9 +234,11 @@ } } - if (todo.isEmpty() && getNumOutstandingRequests() == 0 && !isFinished()) { + int waitingFor = fastLookup ? getNumOutstandingRequestsExcludingStalled() : getNumOutstandingRequests(); + + if (todo.isEmpty() && waitingFor == 0 && !isFinished()) { done(); - } else if(getNumOutstandingRequests() == 0 && validReponsesSinceLastClosestSetModification >= DHTConstants.MAX_CONCURRENT_REQUESTS) + } else if(waitingFor == 0 && validReponsesSinceLastClosestSetModification >= DHTConstants.MAX_CONCURRENT_REQUESTS) { // found all closest nodes, we're done done(); } @@ -262,6 +271,8 @@ } public List<KBucketEntryAndToken> getAnnounceCanidates() { + if(fastLookup) + throw new IllegalStateException("cannot use fast lookups for announces"); return announceCanidates; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |