|
From: <me...@us...> - 2002-10-04 06:10:49
|
Update of /cvsroot/cayenne/cayenne/src/cayenne/java/org/objectstyle/cayenne/conn
In directory usw-pr-cvs1:/tmp/cvs-serv14437/src/cayenne/java/org/objectstyle/cayenne/conn
Modified Files:
PoolManager.java
Log Message:
fixed connection pool multithreading problems -
implemented request queue with timeouts
Index: PoolManager.java
===================================================================
RCS file: /cvsroot/cayenne/cayenne/src/cayenne/java/org/objectstyle/cayenne/conn/PoolManager.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- PoolManager.java 4 Oct 2002 04:37:58 -0000 1.3
+++ PoolManager.java 4 Oct 2002 06:10:45 -0000 1.4
@@ -71,10 +71,13 @@
import javax.sql.PooledConnection;
import org.apache.log4j.Logger;
+import org.objectstyle.cayenne.util.RequestDequeue;
+import org.objectstyle.cayenne.util.RequestQueue;
/**
* PoolManager is a pooling DataSource implementation.
- * It wraps a non-pooling datasource.
+ * To obtain connections, PoolManager internally uses either a JDBC driver
+ * or another pooling datasource.
*
* <p>TODO: create a low priority thread that will do pool maintenance.</p>
*
@@ -83,6 +86,21 @@
public class PoolManager implements DataSource, ConnectionEventListener {
static Logger logObj = Logger.getLogger(PoolManager.class.getName());
+ /**
+ * Defines a maximum number of threads that could wait in the
+ * connection queue at any given moment. In the future this
+ * parameter should be made configurable.
+ */
+ public static final int MAX_QUEUE_SIZE = 25;
+
+ /**
+ * Defines a maximum time in milliseconds that a connection
+ * request could wait in the connection queue. After this period
+ * expires, an exception will be thrown in the calling method.
+ * In the future this parameter should be made configurable.
+ */
+ public static final int MAX_QUEUE_WAIT = 30000;
+
protected ConnectionPoolDataSource poolDataSource;
protected int minConnections;
protected int maxConnections;
@@ -93,9 +111,12 @@
protected List unusedPool;
protected List usedPool;
+ protected RequestQueue threadQueue;
- /** Creates new PoolManager using org.objectstyle.cayenne.conn.PoolDataSource
- * for an underlying ConnectionPoolDataSource. */
+ /**
+ * Creates new PoolManager using org.objectstyle.cayenne.conn.PoolDataSource
+ * for an underlying ConnectionPoolDataSource.
+ */
public PoolManager(
String jdbcDriver,
String dataSourceUrl,
@@ -163,6 +184,7 @@
this.poolDataSource = poolDataSource;
// init pool
+ threadQueue = new RequestQueue(MAX_QUEUE_SIZE, MAX_QUEUE_WAIT);
usedPool = Collections.synchronizedList(new ArrayList(maxConnections));
unusedPool = Collections.synchronizedList(new ArrayList(maxConnections));
growPool(minConnections, userName, password);
@@ -202,32 +224,27 @@
}
}
- /** Increase connection pool by the specified number of connections..
- * Throw SQLException if no more connections are allowed, or if
- * an error happens when creating a new connection.
+ /**
+ * Increases connection pool by the specified number of connections.
+ *
+ * @return the actual number of created connections.
+ * @throws SQLException if an error happens when creating a new connection.
*/
- protected synchronized void growPool(
+ protected synchronized int growPool(
int addConnections,
String userName,
String password)
throws SQLException {
- if (unusedPool.size() + usedPool.size() + addConnections > maxConnections) {
- StringBuffer msg = new StringBuffer();
- msg
- .append("An attempt to open more connections ")
- .append("than pool is allowed to handle.")
- .append("\n\tCurrent size: " + (unusedPool.size() + usedPool.size()))
- .append("\n\tTrying to open: " + addConnections)
- .append("\n\tMax allowed: " + maxConnections);
- throw new SQLException(msg.toString());
- }
-
- for (int i = 0; i < addConnections; i++) {
+ int i = 0;
+ int startPoolSize = getPoolSize();
+ for (; i < addConnections && startPoolSize + i < maxConnections; i++) {
PooledConnection newConnection = newPooledConnection(userName, password);
newConnection.addConnectionEventListener(this);
unusedPool.add(newConnection);
}
+
+ return i;
}
protected synchronized void shrinkPool(int closeConnections) throws SQLException {
@@ -286,11 +303,18 @@
return userName;
}
+ /**
+ * Returns current number of connections.
+ */
+ public synchronized int getPoolSize() {
+ return usedPool.size() + unusedPool.size();
+ }
+
/**
* Returns the number of connections obtained via this DataSource
* that are currently in use by the DataSource clients.
*/
- public int getCurrentlyInUse() {
+ public synchronized int getCurrentlyInUse() {
return usedPool.size();
}
@@ -299,7 +323,7 @@
* pool that are currently not used by any clients and are
* available immediately via <code>getConnection</code> method.
*/
- public int getCurrentlyUnused() {
+ public synchronized int getCurrentlyUnused() {
return unusedPool.size();
}
@@ -309,7 +333,7 @@
*
* <p><code>ds.getConnection(ds.getUserName(), ds.getPassword())</code></p>
*/
- public synchronized Connection getConnection() throws SQLException {
+ public Connection getConnection() throws SQLException {
return getConnection(userName, password);
}
@@ -317,18 +341,24 @@
public Connection getConnection(String userName, String password)
throws SQLException {
- // security check
- int totalCon = usedPool.size() + unusedPool.size();
- if (totalCon > maxConnections) {
- shrinkPool(totalCon - maxConnections);
- }
-
// increase pool if needed
// if further increase is not possible
// (say we exceed the maximum number of connections)
// this will throw an SQL exception...
if (unusedPool.size() == 0) {
- growPool(1, userName, password);
+ int size = growPool(1, userName, password);
+
+ // can't grow anymore, put on hold
+ if (size == 0) {
+ RequestDequeue result = threadQueue.queueThread();
+ if (result.isDequeueSuccess()) {
+ return ((PooledConnection) result.getDequeueEventObject()).getConnection();
+ } else if (result.isQueueFull()) {
+ throw new SQLException("Can't obtain connection. Too many requests waiting in the queue.");
+ } else {
+ throw new SQLException("Can't obtain connection. Request timed out.");
+ }
+ }
}
int lastObjectInd = unusedPool.size() - 1;
@@ -353,7 +383,9 @@
poolDataSource.setLogWriter(out);
}
- /** Returns closed connection to the pool. */
+ /**
+ * Returns closed connection to the pool.
+ */
public synchronized void connectionClosed(ConnectionEvent event) {
// return connection to the pool
PooledConnection closedConn = (PooledConnection) event.getSource();
@@ -362,8 +394,13 @@
// managed by this pool...
int usedInd = usedPool.indexOf(closedConn);
if (usedInd >= 0) {
- usedPool.remove(usedInd);
- unusedPool.add(closedConn);
+
+ // check connection request queue and assign connection to the
+ // first requestor in line
+ if (!threadQueue.dequeueFirst(closedConn)) {
+ usedPool.remove(usedInd);
+ unusedPool.add(closedConn);
+ }
}
// else ....
// other possibility is that this is a bad connection, so just ignore its closing event,
@@ -387,8 +424,7 @@
int usedInd = usedPool.indexOf(errorSrc);
if (usedInd >= 0) {
usedPool.remove(usedInd);
- }
- else {
+ } else {
int unusedInd = unusedPool.indexOf(errorSrc);
if (unusedInd >= 0)
unusedPool.remove(unusedInd);
|