From: Paul G. <pa...@us...> - 2007-04-05 01:52:20
|
Update of /cvsroot/azureus/azureus3/com/aelitis/azureus/core/download In directory sc8-pr-cvs11.sourceforge.net:/tmp/cvs-serv28902/com/aelitis/azureus/core/download Modified Files: EnhancedDownloadManager.java Log Message: try again Index: EnhancedDownloadManager.java =================================================================== RCS file: /cvsroot/azureus/azureus3/com/aelitis/azureus/core/download/EnhancedDownloadManager.java,v retrieving revision 1.8 retrieving revision 1.9 diff -u -d -r1.8 -r1.9 --- EnhancedDownloadManager.java 5 Apr 2007 01:06:49 -0000 1.8 +++ EnhancedDownloadManager.java 5 Apr 2007 01:52:18 -0000 1.9 @@ -66,6 +66,7 @@ public static final int DISCONNECT_CHECK_PERIOD = 10*1000; public static final int DISCONNECT_CHECK_TICKS = DISCONNECT_CHECK_PERIOD/DownloadManagerEnhancer.TICK_PERIOD; + private static final String PM_SEED_TIME_KEY = "EnhancedDownloadManager:seedtime"; private static final String PEER_CACHE_KEY = "EnhancedDownloadManager:cachepeer"; private DownloadManagerEnhancer enhancer; @@ -343,65 +344,21 @@ return; } - PEPeerManager pm = download_manager.getPeerManager(); + int state = download_manager.getState(); - if ( pm == null ){ + if ( state != DownloadManager.STATE_SEEDING && state != DownloadManager.STATE_DOWNLOADING ){ return; } - long now = SystemTime.getCurrentTime(); - - int state = download_manager.getState(); + PEPeerManager pm = download_manager.getPeerManager(); - if ( state == DownloadManager.STATE_SEEDING ){ - - // seeding - timeout idle cl peers - - if ( tick_count % DISCONNECT_CHECK_TICKS == 0 ){ - - List peers_to_kick = new ArrayList(); - - synchronized( this ){ - - if ( cache_peers != null ){ - - Iterator it = cache_peers.iterator(); - - while( it.hasNext()){ - - PEPeer peer = (PEPeer)it.next(); - - if ( pm.getStats().getTimeSinceLastDataSentInSeconds() >= IDLE_SEED_DISCONNECT_PERIOD / 1000 ){ - - CachePeer cache_peer = (CachePeer)peer.getData( PEER_CACHE_KEY ); - - peers_to_kick.add( peer ); - - if ( disconnected_cache_peers == null ){ - - disconnected_cache_peers = new ArrayList(); - } - - disconnected_cache_peers.add( cache_peer ); - } - } - } - } - - for (int i=0;i<peers_to_kick.size();i++){ - - pm.removePeer((PEPeer)peers_to_kick.get(i), "Cache peer disconnect-seed-on-idle" ); - } - } + if ( pm == null ){ return; } - - if ( state != DownloadManager.STATE_DOWNLOADING ){ - return; - } + long now = SystemTime.getCurrentTime(); long target_speed = getTargetSpeed(); @@ -413,6 +370,24 @@ long time_downloading = getTimeRunning(); + int secs_since_last_up = pm.getStats().getTimeSinceLastDataSentInSeconds(); + + // deal with -1 -> infinite + + if ( secs_since_last_up == -1 ){ + + Long seed_time = (Long)pm.getData( PM_SEED_TIME_KEY ); + + if ( seed_time == null ){ + + seed_time = new Long( now ); + + pm.setData( PM_SEED_TIME_KEY, seed_time ); + } + + secs_since_last_up = (int)(( now - seed_time.longValue()) / 1000); + } + List peers_to_kick = new ArrayList(); synchronized( this ){ @@ -443,42 +418,56 @@ if ( cache_peer.getType() == CachePeer.PT_CACHE_LOGIC ){ - // cache logic rely on timely have messages to control both - // piece allocation and client-speed - - peer.setHaveAggregationEnabled( false ); - - if ( target_speed <= 0 ){ - - setPeerSpeed( peer, -1, now ); - - peers_to_kick.add( peer ); + if ( state == DownloadManager.STATE_SEEDING ){ - if ( disconnected_cache_peers == null ){ + if ( secs_since_last_up >= IDLE_SEED_DISCONNECT_PERIOD / 1000 ){ + + peers_to_kick.add( peer ); - disconnected_cache_peers = new ArrayList(); + addToDisconnectedCachePeers( cache_peer ); + }else{ + + if ( cache_peers == null ){ + + cache_peers = new LinkedList(); + } + + cache_peers.add( peer ); } - - disconnected_cache_peers.add( cache_peer ); - }else{ - long current_speed = download_speed_average.getAverage(); + // cache logic rely on timely have messages to control both + // piece allocation and client-speed - // if we are already exceeding required speed, block - // the cache peer download + peer.setHaveAggregationEnabled( false ); - if ( current_speed + TARGET_SPEED_EXCESS_MARGIN > target_speed ){ - - setPeerSpeed( peer, -1, now ); - } + if ( target_speed <= 0 ){ - if ( cache_peers == null ){ + setPeerSpeed( peer, -1, now ); - cache_peers = new LinkedList(); + peers_to_kick.add( peer ); + + addToDisconnectedCachePeers( cache_peer ); + + }else{ + + long current_speed = download_speed_average.getAverage(); + + // if we are already exceeding required speed, block + // the cache peer download + + if ( current_speed + TARGET_SPEED_EXCESS_MARGIN > target_speed ){ + + setPeerSpeed( peer, -1, now ); + } + + if ( cache_peers == null ){ + + cache_peers = new LinkedList(); + } + + cache_peers.add( peer ); } - - cache_peers.add( peer ); } } }catch( Throwable e ){ @@ -505,208 +494,211 @@ pm.removePeer((PEPeer)peers_to_kick.get(i), "Cache peer not required" ); } - - if ( time_downloading > SPEED_CONTROL_INITIAL_DELAY ){ - - long current_average = download_speed_average.getAverage(); - - if ( current_average < target_speed ){ - - long current_speed = getCurrentSpeed(); - // increase cache peer contribution - // due to latencies we need to give speed increases a time to take - // effect to see if the limits can be reached - - long difference = target_speed - current_speed; + if ( state == DownloadManager.STATE_DOWNLOADING ){ + + if ( time_downloading > SPEED_CONTROL_INITIAL_DELAY ){ - if ( last_speed_increase > now || now - last_speed_increase > SPEED_INCREASE_GRACE_PERIOD ){ + long current_average = download_speed_average.getAverage(); + + if ( current_average < target_speed ){ + + long current_speed = getCurrentSpeed(); + + // increase cache peer contribution + // due to latencies we need to give speed increases a time to take + // effect to see if the limits can be reached + + long difference = target_speed - current_speed; + + if ( last_speed_increase > now || now - last_speed_increase > SPEED_INCREASE_GRACE_PERIOD ){ + + synchronized( this ){ - synchronized( this ){ - - if ( cache_peers != null ){ - - Iterator it = cache_peers.iterator(); - - while( it.hasNext() && difference > 0 ){ - - PEPeer peer = (PEPeer)it.next(); - - PEPeerStats peer_stats = peer.getStats(); - - long peer_limit = peer_stats.getDownloadRateLimitBytesPerSecond(); + if ( cache_peers != null ){ - // try simple approach - find first cache peer that is limited - // to less than the target + Iterator it = cache_peers.iterator(); - if ( peer_limit == 0 ){ + while( it.hasNext() && difference > 0 ){ + + PEPeer peer = (PEPeer)it.next(); + + PEPeerStats peer_stats = peer.getStats(); - }else{ + long peer_limit = peer_stats.getDownloadRateLimitBytesPerSecond(); - if ( peer_limit < target_speed ){ - - setPeerSpeed( peer, (int)target_speed, now ); + // try simple approach - find first cache peer that is limited + // to less than the target + + if ( peer_limit == 0 ){ - last_speed_increase = now; + }else{ - difference = 0; + if ( peer_limit < target_speed ){ + + setPeerSpeed( peer, (int)target_speed, now ); + + last_speed_increase = now; + + difference = 0; + } } } } } - } - - if ( difference > 0 && - last_peer_inject > now || now - last_peer_inject > PEER_INJECT_GRACE_PERIOD ){ - - Set connected_peers = new HashSet(); - - List peers_to_try = new ArrayList(); - - if ( cache_peers != null ){ + + if ( difference > 0 && + last_peer_inject > now || now - last_peer_inject > PEER_INJECT_GRACE_PERIOD ){ - Iterator it = cache_peers.iterator(); + Set connected_peers = new HashSet(); - while( it.hasNext() && difference > 0 ){ + List peers_to_try = new ArrayList(); + + if ( cache_peers != null ){ + + Iterator it = cache_peers.iterator(); + + while( it.hasNext() && difference > 0 ){ + + PEPeer peer = (PEPeer)it.next(); - PEPeer peer = (PEPeer)it.next(); - - connected_peers.add( peer.getIp() + ":" + peer.getPort()); + connected_peers.add( peer.getIp() + ":" + peer.getPort()); + } } - } - - // if we explicitly disconnected peers in the past then reuse them first - - if ( disconnected_cache_peers != null ){ - while( disconnected_cache_peers.size() > 0 ){ - - CachePeer cp = (CachePeer)disconnected_cache_peers.remove(0); + // if we explicitly disconnected peers in the past then reuse them first + + if ( disconnected_cache_peers != null ){ - if ( !connected_peers.contains( cp.getAddress().getHostAddress() + ":" + cp.getPort())){ + while( disconnected_cache_peers.size() > 0 ){ - // check that this peer isn't already available as a lookup result + CachePeer cp = (CachePeer)disconnected_cache_peers.remove(0); - if ( lookup_peers != null ){ + if ( !connected_peers.contains( cp.getAddress().getHostAddress() + ":" + cp.getPort())){ - for (int i=0;i<lookup_peers.length;i++){ - - CachePeer l_cp = lookup_peers[i]; + // check that this peer isn't already available as a lookup result + + if ( lookup_peers != null ){ - if ( l_cp.sameAs( cp )){ + for (int i=0;i<lookup_peers.length;i++){ - cp = null; + CachePeer l_cp = lookup_peers[i]; - break; + if ( l_cp.sameAs( cp )){ + + cp = null; + + break; + } } } - } - - if ( cp != null ){ - - peers_to_try.add( cp ); - break; + if ( cp != null ){ + + peers_to_try.add( cp ); + + break; + } } } - } - - if ( disconnected_cache_peers.size() == 0 ){ - disconnected_cache_peers = null; - } - } - - if ( peers_to_try.size() == 0 ){ - - // can't do the job with existing cache peers, try to find some more - - if ( lookup_peers == null || - now < last_lookup_time || - now - last_lookup_time > CACHE_REQUERY_MIN_PERIOD ){ - - last_lookup_time = now; - - lookup_peers = CacheDiscovery.lookup( download_manager.getTorrent()); + if ( disconnected_cache_peers.size() == 0 ){ + + disconnected_cache_peers = null; + } } - for (int i=0;i<lookup_peers.length;i++){ + if ( peers_to_try.size() == 0 ){ - CachePeer cp = lookup_peers[i]; + // can't do the job with existing cache peers, try to find some more - if ( cp.getAutoReconnect() && now - cp.getInjectTime(now) > CACHE_RECONNECT_MIN_PERIOD ){ + if ( lookup_peers == null || + now < last_lookup_time || + now - last_lookup_time > CACHE_REQUERY_MIN_PERIOD ){ + + last_lookup_time = now; + + lookup_peers = CacheDiscovery.lookup( download_manager.getTorrent()); + } + + for (int i=0;i<lookup_peers.length;i++){ - if ( !connected_peers.contains( cp.getAddress().getHostAddress() + ":" + cp.getPort())){ + CachePeer cp = lookup_peers[i]; - peers_to_try.add( cp ); + if ( cp.getAutoReconnect() && now - cp.getInjectTime(now) > CACHE_RECONNECT_MIN_PERIOD ){ + + if ( !connected_peers.contains( cp.getAddress().getHostAddress() + ":" + cp.getPort())){ + + peers_to_try.add( cp ); + } } } } - } - - if ( peers_to_try.size() > 0 ){ - - CachePeer peer = (CachePeer)peers_to_try.get((int)( Math.random() * peers_to_try.size())); - - // System.out.println( "Injecting cache peer " + peer.getAddress() + ":" + peer.getPort()); - - peer.setInjectTime( now ); - - pm.addPeer( peer.getAddress().getHostAddress(), peer.getPort(), 0, false ); - - last_peer_inject = now; - } - } - } - }else if ( current_average > target_speed + TARGET_SPEED_EXCESS_MARGIN){ - - long current_speed = getCurrentSpeed(); - - // decrease cache peer contribution - - long difference = current_speed - ( target_speed + TARGET_SPEED_EXCESS_MARGIN ); - - synchronized( this ){ - - if ( cache_peers != null ){ - - Iterator it = cache_peers.iterator(); - - while( it.hasNext() && difference > 0 ){ - - PEPeer peer = (PEPeer)it.next(); - - PEPeerStats peer_stats = peer.getStats(); - - long peer_rate = peer_stats.getDataReceiveRate(); - long peer_limit = peer_stats.getDownloadRateLimitBytesPerSecond(); - - if ( peer_limit == -1 ){ + if ( peers_to_try.size() > 0 ){ - // blocked, take into account adjustment in progress + CachePeer peer = (CachePeer)peers_to_try.get((int)( Math.random() * peers_to_try.size())); - difference -= peer_rate; + // System.out.println( "Injecting cache peer " + peer.getAddress() + ":" + peer.getPort()); - }else if ( peer_limit != 0 && peer_rate > peer_limit ){ + peer.setInjectTime( now ); - // adjusting + pm.addPeer( peer.getAddress().getHostAddress(), peer.getPort(), 0, false ); - difference -= peer_rate - peer_limit; + last_peer_inject = now; + } + } + } + }else if ( current_average > target_speed + TARGET_SPEED_EXCESS_MARGIN){ + + long current_speed = getCurrentSpeed(); + + // decrease cache peer contribution + + long difference = current_speed - ( target_speed + TARGET_SPEED_EXCESS_MARGIN ); + + synchronized( this ){ + + if ( cache_peers != null ){ + + Iterator it = cache_peers.iterator(); + + while( it.hasNext() && difference > 0 ){ + + PEPeer peer = (PEPeer)it.next(); + + PEPeerStats peer_stats = peer.getStats(); - }else{ + long peer_rate = peer_stats.getDataReceiveRate(); - if ( peer_rate > difference ){ - - setPeerSpeed( peer, (int)( peer_rate - difference ), now ); + long peer_limit = peer_stats.getDownloadRateLimitBytesPerSecond(); + + if ( peer_limit == -1 ){ - difference = 0; + // blocked, take into account adjustment in progress - }else{ - - setPeerSpeed( peer, -1, now ); - difference -= peer_rate; + + }else if ( peer_limit != 0 && peer_rate > peer_limit ){ + + // adjusting + + difference -= peer_rate - peer_limit; + + }else{ + + if ( peer_rate > difference ){ + + setPeerSpeed( peer, (int)( peer_rate - difference ), now ); + + difference = 0; + + }else{ + + setPeerSpeed( peer, -1, now ); + + difference -= peer_rate; + } } } } @@ -728,25 +720,31 @@ while( it.hasNext()){ PEPeer peer = (PEPeer)it.next(); - - PEPeerStats peer_stats = peer.getStats(); - - if ( peer_stats.getDownloadRateLimitBytesPerSecond() == -1 ){ - CachePeer cache_peer = (CachePeer)peer.getData( PEER_CACHE_KEY ); + CachePeer cache_peer = (CachePeer)peer.getData( PEER_CACHE_KEY ); - long time = cache_peer.getSpeedChangeTime( now ); + if ( state == DownloadManager.STATE_SEEDING ){ - if ( now - time > IDLE_PEER_DISCONNECT_PERIOD ){ - + if ( secs_since_last_up >= IDLE_SEED_DISCONNECT_PERIOD / 1000 ){ + peers_to_kick.add( peer ); - if ( disconnected_cache_peers == null ){ + addToDisconnectedCachePeers( cache_peer ); + } + }else{ + + PEPeerStats peer_stats = peer.getStats(); + + if ( peer_stats.getDownloadRateLimitBytesPerSecond() == -1 ){ + + long time = cache_peer.getSpeedChangeTime( now ); + + if ( now - time > IDLE_PEER_DISCONNECT_PERIOD ){ - disconnected_cache_peers = new ArrayList(); + peers_to_kick.add( peer ); + + addToDisconnectedCachePeers( cache_peer ); } - - disconnected_cache_peers.add( cache_peer ); } } } @@ -761,6 +759,28 @@ } protected void + addToDisconnectedCachePeers( + CachePeer cache_peer ) + { + if ( disconnected_cache_peers == null ){ + + disconnected_cache_peers = new ArrayList(); + } + + for (int i=0;i<disconnected_cache_peers.size();i++){ + + CachePeer p = (CachePeer)disconnected_cache_peers.get(i); + + if ( p.sameAs( cache_peer )){ + + return; + } + } + + disconnected_cache_peers.add( cache_peer ); + } + + protected void setPeerSpeed( PEPeer peer, int speed, |