From: Hendrik B. <nh...@us...> - 2006-08-24 22:28:11
|
Update of /cvsroot/arianne/marauroa/src/marauroa/server/net In directory sc8-pr-cvs11.sourceforge.net:/tmp/cvs-serv16540/src/marauroa/server/net Modified Files: NetworkServerManagerCallback.java NetworkServerManager.java TCPWriter.java TCPReader.java Log Message: porting bugfix for socket.writer blocking and killing the server from branch MARAUROA_VERSION_01_RELEASE_32 to HEAD Index: TCPWriter.java =================================================================== RCS file: /cvsroot/arianne/marauroa/src/marauroa/server/net/TCPWriter.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** TCPWriter.java 20 Aug 2006 15:40:13 -0000 1.8 --- TCPWriter.java 24 Aug 2006 22:27:48 -0000 1.9 *************** *** 6,12 **** --- 6,17 ---- import java.net.InetSocketAddress; import java.net.Socket; + import java.util.Collections; + import java.util.Iterator; + import java.util.LinkedList; + import java.util.List; import marauroa.common.CRC; import marauroa.common.Log4J; + import marauroa.common.Pair; import marauroa.common.net.Message; import marauroa.common.net.OutputSerializer; *************** *** 18,38 **** class TCPWriter { private static Logger logger = Logger.getLogger(TCPWriter.class); ! ! private NetworkServerManagerCallback networkServerManager = null; ! private Statistics stats = null; /** * Creates a NetworkServerManagerWrite ! * ! * @param networkServerManager ! * NetworkServerManager ! * @param stats ! * Statistics */ ! public TCPWriter(NetworkServerManagerCallback networkServerManager, ! Statistics stats) { ! this.networkServerManager = networkServerManager; this.stats = stats; } --- 23,42 ---- class TCPWriter { private static Logger logger = Logger.getLogger(TCPWriter.class); ! private NetworkServerManagerCallback networkServerManagerCallback = null; private Statistics stats = null; + private List<Pair<Socket, byte[]>> queue = null; /** * Creates a NetworkServerManagerWrite ! * ! * @param networkServerManager NetworkServerManager ! * @param stats Statistics */ ! public TCPWriter(NetworkServerManagerCallback networkServerManager, Statistics stats) { ! this.networkServerManagerCallback = networkServerManager; this.stats = stats; + this.queue = Collections.synchronizedList(new LinkedList<Pair<Socket, byte[]>>()); + TCPWriterTimeoutThread tcpWriterTimeoutThread = new TCPWriterTimeoutThread(networkServerManagerCallback, queue); + tcpWriterTimeoutThread.start(); } *************** *** 46,82 **** final private int PACKET_SIGNATURE_SIZE = 4; - final private int PACKET_LENGTH_SIZE = 4; /** * Method that execute the writting ! * ! * @param msg ! * Message to write ! * @param socket ! * Socket */ public void write(Message msg, Socket socket) { Log4J.startMethod(logger, "write"); ! long timeStart = System.currentTimeMillis(); try { ! if (networkServerManager.isStillRunning()) { byte[] buffer = serializeMessage(msg); short used_signature; ! /** * Statistics ** */ ! used_signature = CRC.cmpCRC(buffer); // ++last_signature; stats.add("Bytes send", buffer.length); stats.add("Message send", 1); ! logger.debug("Message(" + msg.getType() + ") size in bytes: " ! + buffer.length); ! byte[] data = new byte[PACKET_LENGTH_SIZE ! + PACKET_SIGNATURE_SIZE + buffer.length]; int size = buffer.length + PACKET_SIGNATURE_SIZE; data[0] = (byte) (size & 255); ! data[1] = (byte) ((size >> 8) & 255); data[2] = (byte) ((size >> 16) & 255); data[3] = (byte) ((size >> 24) & 255); --- 50,81 ---- final private int PACKET_SIGNATURE_SIZE = 4; final private int PACKET_LENGTH_SIZE = 4; /** * Method that execute the writting ! * ! * @param msg Message to write ! * @param socket Socket */ public void write(Message msg, Socket socket) { Log4J.startMethod(logger, "write"); ! long timeStart = System.currentTimeMillis(); try { ! if (networkServerManagerCallback.isStillRunning()) { byte[] buffer = serializeMessage(msg); short used_signature; ! /*** Statistics ***/ ! used_signature = CRC.cmpCRC(buffer); //++last_signature; stats.add("Bytes send", buffer.length); stats.add("Message send", 1); ! logger.debug("Message(" + msg.getType() + ") size in bytes: " + buffer.length); ! byte[] data = new byte[PACKET_LENGTH_SIZE + PACKET_SIGNATURE_SIZE + buffer.length]; int size = buffer.length + PACKET_SIGNATURE_SIZE; data[0] = (byte) (size & 255); ! data[1] = (byte) ((size >> 8) & 255); data[2] = (byte) ((size >> 16) & 255); data[3] = (byte) ((size >> 24) & 255); *************** *** 89,117 **** // don't use multiple os.write calls because we have // disabled Nagel's algorithm. ! System.arraycopy(buffer, 0, data, PACKET_LENGTH_SIZE ! + PACKET_SIGNATURE_SIZE, buffer.length); ! ! OutputStream os = socket.getOutputStream(); ! os.write(data); ! os.flush(); ! logger.debug("Sent packet(" + used_signature + ") " ! + buffer.length); ! if (logger.isDebugEnabled()) { ! logger.debug("Sent message: " + msg); } } Log4J.finishMethod(logger, "write"); ! } catch (IOException e) { /* Report the exception */ logger.info("error while sending a packet (msg=(" + msg + "))", e); ! networkServerManager.disconnectClient(new InetSocketAddress(socket ! .getInetAddress(), socket.getPort())); } ! long timeEnd = System.currentTimeMillis(); ! if (timeEnd - timeStart > 1000) { ! logger.warn("TCPWriter.write took " + (timeEnd - timeStart) + " (" ! + socket.getInetAddress() + ")"); } } } --- 88,273 ---- // don't use multiple os.write calls because we have // disabled Nagel's algorithm. ! System.arraycopy(buffer, 0, data, PACKET_LENGTH_SIZE + PACKET_SIGNATURE_SIZE, buffer.length); ! synchronized (queue) { ! queue.add(new Pair(socket, data)); ! } ! logger.debug("Sending packet(" + used_signature + ") " + buffer.length); if (logger.isDebugEnabled()) { ! logger.debug("Sending message: " + msg); } } Log4J.finishMethod(logger, "write"); ! } catch (Exception e) { /* Report the exception */ logger.info("error while sending a packet (msg=(" + msg + "))", e); ! networkServerManagerCallback.internalDisconnectClientNow(new InetSocketAddress(socket.getInetAddress(), socket.getPort())); } ! long timeEnd = System.currentTimeMillis(); ! if (timeEnd - timeStart > 1000) { ! logger.warn("TCPWriter.write took " + (timeEnd - timeStart) + " (" + socket.getInetAddress() + ")"); ! } ! } ! ! /** ! * Thread, which writes the data. Because of some undocumented reason ! * socket.getOutputStream().write() can block so we put it in its own ! * thread and have a second one to monitor and restart this. ! */ ! public static class TCPWriterThread extends Thread { ! private Logger logger = Logger.getLogger(TCPWriterThread.class); ! private NetworkServerManagerCallback networkServerManagerCallback = null; ! private List<Pair<Socket, byte[]>> queue = null; ! private long watchdog = System.currentTimeMillis(); ! private Socket possibleBadSocket = null; ! private boolean keepRunning = true; ! ! /** ! * Creates a TCPWriterThread ! * ! * @param networkServerManagerCallback NetworkServerManagerCallback ! * @param queue event queue ! */ ! public TCPWriterThread (NetworkServerManagerCallback networkServerManagerCallback, List<Pair<Socket, byte[]>> queue) { ! super("TCPWriterThread"); ! super.setDaemon(true); ! ! this.networkServerManagerCallback = networkServerManagerCallback; ! this.queue = queue; ! } ! ! /** ! * Returns the timestamp this thread was last seen alive. ! * ! * @return watchdog ! */ ! public long getWatchdogTime() { ! return watchdog; ! } ! ! /** ! * Removes the possibl-bad-socket from the queue and closes it. ! */ ! @SuppressWarnings("cast") ! public void kill() { ! Socket mySocket = possibleBadSocket; ! if (mySocket != null) { ! logger.error("Killing " + mySocket.getInetAddress()); ! networkServerManagerCallback.internalDisconnectClientNow(new InetSocketAddress(mySocket.getInetAddress(), mySocket.getPort())); ! synchronized (queue) { ! Iterator<Pair<Socket, byte[]>> itr = queue.iterator(); ! while (itr.hasNext()) { ! Pair<Socket, byte[]> pair = (Pair<Socket, byte[]>) itr.next(); ! if (pair.first().equals(mySocket)) { ! itr.remove(); ! } ! } ! } ! } ! keepRunning = false; ! } ! ! @Override ! public void run() { ! while (keepRunning) { ! watchdog = System.currentTimeMillis(); ! ! // try to remove the first element ! Pair<Socket, byte[]> pair = null; ! synchronized (queue) { ! if (queue.size() > 0) { ! pair = queue.remove(0); ! } ! } ! ! // if we removed an item from the queue, we will process it here ! if (pair != null) { ! try { ! possibleBadSocket = pair.first(); ! OutputStream os = pair.first().getOutputStream(); ! os.write(pair.second()); ! os.flush(); ! possibleBadSocket = null; ! } catch (Exception e) { ! /* Report the exception */ ! logger.info("error while sending a packet (to " + pair.first().getInetAddress() + ")", e); ! networkServerManagerCallback.internalDisconnectClientNow(new InetSocketAddress(pair.first().getInetAddress(), pair.first().getPort())); ! } ! ! } else { ! // otherwise we wait ! networkServerManagerCallback.internalDisconnectClientsNow(); ! ! try { ! Thread.sleep(50); ! } catch (InterruptedException e) { ! logger.error(e, e); ! } ! } ! } } } + + + + /** + * Thread, which writes the data. Because of some undocumented reason + * socket.getOutputStream().write() can block so we put it in its own + * thread and have a second one to monitor and restart this. + */ + public static class TCPWriterTimeoutThread extends Thread { + private static Logger logger = Logger.getLogger(TCPWriterThread.class); + private NetworkServerManagerCallback networkServerManagerCallback = null; + private List<Pair<Socket, byte[]>> queue = null; + private TCPWriterThread tcpWriterThread = null; + + /** + * Creates a TCPWriterTimeoutThread + * + * @param networkServerManagerCallback NetworkServerManagerCallback + * @param queue event queue + */ + public TCPWriterTimeoutThread (NetworkServerManagerCallback networkServerManagerCallback, List<Pair<Socket, byte[]>> queue) { + super("TCPWriterTimeoutThread"); + super.setDaemon(true); + + this.networkServerManagerCallback = networkServerManagerCallback; + this.queue = queue; + } + + private void createTCPWriterThread () { + tcpWriterThread = new TCPWriterThread(networkServerManagerCallback, queue); + tcpWriterThread.start(); + } + + @Override + public void start() { + createTCPWriterThread(); + super.start(); + } + + @Override + public void run() { + while (true) { + if (System.currentTimeMillis() - tcpWriterThread.getWatchdogTime() >= 2000) { + logger.error("TCPWriterThread is not responding, killing and restarting it"); + tcpWriterThread.kill(); + try { + // i am not sure if this works because the blocking is in native code + tcpWriterThread.stop(); + } catch (Exception e) { + logger.warn(e, e); + } + createTCPWriterThread(); + } + + // Wait + try { + Thread.sleep(500); + } catch (InterruptedException e) { + logger.error(e, e); + } + } + } + } + } Index: TCPReader.java =================================================================== RCS file: /cvsroot/arianne/marauroa/src/marauroa/server/net/TCPReader.java,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** TCPReader.java 20 Aug 2006 15:40:13 -0000 1.9 --- TCPReader.java 24 Aug 2006 22:27:48 -0000 1.10 *************** *** 16,45 **** import org.apache.log4j.Logger; ! /** * The active thread in charge of recieving messages from the network. */ class TCPReader extends Thread { private static Logger logger = Logger.getLogger(TCPReader.class); - private NetworkServerManagerCallback networkServerManager = null; ! ! private HashMap<InetSocketAddress, Socket> tcpSockets = null; ! private Map<Socket, Integer> bytesToRead = new WeakHashMap<Socket, Integer>(); - private Statistics stats = null; /** * Creates a NetworkServerManagerRead ! * ! * @param networkServerManager ! * NetworkServerManager ! * @param tcpSockets ! * communication end-points ! * @param stats ! * Statistics */ ! public TCPReader(NetworkServerManagerCallback networkServerManager, ! HashMap<InetSocketAddress, Socket> tcpSockets, Statistics stats) { super("TCPReader"); this.networkServerManager = networkServerManager; --- 16,38 ---- import org.apache.log4j.Logger; ! /** * The active thread in charge of recieving messages from the network. */ class TCPReader extends Thread { private static Logger logger = Logger.getLogger(TCPReader.class); private NetworkServerManagerCallback networkServerManager = null; ! ! private Map<InetSocketAddress, Socket> tcpSockets = null; private Map<Socket, Integer> bytesToRead = new WeakHashMap<Socket, Integer>(); private Statistics stats = null; /** * Creates a NetworkServerManagerRead ! * ! * @param networkServerManager NetworkServerManager ! * @param tcpSockets communication end-points ! * @param stats Statistics */ ! public TCPReader(NetworkServerManagerCallback networkServerManager, Map<InetSocketAddress, Socket> tcpSockets, Statistics stats) { super("TCPReader"); this.networkServerManager = networkServerManager; *************** *** 48,52 **** } ! /** * Method that execute the reading. It runs as a active thread forever. */ --- 41,45 ---- } ! /** * Method that execute the reading. It runs as a active thread forever. */ *************** *** 55,77 **** logger.debug("run()"); byte[] sizebuffer = new byte[4]; ! while (networkServerManager.isStillRunning()) { boolean found = false; long start = System.currentTimeMillis(); ! Map<InetSocketAddress, Socket> temptTcpSockets = (Map<InetSocketAddress, Socket>) tcpSockets ! .clone(); for (InetSocketAddress inetSocketAddress : temptTcpSockets.keySet()) { Socket socket = temptTcpSockets.get(inetSocketAddress); if (socket.isClosed()) { ! networkServerManager.disconnectClient(inetSocketAddress); continue; } if (!socket.isConnected()) { ! networkServerManager.disconnectClient(inetSocketAddress); continue; } try { InputStream is = socket.getInputStream(); ! Integer toReadInt = bytesToRead.get(socket); int size = -1; --- 48,70 ---- logger.debug("run()"); byte[] sizebuffer = new byte[4]; ! while (networkServerManager.isStillRunning()) { boolean found = false; long start = System.currentTimeMillis(); ! // clone tcpSockets so that we do not lock it to long ! Map<InetSocketAddress, Socket> temptTcpSockets = cloneMap(tcpSockets); for (InetSocketAddress inetSocketAddress : temptTcpSockets.keySet()) { Socket socket = temptTcpSockets.get(inetSocketAddress); if (socket.isClosed()) { ! networkServerManager.internalDisconnectClientNow(inetSocketAddress); continue; } if (!socket.isConnected()) { ! networkServerManager.internalDisconnectClientNow(inetSocketAddress); continue; } try { InputStream is = socket.getInputStream(); ! Integer toReadInt = bytesToRead.get(socket); int size = -1; *************** *** 81,102 **** is.read(sizebuffer); size = (sizebuffer[0] & 0xFF) ! + ((sizebuffer[1] & 0xFF) << 8) ! + ((sizebuffer[2] & 0xFF) << 16) ! + ((sizebuffer[3] & 0xFF) << 24); if (size == 542393671) { // "GET " ! // This request was not created by the ! // marauroa-client ! // but it was created by the HTTP-client to ! // compare // version numbers. ==> Close it. OutputStream os = socket.getOutputStream(); ! os.write("500 This is not a webserver\r\n\r\n" ! .getBytes()); os.flush(); os.close(); ! logger ! .warn("Closing connection because packet-size is magic-number \"GET \"."); ! networkServerManager ! .disconnectClient(inetSocketAddress); continue; } --- 74,90 ---- is.read(sizebuffer); size = (sizebuffer[0] & 0xFF) ! + ((sizebuffer[1] & 0xFF) << 8) ! + ((sizebuffer[2] & 0xFF) << 16) ! + ((sizebuffer[3] & 0xFF) << 24); if (size == 542393671) { // "GET " ! // This request was not created by the marauroa-client ! // but it was created by the HTTP-client to compare // version numbers. ==> Close it. OutputStream os = socket.getOutputStream(); ! os.write("500 This is not a webserver\r\n\r\n".getBytes()); os.flush(); os.close(); ! logger.warn("Closing connection because packet-size is magic-number \"GET \"."); ! networkServerManager.disconnectClient(inetSocketAddress); continue; } *************** *** 113,139 **** is.read(buffer); logger.debug("Received TCP Packet"); ! ! /** * Statistics ** */ stats.add("Bytes recv", size); stats.add("Message recv", 1); ! ! networkServerManager.receiveMessage(buffer, ! inetSocketAddress); bytesToRead.remove(socket); } } catch (SocketTimeoutException e) { logger.warn(e + " (" + socket.getInetAddress() + ")", e); ! networkServerManager.disconnectClient(inetSocketAddress); ! } catch (SocketException e) { ! logger.warn(e + " (" + socket.getInetAddress() + ")", e); ! networkServerManager.disconnectClient(inetSocketAddress); } catch (IOException e) { logger.warn(e + " (" + socket.getInetAddress() + ")", e); ! networkServerManager.disconnectClient(inetSocketAddress); } catch (Exception e) { /* Report the exception */ ! logger.error("error while processing tcp-packets (" ! + socket.getInetAddress() + ")", e); ! networkServerManager.disconnectClient(inetSocketAddress); } } --- 101,125 ---- is.read(buffer); logger.debug("Received TCP Packet"); ! ! /*** Statistics ***/ stats.add("Bytes recv", size); stats.add("Message recv", 1); ! ! networkServerManager.receiveMessage(buffer, inetSocketAddress); bytesToRead.remove(socket); } } catch (SocketTimeoutException e) { logger.warn(e + " (" + socket.getInetAddress() + ")", e); ! networkServerManager.internalDisconnectClientNow(inetSocketAddress); ! } catch (SocketException e) { ! logger.warn(e + " (" + socket.getInetAddress() + ")", e); ! networkServerManager.internalDisconnectClientNow(inetSocketAddress); } catch (IOException e) { logger.warn(e + " (" + socket.getInetAddress() + ")", e); ! networkServerManager.internalDisconnectClientNow(inetSocketAddress); } catch (Exception e) { /* Report the exception */ ! logger.error("error while processing tcp-packets (" + socket.getInetAddress() + ")", e); ! networkServerManager.internalDisconnectClientNow(inetSocketAddress); } } *************** *** 147,153 **** } else { wait = wait * -1; ! logger ! .warn("Turn duration overflow by " + wait ! + " ms"); } } catch (InterruptedException e) { --- 133,137 ---- } else { wait = wait * -1; ! logger.warn("Turn duration overflow by " + wait + " ms"); } } catch (InterruptedException e) { *************** *** 160,162 **** --- 144,163 ---- logger.debug("run() finished"); } + + /** + * Clones a map. We cannot use .clone() because synchronized maps + * do not implement Cloneable :-/ + * + * @param map map to clone + * @return cloned map + */ + private Map<InetSocketAddress, Socket> cloneMap(Map<InetSocketAddress, Socket> map) { + Map<InetSocketAddress, Socket> res = new HashMap<InetSocketAddress, Socket>(); + synchronized (map) { + for (InetSocketAddress key : map.keySet()) { + res.put(key, map.get(key)); + } + } + return res; + } } Index: NetworkServerManagerCallback.java =================================================================== RCS file: /cvsroot/arianne/marauroa/src/marauroa/server/net/NetworkServerManagerCallback.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** NetworkServerManagerCallback.java 20 Aug 2006 15:40:13 -0000 1.4 --- NetworkServerManagerCallback.java 24 Aug 2006 22:27:48 -0000 1.5 *************** *** 5,11 **** /** ! * Callback interface into NetworkManagerServer to prevent dependency loop ! * between NetworkManagerServer and ...Writer/Reader. ! * * @author hendrik */ --- 5,11 ---- /** ! * Callback interface into NetworkManagerServer to prevent ! * dependency loop between NetworkManagerServer and ...Writer/Reader. ! * * @author hendrik */ *************** *** 14,22 **** /** * Are we still running? ! * * @return keepRunning */ public boolean isStillRunning(); ! /** * Notifies the NetworkServerManager that the read-thread has finished. --- 14,22 ---- /** * Are we still running? ! * * @return keepRunning */ public boolean isStillRunning(); ! /** * Notifies the NetworkServerManager that the read-thread has finished. *************** *** 26,46 **** /** * Receives a Message ! * ! * @param data ! * data ! * @param inetSocketAddress ! * the address of the client socket (ip+port) ! * @throws IOException ! * on an io-error. */ ! public void receiveMessage(byte[] data, InetSocketAddress inetSocketAddress) ! throws IOException; /** * Disconnect a client freeing associated resources ! * ! * @param inetSocketAddress ! * InetSocketAddress */ public void disconnectClient(InetSocketAddress inetSocketAddress); } --- 26,53 ---- /** * Receives a Message ! * ! * @param data data ! * @param inetSocketAddress the address of the client socket (ip+port) ! * @throws IOException on an io-error. */ ! public void receiveMessage(byte[] data, InetSocketAddress inetSocketAddress) throws IOException; /** * Disconnect a client freeing associated resources ! * ! * @param inetSocketAddress InetSocketAddress */ public void disconnectClient(InetSocketAddress inetSocketAddress); + + /** + * Disconnects a client freeing associated resources.<!--sic--> This method is used internally by TCPWriter. + * + * @param inetSocketAddress InetSocketAddress + */ + public void internalDisconnectClientNow(InetSocketAddress inetSocketAddress); + + /** + * Disconnects all clients waiting for disconnecting + */ + public void internalDisconnectClientsNow(); } Index: NetworkServerManager.java =================================================================== RCS file: /cvsroot/arianne/marauroa/src/marauroa/server/net/NetworkServerManager.java,v retrieving revision 1.28 retrieving revision 1.29 diff -C2 -d -r1.28 -r1.29 *** NetworkServerManager.java 20 Aug 2006 15:40:13 -0000 1.28 --- NetworkServerManager.java 24 Aug 2006 22:27:48 -0000 1.29 *************** *** 23,26 **** --- 23,27 ---- import java.util.LinkedList; import java.util.List; + import java.util.Map; import marauroa.common.Log4J; *************** *** 34,46 **** import org.apache.log4j.Logger; ! /** ! * The NetworkServerManager is the active entity of the marauroa.net package, it ! * is in charge of sending and recieving the packages from the network. ! */ ! public final class NetworkServerManager implements ! NetworkServerManagerCallback, Runnable { /** the logger instance. */ ! private static final Logger logger = Log4J ! .getLogger(NetworkServerManager.class); /** The server socket from where we recieve the packets. */ --- 35,44 ---- import org.apache.log4j.Logger; ! ! /** The NetworkServerManager is the active entity of the marauroa.net package, ! * it is in charge of sending and recieving the packages from the network. */ ! public final class NetworkServerManager implements NetworkServerManagerCallback, Runnable { /** the logger instance. */ ! private static final Logger logger = Log4J.getLogger(NetworkServerManager.class); /** The server socket from where we recieve the packets. */ *************** *** 59,70 **** private MessageFactory msgFactory; ! private HashMap<InetSocketAddress, Socket> tcpSockets = new HashMap<InetSocketAddress, Socket>(); private UDPReader udpReader; - private UDPWriter udpWriter; ! private TCPWriter tcpWriter; - private TCPReader tcpReader; --- 57,66 ---- private MessageFactory msgFactory; ! private Map<InetSocketAddress, Socket> tcpSockets = null; private UDPReader udpReader; private UDPWriter udpWriter; ! private TCPWriter tcpWriter; private TCPReader tcpReader; *************** *** 75,91 **** PacketValidator packetValidator; ! /** ! * Constructor that opens the socket on the marauroa_PORT and start the ! * thread to recieve new messages from the network. ! * ! * @throws SocketException ! * if the server socket cannot be created or bound. */ public NetworkServerManager() throws SocketException { Log4J.startMethod(logger, "NetworkServerManager"); ! /* ! * init the packet validater (which can now only check if the address is ! * banned) ! */ packetValidator = new PacketValidator(); msgFactory = MessageFactory.getFactory(); --- 71,90 ---- PacketValidator packetValidator; ! /** Used to close Sockets after the logout message has been transmitted */ ! private List<InetSocketAddress> toClose = null; ! ! /** ! * Constructor that opens the socket on the marauroa_PORT and start the thread ! * to recieve new messages from the network. ! * ! * @throws SocketException if the server socket cannot be created or bound. */ public NetworkServerManager() throws SocketException { Log4J.startMethod(logger, "NetworkServerManager"); ! ! this.tcpSockets = Collections.synchronizedMap(new HashMap<InetSocketAddress, Socket>()); ! this.toClose = Collections.synchronizedList(new LinkedList<InetSocketAddress>()); ! ! /* init the packet validater (which can now only check if the address is banned)*/ packetValidator = new PacketValidator(); msgFactory = MessageFactory.getFactory(); *************** *** 106,114 **** tcpListener.setDaemon(true); tcpListener.start(); ! ! /* ! * Because we access the list from several places we create a ! * synchronized list. ! */ messages = Collections.synchronizedList(new LinkedList<Message>()); stats = Statistics.getStatistics(); --- 105,110 ---- tcpListener.setDaemon(true); tcpListener.start(); ! ! /* Because we access the list from several places we create a synchronized list. */ messages = Collections.synchronizedList(new LinkedList<Message>()); stats = Statistics.getStatistics(); *************** *** 122,126 **** } ! /** * This method notify the thread to finish it execution */ --- 118,122 ---- } ! /** * This method notify the thread to finish it execution */ *************** *** 136,140 **** } ! /** * This methods notifies waiting threads to continue */ --- 132,136 ---- } ! /** * This methods notifies waiting threads to continue */ *************** *** 143,152 **** } ! /** ! * This method returns a Message from the list or block for timeout ! * milliseconds until a message is available or null if timeout happens. ! * ! * @param timeout ! * timeout time in milliseconds * @return a Message or null if timeout happens */ --- 139,147 ---- } ! /** ! * This method returns a Message from the list or block for timeout milliseconds ! * until a message is available or null if timeout happens. ! * ! * @param timeout timeout time in milliseconds * @return a Message or null if timeout happens */ *************** *** 173,179 **** } ! /** * This method blocks until a message is available ! * * @return a Message */ --- 168,174 ---- } ! /** * This method blocks until a message is available ! * * @return a Message */ *************** *** 194,206 **** /** * Puts a message received by the Networklayer into the list of messages. ! * ! * @param data ! * of message that was received ! * @param inetSocketAddress ! * the address of the client-socket (ip+port) */ ! public void receiveMessage(byte[] data, InetSocketAddress inetSocketAddress) ! throws IOException { ! if (!packetValidator.checkBanned(inetSocketAddress.getAddress())) { try { --- 189,198 ---- /** * Puts a message received by the Networklayer into the list of messages. ! * ! * @param data of message that was received ! * @param inetSocketAddress the address of the client-socket (ip+port) */ ! public void receiveMessage(byte[] data, InetSocketAddress inetSocketAddress) throws IOException { ! if (!packetValidator.checkBanned(inetSocketAddress.getAddress())) { try { *************** *** 211,236 **** } catch (InvalidVersionException e) { stats.add("Message invalid version", 1); ! MessageS2CInvalidMessage msg = new MessageS2CInvalidMessage( ! inetSocketAddress, ! "Invalid client version: Update client"); sendMessage(msg); } } else { ! logger.debug("UDP Packet discarded - client(" + inetSocketAddress ! + ") is banned."); } ! } /** ! * This method add a message to be delivered to the client the message is ! * pointed to. ! * ! * @param msg ! * the message to be delivered. */ public void sendMessage(Message msg) { Log4J.startMethod(logger, "addMessage"); ! Socket socket = tcpSockets.get(msg.getAddress()); if (socket != null) { tcpWriter.write(msg, socket); --- 203,227 ---- } catch (InvalidVersionException e) { stats.add("Message invalid version", 1); ! MessageS2CInvalidMessage msg = new MessageS2CInvalidMessage(inetSocketAddress, "Invalid client version: Update client"); sendMessage(msg); } } else { ! logger.debug("UDP Packet discarded - client(" + inetSocketAddress + ") is banned."); } ! } /** ! * This method add a message to be delivered to the client the message ! * is pointed to. ! * ! * @param msg the message to be delivered. */ public void sendMessage(Message msg) { Log4J.startMethod(logger, "addMessage"); ! Socket socket = null; ! synchronized (tcpSockets) { ! socket = tcpSockets.get(msg.getAddress()); ! } if (socket != null) { tcpWriter.write(msg, socket); *************** *** 244,248 **** return keepRunning; } ! public void finishedReadThread() { isfinished = true; --- 235,239 ---- return keepRunning; } ! public void finishedReadThread() { isfinished = true; *************** *** 254,265 **** while (keepRunning) { Socket socket = tcpSocket.accept(); ! socket.setSoTimeout(500); ! InetSocketAddress inetSocketAddress = new InetSocketAddress( ! socket.getInetAddress(), socket.getPort()); ! tcpSockets.put(inetSocketAddress, socket); ! } tcpSocket.close(); ! } catch (IOException e) { logger.error(e, e); --- 245,257 ---- while (keepRunning) { Socket socket = tcpSocket.accept(); ! socket.setSoTimeout(500); ! InetSocketAddress inetSocketAddress = new InetSocketAddress(socket.getInetAddress(), socket.getPort()); ! synchronized (tcpSockets) { ! tcpSockets.put(inetSocketAddress, socket); ! } ! } tcpSocket.close(); ! } catch (IOException e) { logger.error(e, e); *************** *** 267,279 **** } public void disconnectClient(InetSocketAddress inetSocketAddress) { ! Socket socket = tcpSockets.get(inetSocketAddress); ! if (socket != null) { ! try { ! socket.close(); ! } catch (IOException e) { ! logger.warn(e, e); } - tcpSockets.remove(inetSocketAddress); } } --- 259,292 ---- } + /** + * Marks a client to be disconnected in the very near future. + * + * @param inetSocketAddress inetSocketAddress + */ public void disconnectClient(InetSocketAddress inetSocketAddress) { ! synchronized (toClose) { ! toClose.add(inetSocketAddress); ! } ! } ! ! public void internalDisconnectClientNow(InetSocketAddress inetSocketAddress) { ! synchronized (tcpSockets) { ! Socket socket = tcpSockets.get(inetSocketAddress); ! if (socket != null) { ! try { ! socket.close(); ! } catch (IOException e) { ! logger.warn(e, e); ! } ! tcpSockets.remove(inetSocketAddress); ! } ! } ! } ! ! public void internalDisconnectClientsNow() { ! synchronized (toClose) { ! for (InetSocketAddress inetSocketAddress : toClose) { ! internalDisconnectClientNow(inetSocketAddress); } } } |