From: <sno...@us...> - 2013-09-23 23:42:32
|
Revision: 108 http://sourceforge.net/p/openrpg/svn/108 Author: snowdog_ Date: 2013-09-23 23:42:28 +0000 (Mon, 23 Sep 2013) Log Message: ----------- Adding heartbeat & disconnection monitoring to server. Discovered a bug in message address handling that is causing client addresses to not be included in message header info. Currently actively debugging. Modified Paths: -------------- trunk/src/openrpg2/common/core/network/NetworkClientModule.java trunk/src/openrpg2/common/core/network/NetworkServer.java Modified: trunk/src/openrpg2/common/core/network/NetworkClientModule.java =================================================================== --- trunk/src/openrpg2/common/core/network/NetworkClientModule.java 2013-09-23 23:21:49 UTC (rev 107) +++ trunk/src/openrpg2/common/core/network/NetworkClientModule.java 2013-09-23 23:42:28 UTC (rev 108) @@ -79,7 +79,7 @@ //NOTE messages of this type ALWAYS come from the server int operation = Integer.parseInt(msg.getHeader(HEADER_OP)); switch(operation){ - case(NetworkClientModule.OP_NOOP):{ break; } + case(NetworkClientModule.OP_NOOP):{ handleHeartbeatMessage(); break; } case(NetworkClientModule.OP_DISCONNECT):{ handleDisconnectMessage(msg);break; } case(NetworkClientModule.OP_ALERT):{ handleAlertMessage(msg); break; } case(NetworkClientModule.OP_SHUTDOWN):{ handleShutdownMessage(msg); break; } @@ -104,6 +104,26 @@ } } + /** + * The server will send a NOOP message when it thinks the connection is getting + * stale and it expects a response to update its connection timer(s) for the + * clients connection. This method simply forces a response to the server request + * with a NOOP message. + */ + private void handleHeartbeatMessage(){ + System.out.println("heartbeat requested!"); + ORPGMessage m = new ORPGMessage(); + m.setMessageType(ORPGConstants.TYPE_NETWORK); + m.setHeader(NetworkClientModule.HEADER_OP, NetworkClientModule.OP_NOOP); + m.setDestination( ORPGConstants.TO_SERVER ); + try { + this.modCom.sendToNetwork(m); + } catch (ModuleCommunicationException ex) { + Logger.getLogger(NetworkClientModule.class.getName()).log(Level.SEVERE, null, ex); + } + } + + private void handleDisconnectMessage(ORPGMessage msg){ //TODO: handle notification of rest of ORPG2 system when a client sends a disconnect message } Modified: trunk/src/openrpg2/common/core/network/NetworkServer.java =================================================================== --- trunk/src/openrpg2/common/core/network/NetworkServer.java 2013-09-23 23:21:49 UTC (rev 107) +++ trunk/src/openrpg2/common/core/network/NetworkServer.java 2013-09-23 23:42:28 UTC (rev 108) @@ -29,6 +29,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; +import java.util.Calendar; import java.util.Enumeration; import java.util.Hashtable; import java.util.Iterator; @@ -37,9 +38,12 @@ import java.util.Properties; import java.util.Set; import java.util.logging.Logger; +import openrpg2.common.core.ORPGConstants; import openrpg2.common.core.ORPGMessage; import openrpg2.common.core.ORPGMessageQueue; import openrpg2.common.core.ORPGSettingManager; +import openrpg2.common.core.route.AddressToken; +import openrpg2.common.core.route.MessageAddress; import openrpg2.common.module.NetworkedModule; @@ -47,8 +51,8 @@ * Network Server. This class represents the interface between all network * functions of the OpenRPG server and the remainder of the application. * This is intended to be a black-box object such that network operations are - * wholely contained and implemented within this object. All interaction with - * the network functions of OpenRPG should occure though this objects API. + * contained and implemented within this object. All interaction with + * the network functions of OpenRPG should occur though this objects API. * @author snowdog */ @@ -70,12 +74,16 @@ private Object monitorLock = new Object(); private NetworkConnectionAlerter connectionState = new NetworkConnectionAlerter(); private Logger log = Logger.getLogger(this.getClass().getName()); - + private long lastSweep = 0L; + static final int DEFAULT_THREAD_POOL_SIZE = 5; static final String SERVER_SETTINGS_FILENAME = "server.properties"; static final String SETTING_DEFAULT_IP = "defaultIP"; static final String SETTING_DEFAULT_PORT = "defaultPort"; private int threadPoolSize = DEFAULT_THREAD_POOL_SIZE; + static final int NETWORK_TIMEOUT = 300; //5 mins + static final int NETWORK_NOOP_LIMIT = 20;//180; //3 mins + static final int NETWORK_SWEEP_LIMIT = 10;//limit to every 10 seconds /** @@ -101,7 +109,9 @@ } /** - * Gets the default ip and port to run the server on by checking for user perferences first and then falling back on the hard coded defaults if no preferences exist. + * Gets the default IP address and port to run the server on by checking + * for user preferences first and then falling back on the hard coded + * defaults if no preferences exist. * @return InetSocketAddress to run server on. */ public InetSocketAddress getDefaultAddress(){ @@ -203,7 +213,7 @@ */ private void delegateMessageDelivery(ORPGMessage m){ NetworkServiceThread nst=null; - + //get a message to handle and its associated recipient list int[] destId = m.getFinalRecipientList(); @@ -314,6 +324,8 @@ // after the 1 second delay on the selector readiness scan // // server should be considered "idle" in this situation + + performNetworkSweep(); //use idle time to cleanup dead connections and prevent zombies. } } log.finer("[DEBUG] Main select thread terminating"); @@ -321,9 +333,6 @@ }; serverThread.start(); startOutboundMonitor(); - - - } /** @@ -366,6 +375,7 @@ inQueue.putMessage(msg); } + @Override public Observable addMessageQueueObserver(Observer o){ inQueue.addObserver(o); return outQueue; //return reference to object for comparison only in case more than one observable is being monitored. @@ -402,7 +412,54 @@ return runFlag; } + /** + * This method performs a scans over the connected client sockets + * and forces the send of a network NOOP message (aka heartbeat) + * based on their last message received time. This catches clients + * who have crashed or otherwise severed the socket connection without + * properly informing the server of the shutdown. + */ + private void performNetworkSweep(){ + if( clients.isEmpty()){ return; }//nothing to do! + + //check last sweep time to make sure we don't over-sweep + long last = (Calendar.getInstance().getTimeInMillis() - this.lastSweep)/1000; + if( last < (long)NetworkServer.NETWORK_SWEEP_LIMIT){ return; } //not time to sweep yet, bailout + + System.out.println("Sweeping System!"); + Enumeration en = clients.keys(); + while(en.hasMoreElements()){ + Integer i = (Integer)en.nextElement(); + NetworkConnection n = (NetworkConnection)clients.get(i); + //if connection has no activity inbound for NETWORK_TIMEOUT seconds... disconnect the client. + int delta = n.stats.getInboundMessageDelta()/1000; + System.out.println("- Client "+n.getId()+" delta: "+delta); + + if( delta > NetworkServer.NETWORK_TIMEOUT){ + System.out.println("[DEBUG] SHOULD BE Disconnecting!! "+NetworkServer.NETWORK_TIMEOUT+" seconds"); + //n.disconnect(); + }else{ + if( delta >= NetworkServer.NETWORK_NOOP_LIMIT){ + //fabricate a network NOOP message and push it into the send queue + System.out.println("Sending NOOP to client "+n.getId()); + ORPGMessage m = new ORPGMessage(); + m.setMessageType(ORPGConstants.TYPE_NETWORK); + m.setHeader(NetworkClientModule.HEADER_OP, NetworkClientModule.OP_NOOP); + MessageAddress ma = new MessageAddress(); + ma.append(new AddressToken(n.getId())); + m.setDestination(ma); + System.out.println("NOOP MESSAGE: "+m.getAsString()); + sendMessage(m); + } + } + } + //reset sweep time + this.lastSweep = Calendar.getInstance().getTimeInMillis(); + } + + //methods to satisfy NetworkMessageRelay interface + @Override public boolean putORPGMessage(ORPGMessage msg){ sendMessage(msg); return true; @@ -412,6 +469,7 @@ * @throws openrpg2.common.core.network.NoMessageAvailableException Thrown if no messages are available for retrieval * @return ORPGMessage object */ + @Override public ORPGMessage getORPGMessage() throws NoMessageAvailableException{ ORPGMessage msg = inQueue.pullMessage(); if (msg == null){ throw new NoMessageAvailableException(); } @@ -421,6 +479,7 @@ * Check if ORPGMessage can be retrieved with getORPGMessage() method * @return True if 1 or more ORPGMessages are ready for retrieval otherwise false. */ + @Override public boolean hasORPGMessage(){ if (inQueue.hasRemaining()) return true; return false; @@ -431,6 +490,7 @@ * Allows other core components to be notified when a client connects or disconnects. * @param o An Observer Object to register for network state notification */ + @Override public void registerNetworkConnectionObserver(Observer o){ connectionState.addObserver(o); @@ -441,6 +501,7 @@ * Required for implementation of NetworkModuleProvider interface. * */ + @Override public NetworkedModule getNetworkModule(){ return new NetworkServerModule(this); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |