User: belaban
Date: 08/05/28 06:23:37
Modified: src/org/jgroups/protocols UDP.java
Log:
reverted to 170
Revision Changes Path
1.172 +210 -119 JGroups/src/org/jgroups/protocols/UDP.java
Index: UDP.java
===================================================================
RCS file: /cvsroot/javagroups/JGroups/src/org/jgroups/protocols/UDP.java,v
retrieving revision 1.171
retrieving revision 1.172
diff -u -r1.171 -r1.172
--- UDP.java 28 May 2008 12:55:58 -0000 1.171
+++ UDP.java 28 May 2008 13:23:36 -0000 1.172
@@ -8,11 +8,12 @@
import org.jgroups.stack.IpAddress;
import org.jgroups.util.BoundedList;
import org.jgroups.util.Util;
+import org.jgroups.util.DefaultThreadFactory;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.net.*;
import java.util.*;
-import java.util.concurrent.Callable;
@@ -38,9 +39,9 @@
* input buffer overflow, consider setting this property to true.
* </ul>
* @author Bela Ban
- * @version $Id: UDP.java,v 1.171 2008/05/28 12:55:58 vlada Exp $
+ * @version $Id: UDP.java,v 1.172 2008/05/28 13:23:36 belaban Exp $
*/
-public class UDP extends TP {
+public class UDP extends TP implements Runnable {
/** Socket used for
* <ol>
@@ -84,10 +85,14 @@
/** The multicast port used for sending and receiving packets */
int mcast_port=7600;
-
- PacketReceiver ucast_receiver;
-
- PacketReceiver mcast_receiver;
+
+ /** The multicast receiver thread */
+ Thread mcast_receiver=null;
+
+ private final static String MCAST_RECEIVER_THREAD_NAME = "UDP mcast";
+
+ /** The unicast receiver thread */
+ UcastReceiver ucast_receiver=null;
/** Whether to enable IP multicasting. If false, multiple unicast datagram
* packets are sent rather than one multicast packet */
@@ -153,7 +158,7 @@
super.setProperties(props);
listDeprecatedProperties(props, "num_last_ports","null_src_addresses");
-
+
String str=Util.getProperty(new String[]{Global.UDP_MCAST_ADDR, "jboss.partition.udpGroup"}, props,
"mcast_addr", false, "228.8.8.8");
if(str != null)
@@ -163,12 +168,12 @@
props, "mcast_port", false, "7600");
if(str != null)
mcast_port=Integer.parseInt(str);
-
+
str=Util.getProperty(new String[]{Global.UDP_IP_TTL}, props, "ip_ttl", false, "64");
if(str != null) {
ip_ttl=Integer.parseInt(str);
props.remove("ip_ttl");
- }
+ }
Util.checkBufferSize("UDP.mcast_send_buf_size", mcast_send_buf_size);
Util.checkBufferSize("UDP.mcast_recv_buf_size", mcast_recv_buf_size);
@@ -177,6 +182,53 @@
return props.isEmpty();
}
+
+
+
+
+
+ /* ----------------------- Receiving of MCAST UDP packets ------------------------ */
+ /** Multicast receiver loop: reads datagrams from mcast_sock and passes each one to receive() while mcast_receiver and mcast_sock are non-null. */
+ public void run() {
+ final byte receive_buf[]=new byte[65535]; // single buffer, reused for every datagram
+ int offset, len, sender_port;
+ InetAddress sender_addr;
+ Address sender;
+
+ final DatagramPacket packet=new DatagramPacket(receive_buf, receive_buf.length);
+
+ while(mcast_receiver != null && mcast_sock != null) { // stopThreads() sets mcast_receiver to null to end this loop
+ try {
+ mcast_sock.receive(packet); // blocks until a datagram arrives or the socket is closed
+ len=packet.getLength();
+ if(len > receive_buf.length) { // only logged: the packet is still passed to receive() below
+ if(log.isErrorEnabled())
+ log.error("size of the received packet (" + len + ") is bigger than " +
+ "allocated buffer (" + receive_buf.length + "): will not be able to handle packet. " +
+ "Use the FRAG2 protocol and make its frag_size lower than " + receive_buf.length);
+ }
+
+ sender_addr=packet.getAddress();
+ sender_port=packet.getPort();
+ offset=packet.getOffset();
+ sender=new IpAddress(sender_addr, sender_port);
+
+ receive(mcast_addr, sender, receive_buf, offset, len); // destination is the multicast address
+ }
+ catch(SocketException sock_ex) { // taken to mean the socket was closed
+ if(log.isTraceEnabled()) log.trace("multicast socket is closed, exception=" + sock_ex);
+ break; // leave the receive loop; the thread then terminates
+ }
+ catch(InterruptedIOException io_ex) { // thread was interrupted: ignored, the loop condition is re-checked
+ }
+ catch(Throwable ex) { // anything else is logged and the loop keeps running
+ if(log.isErrorEnabled())
+ log.error("failure in multicast receive()", ex);
+ }
+ }
+ if(log.isDebugEnabled()) log.debug("multicast thread terminated");
+ }
+
public String getInfo() {
StringBuilder sb=new StringBuilder();
sb.append("group_addr=").append(mcast_addr_name).append(':').append(mcast_port).append("\n");
@@ -215,29 +267,30 @@
DatagramPacket packet=new DatagramPacket(data, offset, length, dest, port);
try {
if(mcast) {
- if(mcast_send_sockets != null) {
- for(MulticastSocket s:mcast_send_sockets) {
+ if(mcast_send_sockets != null) {
+ MulticastSocket s;
+ for(int i=0; i < mcast_send_sockets.length; i++) {
+ s=mcast_send_sockets[i];
try {
s.send(packet);
}
catch(Exception e) {
log.error("failed sending packet on socket " + s);
}
- }
+ }
}
- else { // DEFAULT path
- mcast_sock.send(packet);
+ else { // DEFAULT path
+ if(mcast_sock != null)
+ mcast_sock.send(packet);
}
}
- else {
- sock.send(packet);
+ else {
+ if(sock != null)
+ sock.send(packet);
}
}
catch(Exception ex) {
- boolean mcastConnected=mcast_sock != null && mcast_sock.isConnected();
- boolean sockConnected=sock != null && sock.isConnected();
- if(mcastConnected || sockConnected)
- throw new Exception("dest=" + dest + ":" + port + " (" + length + " bytes)", ex);
+ throw new Exception("dest=" + dest + ":" + port + " (" + length + " bytes)", ex);
}
}
@@ -268,32 +321,14 @@
throw new Exception(tmp, ex);
}
super.start();
-
- ucast_receiver=new PacketReceiver(sock,
- local_addr,
- "UDP ucast receiver",
- new Callable<Void>() {
- public Void call() throws Exception {
- closeSocket();
- return null;
- }
- });
- mcast_receiver=new PacketReceiver(mcast_sock,
- mcast_addr,
- "UDP mcast receiver",
- new Callable<Void>() {
- public Void call() throws Exception {
-
- closeMulticastSocket();
- return null;
- }
- });
+ // startThreads(); // moved to handleConnect()
}
public void stop() {
if(log.isDebugEnabled()) log.debug("closing sockets and stopping threads");
- stopThreads(); // will close sockets as well
+ stopThreads(); // closes a socket only if the corresponding receiver thread is still alive ...
+ closeSockets(); // ... so close both explicitly as well (intended to be harmless if already closed)
super.stop();
}
@@ -497,7 +532,7 @@
// special handling for Linux 2.6 kernel which sometimes throws BindException while we probe for a random port
localPort++;
continue;
- }
+ }
localPort=tmp.getLocalPort();
if(last_ports_used.contains(localPort)) {
if(log.isDebugEnabled())
@@ -573,8 +608,10 @@
if(mcast_send_sockets != null) {
- sb.append("\n").append(mcast_send_sockets.length).append(" mcast send sockets:\n");
- for(MulticastSocket s:mcast_send_sockets) {
+ sb.append("\n").append(mcast_send_sockets.length).append(" mcast send sockets:\n");
+ MulticastSocket s;
+ for(int i=0; i < mcast_send_sockets.length; i++) {
+ s=mcast_send_sockets[i];
sb.append(s.getInterface().getHostAddress()).append(':').append(s.getLocalPort());
sb.append(", send buffer size=").append(s.getSendBufferSize());
sb.append(", receive buffer size=").append(s.getReceiveBufferSize()).append("\n");
@@ -592,8 +629,8 @@
setBufferSize(mcast_sock, mcast_send_buf_size, mcast_recv_buf_size);
if(mcast_send_sockets != null) {
- for(MulticastSocket s:mcast_send_sockets) {
- setBufferSize(s, mcast_send_buf_size, mcast_recv_buf_size);
+ for(int i=0; i < mcast_send_sockets.length; i++) {
+ setBufferSize(mcast_send_sockets[i], mcast_send_buf_size, mcast_recv_buf_size);
}
}
}
@@ -613,7 +650,20 @@
if(log.isWarnEnabled()) log.warn("failed setting receive buffer size of " + recv_buf_size + " in " + sock + ": " + ex);
}
}
-
+
+
+ /**
+ * Closes the UDP multicast socket and then the unicast socket, via closeMulticastSocket() and closeSocket()
+ */
+ void closeSockets() {
+ // 1. Close multicast socket
+ closeMulticastSocket();
+
+ // 2. Close socket
+ closeSocket();
+ }
+
+
void closeMulticastSocket() {
if(mcast_sock != null) {
try {
@@ -630,10 +680,11 @@
}
if(mcast_send_sockets != null) {
- for(MulticastSocket s:mcast_send_sockets) {
+ MulticastSocket s;
+ for(int i=0; i < mcast_send_sockets.length; i++) {
+ s=mcast_send_sockets[i];
s.close();
- if(log.isDebugEnabled())
- log.debug("multicast send socket " + s + " closed");
+ if(log.isDebugEnabled()) log.debug("multicast send socket " + s + " closed");
}
mcast_send_sockets=null;
}
@@ -659,12 +710,34 @@
* Starts the unicast and multicast receiver threads
*/
void startThreads() throws Exception {
- if(!ucast_receiver.isRunning()) {
- ucast_receiver.start();
+ if(ucast_receiver == null) { // a unicast receiver is created only if none exists yet
+ // create the receiver and start its thread, which reads from the unicast socket (sock)
+ ucast_receiver=new UcastReceiver();
+ ucast_receiver.start();
+
+ global_thread_factory.renameThread(UcastReceiver.UCAST_RECEIVER_THREAD_NAME, ucast_receiver.getThread());
+
+ if(log.isDebugEnabled())
+ log.debug("created unicast receiver thread " + ucast_receiver.getThread());
}
- if(ip_mcast && !mcast_receiver.isRunning()) {
- mcast_receiver.start();
+ if(ip_mcast) { // the multicast receiver thread is only started when IP multicasting is enabled
+ if(mcast_receiver != null) {
+ if(mcast_receiver.isAlive()) { // still running: keep it, nothing to create
+ if(log.isDebugEnabled()) log.debug("did not create new multicastreceiver thread as existing " +
+ "multicast receiver thread is still running");
+ }
+ else
+ mcast_receiver=null; // thread is no longer alive: drop it, a new one is created just below
+ }
+
+ if(mcast_receiver == null) {
+ mcast_receiver=global_thread_factory.newThread(this,MCAST_RECEIVER_THREAD_NAME); // this.run() is the multicast receive loop
+ mcast_receiver.setPriority(Thread.MAX_PRIORITY); // NOTE(review): original author was unsure whether this is needed
+ mcast_receiver.start();
+ if(log.isDebugEnabled())
+ log.debug("created multicast receiver thread " + mcast_receiver);
+ }
}
}
@@ -672,18 +745,49 @@
/**
* Stops unicast and multicast receiver threads
*/
- void stopThreads() {
- mcast_receiver.stop();
- ucast_receiver.stop();
+ void stopThreads() {
+ Thread tmp;
+
+ // 1. Stop the multicast receiver thread
+ if(mcast_receiver != null) {
+ if(mcast_receiver.isAlive()) {
+ tmp=mcast_receiver; // keep a local reference so the thread can still be interrupted and joined
+ mcast_receiver=null; // run() loops only while this field is non-null
+ closeMulticastSocket(); // will cause the multicast thread to terminate
+ tmp.interrupt();
+ try {
+ tmp.join(Global.THREAD_SHUTDOWN_WAIT_TIME); // wait at most THREAD_SHUTDOWN_WAIT_TIME for the thread to exit
+ }
+ catch(InterruptedException e) {
+ Thread.currentThread().interrupt(); // set interrupt flag again
+ }
+ tmp=null;
+ }
+ mcast_receiver=null; // also clears the field when the thread was no longer alive
+ }
+
+ // 2. Stop the unicast receiver thread
+ if(ucast_receiver != null) {
+ ucast_receiver.stop(); // UcastReceiver.stop() closes the unicast socket, then interrupts and joins its thread
+ ucast_receiver=null;
+ }
}
protected void setThreadNames() {
- super.setThreadNames();
+ super.setThreadNames();
+ global_thread_factory.renameThread(MCAST_RECEIVER_THREAD_NAME, mcast_receiver); // NOTE(review): mcast_receiver is not null-checked here, unlike in unsetThreadNames() - confirm renameThread() tolerates null
+ if(ucast_receiver != null)
+ global_thread_factory.renameThread(UcastReceiver.UCAST_RECEIVER_THREAD_NAME, ucast_receiver.getThread()); // NOTE(review): getThread() may return null; unsetThreadNames() checks for that, this does not
}
protected void unsetThreadNames() {
- super.unsetThreadNames();
+ super.unsetThreadNames();
+ if(mcast_receiver != null)
+ mcast_receiver.setName(MCAST_RECEIVER_THREAD_NAME); // reset the multicast thread to its plain base name
+ // same for the unicast receiver thread, if the receiver exists and has a thread
+ if(ucast_receiver != null && ucast_receiver.getThread() != null)
+ ucast_receiver.getThread().setName(UcastReceiver.UCAST_RECEIVER_THREAD_NAME);
}
@@ -712,53 +816,37 @@
/* ----------------------------- Inner Classes ---------------------------------------- */
-
- public class PacketReceiver implements Runnable {
- private volatile boolean running=false;
- private Thread thread=null;
- private final DatagramSocket socket;
- private final Address dest;
- private final String name;
- private final Callable<Void> closeStrategy;
-
-
- public PacketReceiver(DatagramSocket socket,Address dest,String name,Callable<Void> closeStrategy) {
- this.socket=socket;
- this.dest=dest;
- this.name=name;
- this.closeStrategy = closeStrategy;
- }
- public Thread getThread() {
- return thread;
- }
- public boolean isRunning() {
- return running;
+
+ public class UcastReceiver implements Runnable {
+
+ public static final String UCAST_RECEIVER_THREAD_NAME = "UDP ucast";
+ boolean running=true;
+ Thread thread=null;
+
+ public Thread getThread(){ // returns the receiver thread; null before start() and after stop()
+ return thread;
}
-
+
+
public void start() {
if(thread == null) {
- thread=getThreadFactory().newThread(this, name);
+ thread=global_thread_factory.newThread(this,UCAST_RECEIVER_THREAD_NAME); // this receiver's run() becomes the thread body
+ // thread.setDaemon(true);
+ running=true; // set before the thread is started; run() loops only while running is true
thread.start();
- running=true;
- if(log.isDebugEnabled())
- log.debug("created " + thread.getName());
}
}
+
public void stop() {
Thread tmp;
if(thread != null && thread.isAlive()) {
running=false;
tmp=thread;
thread=null;
- try {
- closeStrategy.call();
- }
- catch(Exception e1) {
- //ignore
- }
+ closeSocket(); // this will cause the thread to break out of its loop
tmp.interrupt();
try {
tmp.join(Global.THREAD_SHUTDOWN_WAIT_TIME);
@@ -770,44 +858,47 @@
}
thread=null;
}
-
+
+
public void run() {
- final byte receive_buf[]=new byte[65535];
+ final byte receive_buf[]=new byte[65535]; // single buffer, reused for every datagram
+ int offset, len, sender_port;
+ InetAddress sender_addr;
+ Address sender;
+
final DatagramPacket packet=new DatagramPacket(receive_buf, receive_buf.length);
- while(running) {
+ while(running && thread != null && sock != null) { // NOTE(review): stop() writes running and thread from another thread and neither is declared volatile - confirm visibility
try {
- socket.receive(packet);
- int len=packet.getLength();
+ sock.receive(packet); // blocks until a datagram arrives or the socket is closed
+ len=packet.getLength();
if(len > receive_buf.length) {
if(log.isErrorEnabled())
- log.error("size of the received packet (" + len
- + ") is bigger than "
- + "allocated buffer ("
- + receive_buf.length
- + "): will not be able to handle packet. "
- + "Use the FRAG2 protocol and make its frag_size lower than "
- + receive_buf.length);
+ log.error("size of the received packet (" + len + ") is bigger than allocated buffer (" +
+ receive_buf.length + "): will not be able to handle packet. " +
+ "Use the FRAG2 protocol and make its frag_size lower than " + receive_buf.length);
}
- receive(dest,
- new IpAddress(packet.getAddress(), packet.getPort()),
- receive_buf,
- packet.getOffset(),
- len);
- }
- catch(IOException sock_ex) {
- boolean socketConnected = (socket != null && socket.isConnected());
- if(!socketConnected){
- if(log.isTraceEnabled())
- log.trace(socket + " is closed, exception=" + sock_ex);
- break;
- }
+
+ sender_addr=packet.getAddress();
+ sender_port=packet.getPort();
+ offset=packet.getOffset();
+ sender=new IpAddress(sender_addr, sender_port);
+
+ receive(local_addr, sender, receive_buf, offset, len); // destination is this member's local address
+ }
+ catch(SocketException sock_ex) { // taken to mean the socket was closed
+ if(log.isDebugEnabled()) log.debug("unicast receiver socket is closed, exception=" + sock_ex);
+ break; // leave the receive loop; the thread then terminates
+ }
+ catch(InterruptedIOException io_ex) { // thread was interrupted: ignored, the loop condition is re-checked
}
catch(Throwable ex) {
if(log.isErrorEnabled())
- log.error("failure in receive()", ex);
+ log.error("[" + local_addr + "] failed receiving unicast packet", ex); // logged only, the loop keeps running
}
}
+ if(log.isDebugEnabled()) log.debug("unicast receiver thread terminated");
}
- }
+ }
+
}
|