From: Andrew C. O. <aco...@us...> - 2003-08-16 01:02:36
|
User: acoliver2 Date: 03/08/15 17:03:38 Modified: src/java/org/jboss/mail Base64.java Protocol.java Server.java ServerThread.java Added: src/java/org/jboss/mail ServerMBean.java ServerThreadPool.java Log: primarily threading changes. More to do on the unit test, for some reason I can't get the test to last until the threads are all done. looks like my earlier commit didn't work? Revision Changes Path 1.5 +0 -0 jboss-mail/src/java/org/jboss/mail/Base64.java Index: Base64.java =================================================================== RCS file: /cvsroot/jboss/jboss-mail/src/java/org/jboss/mail/Base64.java,v retrieving revision 1.4 retrieving revision 1.5 diff -u -r1.4 -r1.5 --- Base64.java 11 Aug 2003 20:45:01 -0000 1.4 +++ Base64.java 16 Aug 2003 00:03:37 -0000 1.5 @@ -68,7 +68,7 @@ /** * Simple Base64 string decoding function * - * @version This is $Revision: 1.4 $ + * @version This is $Revision: 1.5 $ */ public class Base64 { 1.2 +12 -0 jboss-mail/src/java/org/jboss/mail/Protocol.java Index: Protocol.java =================================================================== RCS file: /cvsroot/jboss/jboss-mail/src/java/org/jboss/mail/Protocol.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- Protocol.java 24 Jun 2003 02:04:39 -0000 1.1 +++ Protocol.java 16 Aug 2003 00:03:37 -0000 1.2 @@ -8,6 +8,8 @@ import java.util.Map; +import org.w3c.dom.Element; + /** * The protocol is an abstraction for attaching bi-directional stateful protocols to the * "Server". The two most important artifacts of this contract are the properties and @@ -21,6 +23,11 @@ */ public interface Protocol { + /** + * @return get the key name for the protocol + */ + String getName(); + /** * greet the client upon connection * @param stream output stream of the client @@ -95,4 +102,9 @@ * @param properties */ void setProperties(Map properties); + + /** + * @return properties represented as XML Element root + */ + Element getProperties(); } 1.4 +82 -13 jboss-mail/src/java/org/jboss/mail/Server.java Index: Server.java =================================================================== RCS file: /cvsroot/jboss/jboss-mail/src/java/org/jboss/mail/Server.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- Server.java 11 Aug 2003 20:45:01 -0000 1.3 +++ Server.java 16 Aug 2003 00:03:37 -0000 1.4 @@ -15,6 +15,11 @@ import java.util.Map; import java.util.HashMap; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; + + /** * Server constiutes a process or thread which listens on a given port, delegates the connection to * a thread using a pre-specified protocol. We can handle one of any number of pre-configured protocols @@ -29,6 +34,8 @@ extends ServiceMBeanSupport implements ServerMBean, Runnable { private ServerSocket serverSocket; + private ServerThreadPool pool; + private Thread poolthread; /** * what port we are listening on (usually 110) */ @@ -53,60 +60,91 @@ private Thread serverThread; /** - * @return + * @return CLASS of the protocol in use. */ public String getProtocol() { - return protocol.getClass().getName(); + return protocol.getName(); } /** - * @param protocol + * @param protocol name of the protocol for this service to use. */ public void setProtocol(String protocol) { this.protocol = protocolFactory.getProtocol(protocol); } /** - * @return + * @return number of milliseconds in between requests before we kill the connection */ public long getTimeout() { return timeout; } /** - * @param timeout + * @param timeout in milliseconds (max time allowed between requests) */ public void setTimeout(long timeout) { this.timeout = timeout; } /** - * @return + * @return address from java.net.ServerSocket ("localhost" is normal) */ public String getAddress() { return address; } /** - * @param address + * @param address from java.net.ServerSocket ("localhost" is sensible) */ public void setAddress(String address) { this.address = address; } /** - * @return + * @return number of milliseconds this connection can live before we kill it (regardless) */ public long getLife() { return life; } /** - * @param life + * @param life - number of milliseconds this connection can live before we kill it (regardless) */ public void setLife(long life) { this.life = life; } + + public void setProperties(Element properties) { + NodeList nodes = properties.getElementsByTagName("property"); + Map props = new HashMap(nodes.getLength()); + for (int k = 0; k < nodes.getLength(); k++) { + String pname = null; + String pval = null; + Node node = nodes.item(k); + NodeList subnodes = node.getChildNodes(); + for (int x =0;x < subnodes.getLength(); x++) { + Node subnode = subnodes.item(x); + if (subnode.getNodeName().equals("name")) { + pname = subnode.getNodeValue(); + } else if (subnode.getNodeName().equals("value")) { + pval = subnode.getNodeValue(); + } else { + throw new RuntimeException("properties must not have anything but name value pairs"); + } + + if (pname == null || pval == null) { + throw new RuntimeException("properties must have both name and value elements"); + } + props.put(pname,pval); + } + } + this.protocol.setProperties(props); + } + + public Element getProperties() { + return this.protocol.getProperties(); + } /** * Create an instance of server @@ -114,6 +152,8 @@ * @param backlog number of requests to backlog before refusing them (usually a max of 5 on most systems) * @param address of the local system (127.0.0.1 for testing purposes) * @param protocol key name for the ProtocolFactory (example "SMTP") + * @param timeout is the maximum amount of time between requests + * @param life is the maximum amount of time the connection can live before we kill it regardless */ public Server( int port, @@ -122,6 +162,9 @@ String protocol, long timeout, long life) { + this.pool = new ServerThreadPool(true, 200, 500); + poolthread = new Thread(pool); + poolthread.start(); this.port = port; this.backlog = backlog; this.address = address; @@ -135,6 +178,10 @@ } public Server() { + this.pool = new ServerThreadPool(true, 200, 500); + poolthread = new Thread(pool); + poolthread.start(); + // this(9000, 5, "localhost", "SMTP", 30000, 120000); } @@ -183,24 +230,43 @@ try { serverSocket.close(); serverThread.interrupt(); + poolthread.interrupt(); + } catch (Exception e) { } System.out.println("**** stopped Server Service instance"); } + /** + * TCPIP port to run on (presently we use 9000 for testing most of the time) + * @param port 25 is normal for SMTP + */ public void setPort(int port) { this.port = port; } + /** + * @return port the service is running on + */ public int getPort() { return this.port; } + /** + * set the socket backlog + * @param backlog - number of requests to queue at the low level. Operating system dependent on how + * much this matters. See ServerSocket for details, but 5 is a good number. Changing this after the + * service has started doesn't accomplish much (restart it) + */ public void setBacklog(int backlog) { this.backlog = backlog; } + /** + * @return backlog - number of requests to queue at the low level. Operating system dependent on how + * much this matters. See ServerSocket for details, but 5 is a good number. + */ public int getBacklog() { return backlog; } @@ -231,9 +297,11 @@ protocol.greet(output); boolean loop = true; input.mark(2); + Thread thread = pool.assign(socket, input, output, protocol, timeout,System.currentTimeMillis()+life); + thread.start(); //we want to return to where we are once we've made sure we're alive //TODO: Thead pooling instead of creation - ServerThread st = + /* ServerThread st = new ServerThread( socket, input, @@ -242,21 +310,22 @@ 30000, System.currentTimeMillis() + 120000); Thread thread = new Thread(st); - thread.start(); - + thread.start();*/ +/* MonitorThread mt = new MonitorThread( thread, System.currentTimeMillis() + 120000); Thread monitorThread = new Thread(mt); monitorThread.start(); - +*/ } catch (Exception e) { e.printStackTrace(); break; } } } + /** * temporary shit to create debug properties...will remove later. 1.2 +121 -1 jboss-mail/src/java/org/jboss/mail/ServerThread.java Index: ServerThread.java =================================================================== RCS file: /cvsroot/jboss/jboss-mail/src/java/org/jboss/mail/ServerThread.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- ServerThread.java 8 Aug 2003 20:09:21 -0000 1.1 +++ ServerThread.java 16 Aug 2003 00:03:37 -0000 1.2 @@ -24,6 +24,120 @@ private Protocol protocol; private long timeout; private long life; + private Thread myThread; + private ServerThreadPool pool; + + /** + * @return + */ + public ServerThreadPool getPool() { + return pool; + } + + /** + * @param pool + */ + public void setPool(ServerThreadPool pool) { + this.pool = pool; + } + + /** + * @return + */ + public Thread getMyThread() { + return myThread; + } + + /** + * @param myThread + */ + public void setMyThread(Thread myThread) { + this.myThread = myThread; + } + + /** + * @return + */ + public InputStream getInput() { + return input; + } + + /** + * @param input + */ + public void setInput(InputStream input) { + this.input = input; + } + + /** + * @return + */ + public long getLife() { + return life; + } + + /** + * @param life + */ + public void setLife(long life) { + this.life = life; + } + + /** + * @return + */ + public OutputStream getOutput() { + return output; + } + + /** + * @param output + */ + public void setOutput(OutputStream output) { + this.output = output; + } + + /** + * @return + */ + public Protocol getProtocol() { + return protocol; + } + + /** + * @param protocol + */ + public void setProtocol(Protocol protocol) { + this.protocol = protocol; + } + + /** + * @return + */ + public Socket getSocket() { + return socket; + } + + /** + * @param socket + */ + public void setSocket(Socket socket) { + this.socket = socket; + } + + /** + * @return + */ + public long getTimeout() { + return timeout; + } + + /** + * @param timeout + */ + public void setTimeout(long timeout) { + this.timeout = timeout; + } public ServerThread( Socket socket, @@ -31,7 +145,8 @@ OutputStream output, Protocol protocol, long timeout, - long life) { + long life, + ServerThreadPool pool) { this.socket = socket; this.input = input; @@ -39,8 +154,12 @@ this.protocol = protocol; this.timeout = timeout; this.life = life; + this.pool = pool; } + + public ServerThread() { + } /* (non-Javadoc) * @see java.lang.Runnable#run() @@ -67,6 +186,7 @@ e.printStackTrace(); } finally { try { + pool.free(this); this.socket.close(); } catch (IOException ioe) { ioe.printStackTrace(); 1.1 jboss-mail/src/java/org/jboss/mail/ServerMBean.java Index: ServerMBean.java =================================================================== /* * Created on Aug 8, 2003 * * To change the template for this generated file go to * Window - Preferences - Java - Code Generation - Code and Comments */ package org.jboss.mail; import org.jboss.system.ServiceMBean; import org.w3c.dom.Element; /** * @author andrewoliver * * To change the template for this generated type comment go to * Window - Preferences - Java - Code Generation - Code and Comments */ public interface ServerMBean extends ServiceMBean { public void setProperties(Element properties); public Element getProperties(); void setPort(int port); int getPort(); void setAddress(String address); String getAddress(); void setLife(long life); long getLife(); void setTimeout(long timeout); long getTimeout(); void setBacklog(int backlog); int getBacklog(); void setProtocol(String protocol); String getProtocol(); } 1.1 jboss-mail/src/java/org/jboss/mail/ServerThreadPool.java Index: ServerThreadPool.java =================================================================== /* * Created on Aug 13, 2003 * * To change the template for this generated file go to * Window - Preferences - Java - Code Generation - Code and Comments */ package org.jboss.mail; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; /** * @author andrewoliver * * To change the template for this generated type comment go to * Window - Preferences - Java - Code Generation - Code and Comments */ public class ServerThreadPool implements Runnable { List availthreads; List usedthreads; private long interval; private boolean pool; private int numthreads; public ServerThreadPool(boolean pooled, int num, long interval) { if (pooled) { availthreads = Collections.synchronizedList(new ArrayList(num)); this.numthreads = num; createThreads(); } usedthreads = Collections.synchronizedList(new ArrayList(num)); this.interval = interval; this.pool = pooled; } /* (non-Javadoc) * @see java.lang.Runnable#run() */ public void run() { try { // availthreads.removeAll(availthreads); while (true) { Thread.sleep(interval); Iterator i = usedthreads.iterator(); while (i.hasNext()) { ServerThread thread = (ServerThread) i.next(); if (thread.getLife() > System.currentTimeMillis()) { cleanThread(i, thread); } } } } catch (InterruptedException e) { e.printStackTrace(); Iterator i = usedthreads.iterator(); while (i.hasNext()) { ServerThread st = (ServerThread) i.next(); // st.getMyThread().interrupt(); // i.remove(); cleanThread(i, st); } } } public void createThreads() { for (int k = 0; k < this.numthreads; k++) { ServerThread st = new ServerThread(); Thread thread = new Thread(st); st.setMyThread(thread); availthreads.add(st); } } public synchronized Thread assign( Socket socket, InputStream input, OutputStream output, Protocol protocol, long timeout, long life) throws InterruptedException { Thread thread = this.pool ? pooledThread(socket, input, output, protocol, timeout, life) : newThread(socket, input, output, protocol, timeout, life); return thread; } /** * @param socket * @param input * @param output * @param protocol * @param timeout * @param life * @return */ private Thread newThread( Socket socket, InputStream input, OutputStream output, Protocol protocol, long timeout, long life) { ServerThread serverthread = new ServerThread( socket, input, output, protocol, timeout, life, this); Thread thread = new Thread(serverthread); serverthread.setMyThread(thread); return thread; } /** * @param socket * @param input * @param output * @param protocol * @param timeout * @param life * @return */ private Thread pooledThread( Socket socket, InputStream input, OutputStream output, Protocol protocol, long timeout, long life) throws InterruptedException { awaitFreeThread(); ServerThread serverthread = (ServerThread) availthreads.remove(availthreads.size() - 1); this.usedthreads.add(serverthread); serverthread.setSocket(socket); serverthread.setInput(input); serverthread.setOutput(output); serverthread.setProtocol(protocol); serverthread.setLife(life); serverthread.setTimeout(timeout); serverthread.setPool(this); return serverthread.getMyThread(); } /** * */ private void awaitFreeThread() throws InterruptedException { while (availthreads.size() < 1) { Thread.sleep(500); } } /** * @param i * @param thread */ private void cleanThread(Iterator i, ServerThread thread) { i.remove(); Thread javathread = thread.getMyThread(); javathread.interrupt(); availthreads.add(thread); if (this.pool == true) { availthreads.add(thread); } } /** * @param thread */ public void free(ServerThread thread) { System.out.println("freeing " + thread); usedthreads.remove(thread); if (pool) { availthreads.add(thread); } } } |