Update of /cvsroot/jmule/jmule2/src/org/jmule/core/networkmanager
In directory sfp-cvsdas-3.v30.ch3.sourceforge.com:/tmp/cvs-serv2393/src/org/jmule/core/networkmanager
Modified Files:
NetworkManagerImpl.java
Log Message:
Fixed bugs with key attachments.
Index: NetworkManagerImpl.java
===================================================================
RCS file: /cvsroot/jmule/jmule2/src/org/jmule/core/networkmanager/NetworkManagerImpl.java,v
retrieving revision 1.39
retrieving revision 1.40
diff -C2 -d -r1.39 -r1.40
*** NetworkManagerImpl.java 28 Jun 2010 18:29:39 -0000 1.39
--- NetworkManagerImpl.java 30 Jun 2010 18:12:47 -0000 1.40
***************
*** 102,109 ****
import org.jmule.core.edonkey.ClientID;
import org.jmule.core.edonkey.E2DKConstants;
import org.jmule.core.edonkey.FileHash;
import org.jmule.core.edonkey.PartHashSet;
import org.jmule.core.edonkey.UserHash;
- import org.jmule.core.edonkey.E2DKConstants.ServerFeatures;
import org.jmule.core.edonkey.packet.Packet;
import org.jmule.core.edonkey.packet.PacketFactory;
--- 102,109 ----
import org.jmule.core.edonkey.ClientID;
import org.jmule.core.edonkey.E2DKConstants;
+ import org.jmule.core.edonkey.E2DKConstants.ServerFeatures;
import org.jmule.core.edonkey.FileHash;
import org.jmule.core.edonkey.PartHashSet;
import org.jmule.core.edonkey.UserHash;
import org.jmule.core.edonkey.packet.Packet;
import org.jmule.core.edonkey.packet.PacketFactory;
***************
*** 124,130 ****
import org.jmule.core.peermanager.InternalPeerManager;
import org.jmule.core.peermanager.Peer;
import org.jmule.core.peermanager.PeerManagerException;
import org.jmule.core.peermanager.PeerManagerSingleton;
- import org.jmule.core.peermanager.Peer.PeerSource;
import org.jmule.core.searchmanager.InternalSearchManager;
import org.jmule.core.searchmanager.SearchManagerSingleton;
--- 124,130 ----
import org.jmule.core.peermanager.InternalPeerManager;
import org.jmule.core.peermanager.Peer;
+ import org.jmule.core.peermanager.Peer.PeerSource;
import org.jmule.core.peermanager.PeerManagerException;
import org.jmule.core.peermanager.PeerManagerSingleton;
import org.jmule.core.searchmanager.InternalSearchManager;
import org.jmule.core.searchmanager.SearchManagerSingleton;
***************
*** 315,322 ****
try {
connection.disconnect();
! } catch (NetworkManagerException e) {
e.printStackTrace();
}
!
if (serverConnectionMonitor != null)
serverConnectionMonitor.JMStop();
--- 315,322 ----
try {
connection.disconnect();
! } catch (IOException e) {
e.printStackTrace();
}
!
if (serverConnectionMonitor != null)
serverConnectionMonitor.JMStop();
***************
*** 417,424 ****
try {
connection.disconnect();
! peer_connections.remove(ip+KEY_SEPARATOR+port);
!
connection = null;
! } catch (NetworkManagerException e) {
e.printStackTrace();
}
--- 417,423 ----
try {
connection.disconnect();
! peer_connections.remove(ip + KEY_SEPARATOR + port);
connection = null;
! } catch (IOException e) {
e.printStackTrace();
}
***************
*** 575,578 ****
--- 574,581 ----
public void peerConnectingFailed(String ip, int port, Throwable cause) {
_peer_manager.peerConnectingFailed(ip, port, cause);
+ System.out.println("peerConnectingFailed : " + ip + KEY_SEPARATOR + port);
+ JMPeerConnection connection = peer_connections.get(ip + KEY_SEPARATOR + port);
+ if (connection != null)
+ connection.setStatus(ConnectionStatus.DISCONNECTED);
peer_connections.remove(ip + KEY_SEPARATOR + port);
}
***************
*** 580,583 ****
--- 583,599 ----
public void peerDisconnected(String ip, int port) {
_peer_manager.peerDisconnected(ip, port);
+ System.out.println("peerDisconnected : " + ip + KEY_SEPARATOR + port);
+ JMPeerConnection connection = peer_connections.get(ip + KEY_SEPARATOR + port);
+ if (connection != null)
+ connection.setStatus(ConnectionStatus.DISCONNECTED);
+ peer_connections.remove(ip + KEY_SEPARATOR + port);
+ }
+
+ public void peerIOError(String ip, int port, Throwable cause) {
+ _peer_manager.peerDisconnected(ip, port);
+ System.out.println("peerIOError : " + ip + KEY_SEPARATOR + port);
+ JMPeerConnection connection = peer_connections.get(ip + KEY_SEPARATOR + port);
+ if (connection != null)
+ connection.setStatus(ConnectionStatus.DISCONNECTED);
peer_connections.remove(ip + KEY_SEPARATOR + port);
}
***************
*** 2299,2302 ****
--- 2315,2319 ----
} catch (IOException e) {
e.printStackTrace();
+ peerIOError(connection.getIPAddress(), connection.getPort(), e);
}
} else if (connection.getStatus() == ConnectionStatus.DISCONNECTED) {
***************
*** 2312,2315 ****
--- 2329,2333 ----
} catch (IOException e) {
e.printStackTrace();
+ peerConnectingFailed(connection.getIPAddress(), connection.getPort(), e);
}
}
***************
*** 2326,2331 ****
} catch (ClosedChannelException e) {
e.printStackTrace();
! connection.getJMConnection().disconnect();
! connection.setStatus(ConnectionStatus.DISCONNECTED);
peerDisconnected(connection.getIPAddress(), connection.getUsePort());
}
--- 2344,2352 ----
} catch (ClosedChannelException e) {
e.printStackTrace();
! try {
! connection.disconnect();
! } catch (IOException e1) {
! e1.printStackTrace();
! }
peerDisconnected(connection.getIPAddress(), connection.getUsePort());
}
***************
*** 2336,2343 ****
private static final int DISABLE_WRITE = ~SelectionKey.OP_WRITE;
! public void run() {
! Collection<SelectionKey> must_install_read = new ArrayList<SelectionKey>();
! Collection<SelectionKey> must_install_write = new ArrayList<SelectionKey>();
while (loop) {
--- 2357,2396 ----
private static final int DISABLE_WRITE = ~SelectionKey.OP_WRITE;
! private Collection<SelectionKey> must_install_read = new ArrayList<SelectionKey>();
! private Collection<SelectionKey> must_install_write = new ArrayList<SelectionKey>();
!
! void processPendingKeys() {
! if (downloadController.getAvailableByteCount(false)!=0) {
! for(SelectionKey key : must_install_read) {
! if (!key.isValid()) continue;
! try {
! key.interestOps(SelectionKey.OP_READ | key.interestOps());
! } catch (Throwable t) {
! System.out.println("Error at key : " + key.attachment());
! JMPeerConnection connection = (JMPeerConnection) key.attachment();
! peerIOError(connection.getIPAddress(), connection.getUsePort(),t);
! t.printStackTrace();
! }
! }
! must_install_read.clear();
! }
! if (uploadController.getAvailableByteCount(false)!=0) {
! for(SelectionKey key : must_install_write) {
! if (!key.isValid()) continue;
! try {
! key.interestOps(SelectionKey.OP_WRITE | key.interestOps());
! } catch (Throwable t) {
! System.out.println("Error at key : " + key.attachment());
! JMPeerConnection connection = (JMPeerConnection) key.attachment();
! peerIOError(connection.getIPAddress(), connection.getUsePort(),t);
! t.printStackTrace();
! }
! }
! must_install_write.clear();
! }
! }
!
! public void run() {
while (loop) {
***************
*** 2351,2374 ****
}
! if (downloadController.getAvailableByteCount(false)!=0) {
! for(SelectionKey key : must_install_read)
! try {
! key.interestOps(SelectionKey.OP_READ | key.interestOps());
! } catch (Throwable t) {
! System.out.println("Error at key : " + key.attachment());
!
! t.printStackTrace(); }
! must_install_read.clear();
! }
!
! if (uploadController.getAvailableByteCount(false)!=0) {
! for(SelectionKey key : must_install_write)
! try {
! key.interestOps(SelectionKey.OP_WRITE | key.interestOps());
! } catch (Throwable t) {
! System.out.println("Error at key : " + key.attachment());
! t.printStackTrace(); }
! must_install_write.clear();
! }
int selectedConnections = 0;
--- 2404,2408 ----
}
! processPendingKeys();
int selectedConnections = 0;
***************
*** 2377,2381 ****
if (selectedConnections == 0) {
isWaiting = true;
! selectedConnections = peerSelector.select(50);
isWaiting = false;
}
--- 2411,2415 ----
if (selectedConnections == 0) {
isWaiting = true;
! selectedConnections = peerSelector.select(1000);
isWaiting = false;
}
***************
*** 2392,2414 ****
}
! if (downloadController.getAvailableByteCount(false)!=0) {
! for(SelectionKey key : must_install_read)
! try {
! key.interestOps(SelectionKey.OP_READ | key.interestOps());
! } catch (Throwable t) {
! System.out.println("Error at key : " + key.attachment());
! t.printStackTrace(); }
! must_install_read.clear();
! }
!
! if (uploadController.getAvailableByteCount(false)!=0) {
! for(SelectionKey key : must_install_write)
! try {
! key.interestOps(SelectionKey.OP_WRITE | key.interestOps());
! } catch (Throwable t) {
! System.out.println("Error at key : " + key.attachment());
! t.printStackTrace(); }
! must_install_write.clear();
! }
if (selectedConnections == 0)
--- 2426,2430 ----
}
! processPendingKeys();
if (selectedConnections == 0)
***************
*** 2422,2427 ****
keys.remove();
JMPeerConnection connection = (JMPeerConnection) key.attachment();
SocketChannel peerChannel = connection.getJMConnection().getChannel();
-
if (key.isConnectable()) {
if (peerChannel.isConnectionPending()) {
--- 2438,2445 ----
keys.remove();
JMPeerConnection connection = (JMPeerConnection) key.attachment();
+ if (!key.isValid()) {
+ continue;
+ }
SocketChannel peerChannel = connection.getJMConnection().getChannel();
if (key.isConnectable()) {
if (peerChannel.isConnectionPending()) {
***************
*** 2433,2437 ****
} catch (IOException e) {
e.printStackTrace();
- connection.setStatus(ConnectionStatus.DISCONNECTED);
peerConnectingFailed(connection.getIPAddress(),connection.getUsePort(), e);
}
--- 2451,2454 ----
***************
*** 2448,2454 ****
--- 2465,2474 ----
try {
key.interestOps(DISABLE_READ & key.interestOps());
+ key.attach(connection);
must_install_read.add(key);
} catch (Throwable t) {
t.printStackTrace();
+ connection.setStatus(ConnectionStatus.DISCONNECTED);
+ peerIOError(connection.getIPAddress(), connection.getUsePort(),t);
}
} else
***************
*** 2463,2469 ****
--- 2483,2492 ----
try {
key.interestOps(DISABLE_WRITE & key.interestOps());
+ key.attach(connection);
must_install_write.add(key);
} catch (Throwable t) {
t.printStackTrace();
+ connection.setStatus(ConnectionStatus.DISCONNECTED);
+ peerIOError(connection.getIPAddress(), connection.getUsePort(),t);
}
} else
***************
*** 2489,2495 ****
}
! private class PeerPacketReader extends JMThread {
private Queue<SelectionKey> keys_to_read = new ConcurrentLinkedQueue<SelectionKey>();
private boolean loop = true;
private boolean isSleeping = false;
--- 2512,2530 ----
}
! /* class PeerKeyContainer {
!
! public PeerKeyContainer(SelectionKey key, JMPeerConnection connection) {
! super();
! this.key = key;
! this.connection = connection;
! }
!
! SelectionKey key;
! JMPeerConnection connection;
! }*/
+ private class PeerPacketReader extends JMThread {
private Queue<SelectionKey> keys_to_read = new ConcurrentLinkedQueue<SelectionKey>();
+ //private Set<SelectionKey> stored_keys = new HashSet<SelectionKey>();
private boolean loop = true;
private boolean isSleeping = false;
***************
*** 2521,2524 ****
--- 2556,2560 ----
SelectionKey key = keys_to_read.poll();
+ //stored_keys.remove(container.key);
JMPeerConnection connection = (JMPeerConnection) key.attachment();
try {
***************
*** 2529,2532 ****
--- 2565,2569 ----
}catch(Throwable cause) {
cause.printStackTrace();
+ peerIOError(connection.getIPAddress(), connection.getUsePort(),cause);
}
}
***************
*** 2548,2558 ****
e.printStackTrace();
- connection.setStatus(ConnectionStatus.DISCONNECTED);
try {
! connection.getJMConnection().getChannel().register(peerSelector, 0);
} catch (ClosedChannelException e1) {
e1.printStackTrace();
}
! connection.getJMConnection().disconnect();
peerDisconnected(connection.getIPAddress(), connection.getUsePort());
--- 2585,2598 ----
e.printStackTrace();
try {
! connection.getJMConnection().getChannel().register(peerSelector, 0, connection);
} catch (ClosedChannelException e1) {
e1.printStackTrace();
}
! try {
! connection.disconnect();
! } catch (IOException e1) {
! e1.printStackTrace();
! }
peerDisconnected(connection.getIPAddress(), connection.getUsePort());
***************
*** 2561,2565 ****
} catch (IOException e) {
System.out.println("Exception in : " + connection);
! connection.setIoErrors(connection.getIoErrors()+1);
if (connection.getIoErrors() > 3)
disconnectPeer(connection.getIPAddress(), connection.getUsePort());
--- 2601,2605 ----
} catch (IOException e) {
System.out.println("Exception in : " + connection);
! connection.incrementIOErrors();
if (connection.getIoErrors() > 3)
disconnectPeer(connection.getIPAddress(), connection.getUsePort());
***************
*** 2569,2574 ****
public void scheduleToRead(SelectionKey key) {
! if (!keys_to_read.contains(key))
keys_to_read.offer(key);
if (isSleeping())
wakeUp();
--- 2609,2615 ----
public void scheduleToRead(SelectionKey key) {
! if (!keys_to_read.contains(key)) {
keys_to_read.offer(key);
+ }
if (isSleeping())
wakeUp();
***************
*** 2593,2596 ****
--- 2634,2638 ----
private class PeerPacketWriter extends JMThread {
private Queue<SelectionKey> keys_to_write = new ConcurrentLinkedQueue<SelectionKey>();
+ //private Set<SelectionKey> stored_keys = new HashSet<SelectionKey>();
private boolean loop = true;
private boolean isSleeping = false;
***************
*** 2622,2625 ****
--- 2664,2668 ----
SelectionKey key = keys_to_write.poll();
+ //stored_keys.remove(container.key);
JMPeerConnection connection = (JMPeerConnection) key.attachment();
try {
***************
*** 2630,2633 ****
--- 2673,2677 ----
} catch (Throwable cause) {
cause.printStackTrace();
+ peerIOError(connection.getIPAddress(), connection.getUsePort(),cause);
}
}
***************
*** 2635,2640 ****
public void scheduleToWrite(SelectionKey key) {
! if (!keys_to_write.contains(key))
keys_to_write.offer(key);
if (isSleeping())
wakeUp();
--- 2679,2685 ----
public void scheduleToWrite(SelectionKey key) {
! if (!keys_to_write.contains(key)) {
keys_to_write.offer(key);
+ }
if (isSleeping())
wakeUp();
***************
*** 2648,2655 ****
} catch (ClosedChannelException e) {
e.printStackTrace();
! connection.getJMConnection().disconnect();
! connection.setStatus(ConnectionStatus.DISCONNECTED);
! peerDisconnected(connection.getIPAddress(), connection
! .getUsePort());
}
return;
--- 2693,2702 ----
} catch (ClosedChannelException e) {
e.printStackTrace();
! try {
! connection.disconnect();
! } catch (IOException e1) {
! e1.printStackTrace();
! }
! peerDisconnected(connection.getIPAddress(), connection.getUsePort());
}
return;
***************
*** 2660,2665 ****
} catch (ClosedChannelException e) {
e.printStackTrace();
! connection.getJMConnection().disconnect();
! connection.setStatus(ConnectionStatus.DISCONNECTED);
peerDisconnected(connection.getIPAddress(), connection.getUsePort());
}
--- 2707,2715 ----
} catch (ClosedChannelException e) {
e.printStackTrace();
! try {
! connection.disconnect();
! } catch (IOException e1) {
! e1.printStackTrace();
! }
peerDisconnected(connection.getIPAddress(), connection.getUsePort());
}
***************
*** 2678,2685 ****
} catch (ClosedChannelException e) {
e.printStackTrace();
! connection.getJMConnection().disconnect();
! connection.setStatus(ConnectionStatus.DISCONNECTED);
! peerDisconnected(connection.getIPAddress(), connection
! .getUsePort());
}
return;
--- 2728,2737 ----
} catch (ClosedChannelException e) {
e.printStackTrace();
! try {
! connection.disconnect();
! } catch (IOException e1) {
! e1.printStackTrace();
! }
! peerDisconnected(connection.getIPAddress(), connection.getUsePort());
}
return;
***************
*** 2702,2716 ****
connection.setStatus(ConnectionStatus.DISCONNECTED);
try {
! connection.getJMConnection().getChannel().register(peerSelector, 0);
} catch (ClosedChannelException e1) {
e1.printStackTrace();
}
! connection.getJMConnection().disconnect();
! peerDisconnected(connection.getIPAddress(), connection
! .getUsePort());
send_queue.remove(connection);
} catch (IOException e) {
e.printStackTrace();
}
if (!packet.getAsByteBuffer().hasRemaining()) {
--- 2754,2774 ----
connection.setStatus(ConnectionStatus.DISCONNECTED);
try {
! connection.getJMConnection().getChannel().register(peerSelector, 0,connection);
} catch (ClosedChannelException e1) {
e1.printStackTrace();
}
! try {
! connection.disconnect();
! } catch (IOException e1) {
! e1.printStackTrace();
! }
! peerDisconnected(connection.getIPAddress(), connection.getUsePort());
send_queue.remove(connection);
} catch (IOException e) {
e.printStackTrace();
+ connection.incrementIOErrors();
+ if (connection.getIoErrors() > 3)
+ disconnectPeer(connection.getIPAddress(), connection.getUsePort());
}
if (!packet.getAsByteBuffer().hasRemaining()) {
***************
*** 2720,2724 ****
}
-
public void JMStop() {
loop = false;
--- 2778,2781 ----
***************
*** 2734,2738 ****
}
}
-
}
--- 2791,2794 ----
***************
*** 3013,3018 ****
if (sendSequence) {
try {
! serverConnection.getJMChannel().getChannel().register(
! serverSelector, SelectionKey.OP_WRITE);
} catch (ClosedChannelException e) {
e.printStackTrace();
--- 3069,3073 ----
if (sendSequence) {
try {
! serverConnection.getJMChannel().getChannel().register(serverSelector, SelectionKey.OP_WRITE);
} catch (ClosedChannelException e) {
e.printStackTrace();
***************
*** 3022,3027 ****
if (selectCount == 0)
continue;
! Iterator<SelectionKey> keys = serverSelector.selectedKeys()
! .iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
--- 3077,3081 ----
if (selectCount == 0)
continue;
! Iterator<SelectionKey> keys = serverSelector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
|