From: <th...@us...> - 2010-04-12 21:02:57
|
Revision: 1809 http://azsmrc.svn.sourceforge.net/azsmrc/?rev=1809&view=rev Author: the8472 Date: 2010-04-12 21:02:51 +0000 (Mon, 12 Apr 2010) Log Message: ----------- improve thread safety Modified Paths: -------------- mldht/trunk/DHTIndexer.jar mldht/trunk/lbms/plugins/mldht/indexer/PullMetaDataConnection.java mldht/trunk/lbms/plugins/mldht/indexer/TorrentFinder.java Modified: mldht/trunk/DHTIndexer.jar =================================================================== (Binary files differ) Modified: mldht/trunk/lbms/plugins/mldht/indexer/PullMetaDataConnection.java =================================================================== --- mldht/trunk/lbms/plugins/mldht/indexer/PullMetaDataConnection.java 2010-04-12 00:33:53 UTC (rev 1808) +++ mldht/trunk/lbms/plugins/mldht/indexer/PullMetaDataConnection.java 2010-04-12 21:02:51 UTC (rev 1809) @@ -88,8 +88,29 @@ boolean isState(int mask) {return (state & mask) != 0;} - public PullMetaDataConnection(InetSocketAddress dest, byte[] infoHash) { + public PullMetaDataConnection(byte[] infoHash) { this.infoHash = infoHash; + ByteBuffer outputBuffer = ByteBuffer.allocate(20+8+20+20); + byte[] peerID = new byte[20]; + DHT.rand.nextBytes(peerID); + + outputBuffer.put(preamble); + outputBuffer.put(bitfield); + outputBuffer.put(infoHash); + outputBuffer.put(peerID); + + + outputBuffer.flip(); + outputBuffers.addLast(outputBuffer); + // wait for handshake from the other side + inputBuffer.limit(outputBuffer.limit()); + + setState(STATE_CONNECTING, true); + setState(STATE_BASIC_HANDSHAKING, true); + + } + + public void connect(InetSocketAddress dest) { try { channel = SocketChannel.open(); @@ -97,32 +118,14 @@ channel.socket().setReuseAddress(true); //channel.bind(new InetSocketAddress(49002)); + + lastReceivedTime = System.currentTimeMillis(); + + // register after setting the last received time, otherwise the connection might get killed on the spot conHandler.register(this); - - ByteBuffer outputBuffer = ByteBuffer.allocate(20+8+20+20); - byte[] peerID = new byte[20]; - DHT.rand.nextBytes(peerID); - outputBuffer.put(preamble); - outputBuffer.put(bitfield); - outputBuffer.put(infoHash); - outputBuffer.put(peerID); - - - outputBuffer.flip(); - outputBuffers.addLast(outputBuffer); - // wait for handshake from the other side - inputBuffer.limit(outputBuffer.limit()); - - setState(STATE_CONNECTING, true); - setState(STATE_BASIC_HANDSHAKING, true); - - conHandler.setSelection(this, SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE, true); - - - lastReceivedTime = System.currentTimeMillis(); - + //System.out.println("attempting connect "+dest); if(channel.connect(dest)) connectEvent(); Modified: mldht/trunk/lbms/plugins/mldht/indexer/TorrentFinder.java =================================================================== --- mldht/trunk/lbms/plugins/mldht/indexer/TorrentFinder.java 2010-04-12 00:33:53 UTC (rev 1808) +++ mldht/trunk/lbms/plugins/mldht/indexer/TorrentFinder.java 2010-04-12 21:02:51 UTC (rev 1809) @@ -75,8 +75,9 @@ activeConnections.incrementAndGet(); PeerAddressDBItem item = task.addresses.get(0); task.addresses.remove(item); - InetSocketAddress addr = new InetSocketAddress(item.getInetAddress(),item.getPort()); - final PullMetaDataConnection conn = new PullMetaDataConnection(addr, task.hash ); + + final PullMetaDataConnection conn = new PullMetaDataConnection(task.hash ); + conn.terminationHandler = new Runnable() { @Override public void run() { @@ -126,6 +127,9 @@ fetchMetadata(); } }; + // connect after registering the handler in case we get an immediate terminate event + InetSocketAddress addr = new InetSocketAddress(item.getInetAddress(),item.getPort()); + conn.connect(addr); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |