From: Jeff H. <jh...@us...> - 2004-01-05 04:15:06
|
User: jhaynie Date: 04/01/04 20:15:05 Modified: src/main/org/jboss/remoting/detection/multicast Tag: Branch_3_2 MulticastDetector.java Log: - added reference to jmx in classpath of remoting - update code after running inside debugger / profiler to fix memory leaks and dangling threads - slightly improve multicast detector by caching detection notification and only re-serialization on change - fixed shutdown problems in detector and connector/invoker code Revision Changes Path No revision No revision 1.7.4.1 +154 -109 jboss-remoting/src/main/org/jboss/remoting/detection/multicast/MulticastDetector.java Index: MulticastDetector.java =================================================================== RCS file: /cvsroot/jboss/jboss-remoting/src/main/org/jboss/remoting/detection/multicast/MulticastDetector.java,v retrieving revision 1.7 retrieving revision 1.7.4.1 diff -u -r1.7 -r1.7.4.1 --- MulticastDetector.java 10 Aug 2003 06:28:39 -0000 1.7 +++ MulticastDetector.java 5 Jan 2004 04:15:05 -0000 1.7.4.1 @@ -8,12 +8,6 @@ ***************************************/ package org.jboss.remoting.detection.multicast; -import java.net.DatagramPacket; -import java.net.InetAddress; -import java.net.MulticastSocket; -import java.util.ArrayList; -import java.util.List; - import org.jboss.remoting.InvokerLocator; import org.jboss.remoting.InvokerRegistry; import org.jboss.remoting.ServerInvoker; @@ -22,12 +16,18 @@ import org.jboss.remoting.ident.Identity; import org.jboss.remoting.loading.ClassUtil; +import java.net.DatagramPacket; +import java.net.InetAddress; +import java.net.MulticastSocket; +import java.util.ArrayList; +import java.util.List; + /** * MulticastDetector - * + * * @author <a href="mailto:jh...@vo...">Jeff Haynie</a> * @author <a href="mailto:adr...@ha...">Adrian Brock</a> - * @version $Revision: 1.7 $ + * @version $Revision: 1.7.4.1 $ */ public class MulticastDetector extends AbstractDetector implements MulticastDetectorMBean { @@ -41,127 +41,143 @@ private MulticastSocket socket; private Identity identity; private Listener listener = new Listener(); + private Detection lastDetection; + private byte lastDetectionBuf[]; /** * return the multicast address of the detector - * - * @return + * + * @return */ - public InetAddress getAddress () + public InetAddress getAddress() { return addr; } /** * set the interface address of the multicast - * - * @param ip + * + * @param ip */ - public void setAddress (InetAddress ip) + public void setAddress(InetAddress ip) { this.addr = ip; } /** * return the bind address of the detector - * - * @return + * + * @return */ public InetAddress getBindAddress() { - return bindAddr; + return bindAddr; } /** * set the bind address of the multicast - * - * @param ip + * + * @param ip */ public void setBindAddress(InetAddress ip) { - this.bindAddr = ip; + this.bindAddr = ip; } /** * get the port that the detector is multicasting to - * - * @return + * + * @return */ - public int getPort () + public int getPort() { return port; } /** * set the port for detections to be multicast to - * - * @param port + * + * @param port */ - public void setPort (int port) + public void setPort(int port) { this.port = port; } /** * called by MBeanServer to start the mbean lifecycle - * - * @throws Exception + * + * @throws Exception */ - public void start () throws Exception + public void start() throws Exception { - if (addr==null) + if (addr == null) { this.addr = InetAddress.getByName(DEFAULT_IP); } // check to see if we're running on a machine with loopback and no NIC - InetAddress localHost = InetAddress.getLocalHost (); - if (bindAddr==null && localHost.getHostAddress().equals("127.0.0.1")) - { - // use this to bind so multicast will work w/o network - this.bindAddr = localHost; + InetAddress localHost = InetAddress.getLocalHost(); + if (bindAddr == null && localHost.getHostAddress().equals("127.0.0.1")) + { + // use this to bind so multicast will work w/o network + this.bindAddr = localHost; } identity = Identity.get(mbeanserver); socket = new MulticastSocket(port); if (bindAddr != null) - socket.setInterface(bindAddr); + socket.setInterface(bindAddr); socket.joinGroup(addr); - if (listener==null) + if (listener == null) { listener = new Listener(); } listener.start(); - super.start (); + super.start(); + if (log.isInfoEnabled()) + { + log.info("Multicast Detector listening on " + addr + ":" + port); + } } /** * called by the MBeanServer to stop the mbean lifecycle - * - * @throws Exception + * + * @throws Exception */ - public void stop () throws Exception + public void stop() throws Exception { - super.stop (); - listener.running=false; - listener.interrupt(); - listener=null; - socket.leaveGroup(addr); - socket.close(); - socket = null; + super.stop(); + if (listener != null) + { + listener.running = false; + listener.interrupt(); + listener = null; + } + if (socket != null) + { + socket.leaveGroup(addr); + socket.close(); + socket = null; + } + if (log.isInfoEnabled()) + { + log.info("Multicast Detector shutdown on " + addr + ":" + port); + } } /** * subclasses must implement to provide the specific heartbeat protocol * for this server to send out to other servers on the network */ - protected void heartbeat () + protected void heartbeat() { ServerInvoker invokers[] = InvokerRegistry.getServerInvokers(); - if (invokers==null || invokers.length<=0) + if (socket == null || invokers == null || invokers.length <= 0) { return; } List l = new ArrayList(invokers.length); - for (int c=0;c<invokers.length;c++) + for (int c = 0; c < invokers.length; c++) { if (invokers[c].isStarted()) { @@ -172,73 +188,102 @@ { return; } - InvokerLocator locators[]=(InvokerLocator[])l.toArray(new InvokerLocator[l.size()]); - if (socket!=null) + InvokerLocator locators[] = (InvokerLocator[]) l.toArray(new InvokerLocator[l.size()]); + if (socket != null) { - Detection msg=new Detection(Identity.get(mbeanserver),locators); - try - { - if (DETECTOR_DEBUG && log.isDebugEnabled()) - { - log.debug("sending heartbeat: "+msg); - } - byte buf[] = ClassUtil.serialize(msg); - DatagramPacket p = new DatagramPacket(buf, buf.length, addr, port); - socket.send(p); - } - catch (Throwable ex) - { - // its failed - log.debug("heartbeat failed",ex); - } + try + { + byte buf[] = null; + Detection msg = new Detection(Identity.get(mbeanserver), locators); + if (lastDetection != null && msg.equals(lastDetection)) + { + buf = lastDetectionBuf; + } + else + { + // serialize and then remember it so we don't serialize each time + buf = ClassUtil.serialize(msg); + lastDetection = msg; + lastDetectionBuf = buf; + } + if (DETECTOR_DEBUG && log.isDebugEnabled()) + { + log.debug("sending heartbeat: " + msg); + } + DatagramPacket p = new DatagramPacket(buf, buf.length, addr, port); + socket.send(p); + } + catch (Throwable ex) + { + if (isRunning()) + { + // its failed + log.debug("heartbeat failed", ex); + } + } } } private void listen(DatagramPacket p, byte[] buf) { - if (socket!=null) - { - try - { - // should block until we get a multicast - socket.receive(p); - - // take the multicast, and deserialize into the detection event - Detection msg = (Detection) ClassUtil.deserialize(buf); - if (DETECTOR_DEBUG && log.isDebugEnabled()) - log.debug("received detection: "+msg); - - // let the subclass do the hard work off handling detection - detect(msg); - } - catch (Throwable e) - { - if (e instanceof java.io.InvalidClassException) - { - return; - } - if (socket!=null) - { - log.debug("Error receiving detection",e); - } - } - } + if (socket != null && listener != null && listener.running) + { + try + { + // should block until we get a multicast + socket.receive(p); + + // take the multicast, and deserialize into the detection event + Detection msg = (Detection) ClassUtil.deserialize(buf); + if (DETECTOR_DEBUG && log.isDebugEnabled()) + log.debug("received detection: " + msg); + + // let the subclass do the hard work off handling detection + detect(msg); + } + catch (Throwable e) + { + if (e instanceof java.io.InvalidClassException) + { + return; + } + if (socket != null && listener != null && listener.running && isRunning()) + { + log.debug("Error receiving detection", e); + } + } + } } private final class Listener extends Thread { - boolean running = true; + boolean running = true; + + Listener() + { + super("MulticastDetector - Detection Receiver"); + } - public void run() - { - byte[] buf = new byte[4000]; - DatagramPacket p = new DatagramPacket(buf, 0, buf.length); - //p.setAddress(addr); - //p.setPort(port); - while (running) - { - listen(p, buf); - } - } - } + public void run() + { + byte[] buf = new byte[4000]; + DatagramPacket p = new DatagramPacket(buf, 0, buf.length); + while (isRunning() && + running && socket.isClosed() == false) + { + try + { + listen(p, buf); + } + catch (Exception ex) + { + if (!running && isRunning()==false) + { + break; + } + if (log.isDebugEnabled()) log.debug("exception received during listen", ex); + } + } + } + } } |