|
From: <sno...@us...> - 2013-09-23 23:42:32
|
Revision: 108
http://sourceforge.net/p/openrpg/svn/108
Author: snowdog_
Date: 2013-09-23 23:42:28 +0000 (Mon, 23 Sep 2013)
Log Message:
-----------
Adding heartbeat & disconnection monitoring to server. Discovered a bug in message address handling that is causing client addresses to not be included in message header info. Currently actively debugging.
Modified Paths:
--------------
trunk/src/openrpg2/common/core/network/NetworkClientModule.java
trunk/src/openrpg2/common/core/network/NetworkServer.java
Modified: trunk/src/openrpg2/common/core/network/NetworkClientModule.java
===================================================================
--- trunk/src/openrpg2/common/core/network/NetworkClientModule.java 2013-09-23 23:21:49 UTC (rev 107)
+++ trunk/src/openrpg2/common/core/network/NetworkClientModule.java 2013-09-23 23:42:28 UTC (rev 108)
@@ -79,7 +79,7 @@
//NOTE messages of this type ALWAYS come from the server
int operation = Integer.parseInt(msg.getHeader(HEADER_OP));
switch(operation){
- case(NetworkClientModule.OP_NOOP):{ break; }
+ case(NetworkClientModule.OP_NOOP):{ handleHeartbeatMessage(); break; }
case(NetworkClientModule.OP_DISCONNECT):{ handleDisconnectMessage(msg);break; }
case(NetworkClientModule.OP_ALERT):{ handleAlertMessage(msg); break; }
case(NetworkClientModule.OP_SHUTDOWN):{ handleShutdownMessage(msg); break; }
@@ -104,6 +104,26 @@
}
}
+ /**
+  * Answers a heartbeat request from the server.
+  * The server will send a NOOP message when it thinks the connection is getting
+  * stale and it expects a response to update its connection timer(s) for the
+  * client's connection. This method simply replies to that request with a
+  * NOOP message addressed to the server.
+  */
+ private void handleHeartbeatMessage(){
+     Logger logger = Logger.getLogger(NetworkClientModule.class.getName());
+     logger.fine("heartbeat requested!");
+
+     // Build the NOOP reply: network-type message, NOOP operation, routed to the server.
+     ORPGMessage reply = new ORPGMessage();
+     reply.setMessageType(ORPGConstants.TYPE_NETWORK);
+     reply.setHeader(NetworkClientModule.HEADER_OP, NetworkClientModule.OP_NOOP);
+     reply.setDestination( ORPGConstants.TO_SERVER );
+     try {
+         this.modCom.sendToNetwork(reply);
+     } catch (ModuleCommunicationException ex) {
+         // Record which operation failed; the exception alone gives no context.
+         logger.log(Level.SEVERE, "Unable to send heartbeat (NOOP) reply to server", ex);
+     }
+ }
+
+
/**
 * Handler for network messages whose operation is OP_DISCONNECT.
 * Currently a placeholder: the body performs no work (see TODO below).
 * @param msg the disconnect message that was received; not used yet
 */
private void handleDisconnectMessage(ORPGMessage msg){
//TODO: handle notification of rest of ORPG2 system when a client sends a disconnect message
}
Modified: trunk/src/openrpg2/common/core/network/NetworkServer.java
===================================================================
--- trunk/src/openrpg2/common/core/network/NetworkServer.java 2013-09-23 23:21:49 UTC (rev 107)
+++ trunk/src/openrpg2/common/core/network/NetworkServer.java 2013-09-23 23:42:28 UTC (rev 108)
@@ -29,6 +29,7 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
+import java.util.Calendar;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Iterator;
@@ -37,9 +38,12 @@
import java.util.Properties;
import java.util.Set;
import java.util.logging.Logger;
+import openrpg2.common.core.ORPGConstants;
import openrpg2.common.core.ORPGMessage;
import openrpg2.common.core.ORPGMessageQueue;
import openrpg2.common.core.ORPGSettingManager;
+import openrpg2.common.core.route.AddressToken;
+import openrpg2.common.core.route.MessageAddress;
import openrpg2.common.module.NetworkedModule;
@@ -47,8 +51,8 @@
* Network Server. This class represents the interface between all network
* functions of the OpenRPG server and the remainder of the application.
* This is intended to be a black-box object such that network operations are
- * wholely contained and implemented within this object. All interaction with
- * the network functions of OpenRPG should occure though this objects API.
+ * contained and implemented within this object. All interaction with
+ * the network functions of OpenRPG should occur through this object's API.
* @author snowdog
*/
@@ -70,12 +74,16 @@
private Object monitorLock = new Object();
private NetworkConnectionAlerter connectionState = new NetworkConnectionAlerter();
private Logger log = Logger.getLogger(this.getClass().getName());
-
+ private long lastSweep = 0L;
+
static final int DEFAULT_THREAD_POOL_SIZE = 5;
static final String SERVER_SETTINGS_FILENAME = "server.properties";
static final String SETTING_DEFAULT_IP = "defaultIP";
static final String SETTING_DEFAULT_PORT = "defaultPort";
private int threadPoolSize = DEFAULT_THREAD_POOL_SIZE;
+ static final int NETWORK_TIMEOUT = 300; //seconds of inbound silence after which a client is considered timed out (5 mins)
+ static final int NETWORK_NOOP_LIMIT = 20;//seconds of inbound silence before a NOOP is sent. NOTE(review): 20 looks like a temporary debugging value, the previous comment suggested 180 (3 mins) - confirm
+ static final int NETWORK_SWEEP_LIMIT = 10;//minimum number of seconds between two network sweeps
/**
@@ -101,7 +109,9 @@
}
/**
- * Gets the default ip and port to run the server on by checking for user perferences first and then falling back on the hard coded defaults if no preferences exist.
+ * Gets the default IP address and port to run the server on by checking
+ * for user preferences first and then falling back on the hard coded
+ * defaults if no preferences exist.
* @return InetSocketAddress to run server on.
*/
public InetSocketAddress getDefaultAddress(){
@@ -203,7 +213,7 @@
*/
private void delegateMessageDelivery(ORPGMessage m){
NetworkServiceThread nst=null;
-
+
//get a message to handle and its associated recipient list
int[] destId = m.getFinalRecipientList();
@@ -314,6 +324,8 @@
// after the 1 second delay on the selector readiness scan
//
// server should be considered "idle" in this situation
+
+ performNetworkSweep(); //use idle time to cleanup dead connections and prevent zombies.
}
}
log.finer("[DEBUG] Main select thread terminating");
@@ -321,9 +333,6 @@
};
serverThread.start();
startOutboundMonitor();
-
-
-
}
/**
@@ -366,6 +375,7 @@
inQueue.putMessage(msg);
}
+ @Override
public Observable addMessageQueueObserver(Observer o){
inQueue.addObserver(o);
return outQueue; //return reference to object for comparison only in case more than one observable is being monitored.
@@ -402,7 +412,54 @@
return runFlag;
}
+ /**
+  * Performs a scan over the connected client sockets and forces the send
+  * of a network NOOP message (aka heartbeat) to every client whose last
+  * inbound message is at least NETWORK_NOOP_LIMIT seconds old. This catches
+  * clients who have crashed or otherwise severed the socket connection
+  * without properly informing the server of the shutdown.
+  * <p>
+  * Sweeps are rate limited: the method returns immediately when fewer than
+  * NETWORK_SWEEP_LIMIT seconds have passed since the previous sweep, or when
+  * no clients are connected. Clients silent for more than NETWORK_TIMEOUT
+  * seconds are only reported for now; the actual disconnect is still disabled.
+  */
+ private void performNetworkSweep(){
+     if( clients.isEmpty()){ return; }//nothing to do!
+
+     //check last sweep time to make sure we don't over-sweep
+     long secondsSinceSweep = (System.currentTimeMillis() - this.lastSweep)/1000;
+     if( secondsSinceSweep < (long)NetworkServer.NETWORK_SWEEP_LIMIT){ return; } //not time to sweep yet, bailout
+
+     log.finer("[DEBUG] Sweeping System!");
+     Enumeration en = clients.keys();
+     while(en.hasMoreElements()){
+         Integer i = (Integer)en.nextElement();
+         NetworkConnection n = (NetworkConnection)clients.get(i);
+         //guard: the entry may have been removed between keys() and get()
+         //NOTE(review): presumably other threads can drop connections - confirm
+         if( n == null ){ continue; }
+
+         //seconds since the last inbound message from this client
+         int delta = n.stats.getInboundMessageDelta()/1000;
+         log.finer("[DEBUG] - Client "+n.getId()+" delta: "+delta);
+
+         if( delta > NetworkServer.NETWORK_TIMEOUT){
+             //no inbound activity for NETWORK_TIMEOUT seconds... client should be disconnected.
+             //TODO: actually disconnect the client here (n.disconnect() is disabled for now)
+             log.warning("[DEBUG] SHOULD BE Disconnecting!! "+NetworkServer.NETWORK_TIMEOUT+" seconds");
+         }else if( delta >= NetworkServer.NETWORK_NOOP_LIMIT){
+             //fabricate a network NOOP message and push it into the send queue
+             log.finer("[DEBUG] Sending NOOP to client "+n.getId());
+             ORPGMessage m = new ORPGMessage();
+             m.setMessageType(ORPGConstants.TYPE_NETWORK);
+             m.setHeader(NetworkClientModule.HEADER_OP, NetworkClientModule.OP_NOOP);
+             MessageAddress ma = new MessageAddress();
+             ma.append(new AddressToken(n.getId()));
+             m.setDestination(ma);
+             log.finer("[DEBUG] NOOP MESSAGE: "+m.getAsString());
+             sendMessage(m);
+         }
+     }
+     //reset sweep time
+     this.lastSweep = System.currentTimeMillis();
+ }
+
+
//methods to satisfy NetworkMessageRelay interface
+ @Override
public boolean putORPGMessage(ORPGMessage msg){
sendMessage(msg);
return true;
@@ -412,6 +469,7 @@
* @throws openrpg2.common.core.network.NoMessageAvailableException Thrown if no messages are available for retrieval
* @return ORPGMessage object
*/
+ @Override
public ORPGMessage getORPGMessage() throws NoMessageAvailableException{
ORPGMessage msg = inQueue.pullMessage();
if (msg == null){ throw new NoMessageAvailableException(); }
@@ -421,6 +479,7 @@
* Check if ORPGMessage can be retrieved with getORPGMessage() method
* @return True if 1 or more ORPGMessages are ready for retrieval otherwise false.
*/
+ @Override
public boolean hasORPGMessage(){
if (inQueue.hasRemaining()) return true;
return false;
@@ -431,6 +490,7 @@
* Allows other core components to be notified when a client connects or disconnects.
* @param o An Observer Object to register for network state notification
*/
+ @Override
public void registerNetworkConnectionObserver(Observer o){
connectionState.addObserver(o);
@@ -441,6 +501,7 @@
* Required for implementation of NetworkModuleProvider interface.
*
*/
+ @Override
public NetworkedModule getNetworkModule(){
    // Every call builds a new server module wrapped around this server instance.
    NetworkedModule module = new NetworkServerModule(this);
    return module;
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|