From: <mrj...@us...> - 2008-08-15 14:51:38
|
Revision: 2942 http://tora.svn.sourceforge.net/tora/?rev=2942&view=rev Author: mrjohnson0 Date: 2008-08-15 14:51:44 +0000 (Fri, 15 Aug 2008) Log Message: ----------- qsqlsub's lockDown and lockUp are too primitive and exception handling was not always correct. patch moves LockingPtr to tothread.h and reworks the qsqldatabase locking. this fixes some crashes mostly around commit and rollback (unprotected access). qsqldatabase is not thread safe. Modified Paths: -------------- trunk/tora/src/toconnectionpool.cpp trunk/tora/src/toconnectionpool.h trunk/tora/src/toeventquerytask.cpp trunk/tora/src/toqsqlconnection.cpp trunk/tora/src/tothread.h Modified: trunk/tora/src/toconnectionpool.cpp =================================================================== --- trunk/tora/src/toconnectionpool.cpp 2008-08-15 14:07:49 UTC (rev 2941) +++ trunk/tora/src/toconnectionpool.cpp 2008-08-15 14:51:44 UTC (rev 2942) @@ -40,6 +40,7 @@ #include "toconf.h" #include "toconnectionpool.h" #include "tosql.h" +#include "tothread.h" #include <QTimer> #include <QCoreApplication> @@ -135,7 +136,7 @@ toConnectionPool::toConnectionPool(toConnection *conn) : QObject(conn) { Connection = conn; - LockingPtr ptr(Pool, PoolLock); + LockingPtr<SubList> ptr(Pool, PoolLock); for(int i = 0; i < PreferredSize; i++) { PooledSub *psub = new PooledSub; @@ -161,7 +162,7 @@ toConnection *conn = Connection; Connection = 0; - LockingPtr ptr(Pool, PoolLock); + LockingPtr<SubList> ptr(Pool, PoolLock); for(int mem = 0; mem < ptr->size(); mem++) { PooledSub *psub = (*ptr)[mem]; @@ -185,7 +186,7 @@ if(!Connection) return; - LockingPtr ptr(Pool, PoolLock); + LockingPtr<SubList> ptr(Pool, PoolLock); PooledSub *psub = (*ptr)[member]; psub->State = Broken; ptr.unlock(); @@ -211,7 +212,7 @@ if(!Connection) return Broken; - LockingPtr ptr(Pool, PoolLock); + LockingPtr<SubList> ptr(Pool, PoolLock); PooledSub *psub = (*ptr)[member]; if(psub->State != Free) return psub->State; @@ -249,13 +250,13 @@ int toConnectionPool::size() { - LockingPtr ptr(Pool, PoolLock); + LockingPtr<SubList> ptr(Pool, PoolLock); return ptr->size(); } toConnectionSub* toConnectionPool::steal(int member) { - LockingPtr ptr(Pool, PoolLock); + LockingPtr<SubList> ptr(Pool, PoolLock); return (*ptr)[member]->Sub; } @@ -264,7 +265,7 @@ { // keep lock here so adding connection below can be sure // there's no current lock in case of exception - LockingPtr ptr(Pool, PoolLock); + LockingPtr<SubList> ptr(Pool, PoolLock); for(int mem = 0; mem < ptr->size(); mem++) { PooledSub *psub = (*ptr)[mem]; @@ -290,14 +291,14 @@ toConnectionSub *sub = Connection->addConnection(); PooledSub *psub = new PooledSub(sub, Busy); - LockingPtr ptr(Pool, PoolLock); + LockingPtr<SubList> ptr(Pool, PoolLock); ptr->append(psub); return psub->Sub; } void toConnectionPool::release(toConnectionSub *sub) { - LockingPtr ptr(Pool, PoolLock); + LockingPtr<SubList> ptr(Pool, PoolLock); for(int mem = 0; mem < ptr->size(); mem++) { PooledSub *psub = (*ptr)[mem]; Modified: trunk/tora/src/toconnectionpool.h =================================================================== --- trunk/tora/src/toconnectionpool.h 2008-08-15 14:07:49 UTC (rev 2941) +++ trunk/tora/src/toconnectionpool.h 2008-08-15 14:51:44 UTC (rev 2942) @@ -160,52 +160,6 @@ // lock for the pool. QMutex PoolLock; - // much thanks to: - // http://www.ddj.com/cpp/184403766 - // i remove the template and added lock and unlock - class LockingPtr { - public: - // Constructors/destructors - LockingPtr(volatile SubList& obj, QMutex& mtx) - : pObj_(const_cast<SubList*>(&obj)), - pMtx_(&mtx) { - - mtx.lock(); - locked = true; - } - - ~LockingPtr() { - if(locked) - pMtx_->unlock(); - } - - void lock() { - pMtx_->lock(); - locked = true; - } - - void unlock() { - locked = false; - pMtx_->unlock(); - } - - // Pointer behavior - SubList& operator*() { - return *pObj_; - } - - SubList* operator->() { - return pObj_; - } - - private: - bool locked; - SubList* pObj_; - QMutex* pMtx_; - LockingPtr(const LockingPtr&); - LockingPtr& operator=(const LockingPtr&); - }; - // toConnection instance this class is a member of. will be used // to create new connections when needed. QPointer<toConnection> Connection; Modified: trunk/tora/src/toeventquerytask.cpp =================================================================== --- trunk/tora/src/toeventquerytask.cpp 2008-08-15 14:07:49 UTC (rev 2941) +++ trunk/tora/src/toeventquerytask.cpp 2008-08-15 14:51:44 UTC (rev 2942) @@ -124,6 +124,7 @@ } try { + Closed = true; delete Query; Query = 0; } Modified: trunk/tora/src/toqsqlconnection.cpp =================================================================== --- trunk/tora/src/toqsqlconnection.cpp 2008-08-15 14:07:49 UTC (rev 2941) +++ trunk/tora/src/toqsqlconnection.cpp 2008-08-15 14:51:44 UTC (rev 2942) @@ -43,6 +43,7 @@ #include "tomysqlkeywords.h" #include "tosql.h" #include "totool.h" +#include "tothread.h" #include <ctype.h> #include <string.h> @@ -63,7 +64,7 @@ #include <QString> #ifdef HAVE_POSTGRESQL_LIBPQ_FE_H - #include <libpq-fe.h> + #include <postgresql/libpq-fe.h> #endif // #include <QDebug> @@ -1117,46 +1118,48 @@ } }; -class qSqlSub : public toConnectionSub + class qSqlSub : public toConnectionSub { - toSemaphore Lock; public: - QSqlDatabase Connection; + // use compilter to prevent accidental unprotected access. Use + // LockingPtr. + volatile QSqlDatabase Connection; + QMutex Lock; QString Name; QString ConnectionID; qSqlSub(QSqlDatabase conn, const QString &name) - : Lock(1), Connection(conn), Name(name) { + : Connection(conn), Name(name) { } void lockUp() { - Lock.up(); } void lockDown () { - Lock.down(); } int getLockValue() { - return Lock.getValue(); + return 1; } - void reconnect(toConnection &conn); - ~qSqlSub() { - Connection.close(); + LockingPtr<QSqlDatabase> ptr(Connection, Lock); + ptr->close(); } + + // doh. better release the lock before calling this void throwError(const QString &sql) { - throw ErrorString(Connection.lastError(), sql); + LockingPtr<QSqlDatabase> ptr(Connection, Lock); + throw ErrorString(ptr->lastError(), sql); } }; -class qSqlQuery : public toQuery::queryImpl + class qSqlQuery : public toQuery::queryImpl { QSqlQuery *Query; QSqlRecord Record; @@ -1192,8 +1195,10 @@ } QSqlQuery *createQuery(const QString &query) { + LockingPtr<QSqlDatabase> ptr(Connection->Connection, Connection->Lock); + QSqlQuery *ret; - ret = new QSqlQuery(Connection->Connection); + ret = new QSqlQuery(*ptr); if (toQSqlProvider::OnlyForward) ret->setForwardOnly(true); ret->exec(query); @@ -1228,8 +1233,10 @@ pars.insert(pars.end(), Connection->ConnectionID); conn.execute(sql, pars); } - else - native_cancel(Connection->Connection.driver()); + else { + LockingPtr<QSqlDatabase> ptr(Connection->Connection, Connection->Lock); + native_cancel(ptr->driver()); + } } catch (...) {} @@ -1302,7 +1309,7 @@ if (EOQ) throw QString::fromLatin1("Tried to read past end of query"); - Connection->lockDown(); + LockingPtr<QSqlDatabase> ptr(Connection->Connection, Connection->Lock); QVariant val; bool fixEmpty = false; if (ColumnOrder) @@ -1358,19 +1365,11 @@ delete Query; Query = NULL; CurrentExtra = *ExtraData.begin(); - try - { - Query = createQuery(QueryParam(parseReorder(query()->sql()), query()->params(), &ExtraData)); - } - catch (...) - { - Connection->lockUp(); - throw; - } + Query = createQuery(QueryParam(parseReorder(query()->sql()), query()->params(), &ExtraData)); + + ptr.unlock(); checkQuery(); } - else - Connection->lockUp(); return toQValue::fromVariant(val); } @@ -1380,35 +1379,30 @@ } virtual int rowsProcessed(void) { + LockingPtr<QSqlDatabase> ptr(Connection->Connection, Connection->Lock); + if (!Query) return 0; - Connection->lockDown(); - int ret = Query->numRowsAffected(); - Connection->lockUp(); - return ret; + return Query->numRowsAffected(); } virtual int columns(void) { - Connection->lockDown(); + LockingPtr<QSqlDatabase> ptr(Connection->Connection, Connection->Lock); int ret = Record.count(); - ; if (ColumnOrder) - { ret = ColumnOrderSize; - } - Connection->lockUp(); + return ret; } virtual std::list<toQuery::queryDescribe> describe(void) { + LockingPtr<QSqlDatabase> ptr(Connection->Connection, Connection->Lock); std::list<toQuery::queryDescribe> ret; if (Query && Query->isSelect()) { QString provider = query()->connection().provider(); - Connection->lockDown(); QSqlRecord rec = Query->record(); ret = Describe(provider, rec, ColumnOrder, ColumnOrderSize); - Connection->lockUp(); } return ret; } @@ -1598,9 +1592,8 @@ qSqlSub *sub = dynamic_cast<qSqlSub *>(query.connectionSub()); if (sub) { - sub->lockDown(); - desc = Describe(connection().provider(), sub->Connection.record(quote(table.Name)), NULL, 0); - sub->lockUp(); + LockingPtr<QSqlDatabase> ptr(sub->Connection, sub->Lock); + desc = Describe(connection().provider(), ptr->record(quote(table.Name)), NULL, 0); } } else @@ -1633,14 +1626,20 @@ virtual void commit(toConnectionSub *sub) { qSqlSub *conn = qSqlConv(sub); - if (!conn->Connection.commit() && HasTransactions) + LockingPtr<QSqlDatabase> ptr(conn->Connection, conn->Lock); + if(!ptr->commit() && HasTransactions) { + ptr.unlock(); conn->throwError(QString::fromLatin1("COMMIT")); + } } virtual void rollback(toConnectionSub *sub) { qSqlSub *conn = qSqlConv(sub); - if (!conn->Connection.rollback() && HasTransactions) + LockingPtr<QSqlDatabase> ptr(conn->Connection, conn->Lock); + if(!ptr->rollback() && HasTransactions) { + ptr.unlock(); conn->throwError(QString::fromLatin1("ROLLBACK")); + } } virtual toConnectionSub *createConnection(void); @@ -1654,11 +1653,12 @@ { QString ret; qSqlSub *conn = qSqlConv(sub); - conn->lockDown(); try { - QSqlQuery query = conn->Connection.exec( - toSQL::string(SQLVersion, connection())); + LockingPtr<QSqlDatabase> ptr(conn->Connection, conn->Lock); + + QSqlQuery query = ptr->exec(toSQL::string(SQLVersion, + connection())); if (query.next()) { if (query.isValid()) @@ -1671,7 +1671,6 @@ } catch (...) {} - conn->lockUp(); return ret; } @@ -1682,23 +1681,21 @@ virtual void execute(toConnectionSub *sub, const QString &sql, toQList ¶ms) { qSqlSub *conn = qSqlConv(sub); - conn->lockDown(); try { - QSqlQuery Query(conn->Connection.exec(QueryParam(sql, params, NULL))); + LockingPtr<QSqlDatabase> ptr(conn->Connection, conn->Lock); + QSqlQuery Query(ptr->exec(QueryParam(sql, params, NULL))); if (!Query.isActive()) { - conn->lockUp(); QString msg = QString::fromLatin1("Query not active "); msg += sql; + ptr.unlock(); throw ErrorString(Query.lastError(), msg); } - conn->lockUp(); } catch (const toQSqlProviderAggregate &) { // Ok, this one is complicated and will probably never be used. - conn->lockUp(); throw QString("Direct exec aggregate queries are not supported, use a toQuery object for this one"); } } @@ -1784,12 +1781,6 @@ void toQSqlProvider::qSqlQuery::execute(void) { - while (Connection->getLockValue() > 1) - { - Connection->lockDown(); - toStatusMessage(QString::fromLatin1("Too high value on connection lock semaphore")); - } - Connection->lockDown(); Query = NULL; try { @@ -1800,42 +1791,33 @@ ExtraData = extraData(aggr); if (ExtraData.begin() != ExtraData.end()) CurrentExtra = *ExtraData.begin(); - try + + QString t = QueryParam(parseReorder(query()->sql()), query()->params(), &ExtraData); + if (t.isEmpty()) { - QString t = QueryParam(parseReorder(query()->sql()), query()->params(), &ExtraData); - if (t.isEmpty()) - { - toStatusMessage("Nothing to send to aggregate query"); - Query = NULL; - EOQ = true; - Connection->lockUp(); - return ; - } - else - Query = createQuery(t); + toStatusMessage("Nothing to send to aggregate query"); + Query = NULL; + EOQ = true; + return; } - catch (...) - { - Connection->lockUp(); - throw; - } + else + Query = createQuery(t); } + checkQuery(); } -void toQSqlProvider::qSqlQuery::checkQuery(void) // Must call with lockDown!!!! +void toQSqlProvider::qSqlQuery::checkQuery(void) // Must *not* call while locked { - while (Connection->getLockValue() > 0) - { - toStatusMessage(QString::fromLatin1("Too high value on connection lock semaphore for checkQuery")); - } + LockingPtr<QSqlDatabase> ptr(Connection->Connection, Connection->Lock); + do { if (!Query->isActive()) { - Connection->lockUp(); QString msg = QString::fromLatin1("Query not active "); msg += query()->sql(); + ptr.unlock(); throw ErrorString(Query->lastError(), msg); } @@ -1866,20 +1848,13 @@ { delete Query; Query = NULL; - try - { - Query = createQuery(QueryParam(parseReorder(query()->sql()), query()->params(), &ExtraData)); - } - catch (...) - { - Connection->lockUp(); - throw; - } + + Query = createQuery(QueryParam(parseReorder(query()->sql()), + query()->params(), + &ExtraData)); } } while (ExtraData.begin() != ExtraData.end() && EOQ); - - Connection->lockUp(); } toQSqlProvider::qSqlSub *toQSqlProvider::createConnection(toConnection &conn) @@ -1940,19 +1915,6 @@ return ret; } -void toQSqlProvider::qSqlSub::reconnect(toConnection &conn) -{ - qSqlSub *sub = createConnection(conn); - Connection = sub->Connection; - ConnectionID = sub->ConnectionID; - - // Switch database and remove the old one - QString t = Name; - Name = sub->Name; - sub->Name = t; - delete sub; -} - toConnectionSub *toQSqlProvider::qSqlConnection::createConnection(void) { return toQSqlProvider::createConnection(connection()); Modified: trunk/tora/src/tothread.h =================================================================== --- trunk/tora/src/tothread.h 2008-08-15 14:07:49 UTC (rev 2941) +++ trunk/tora/src/tothread.h 2008-08-15 14:51:44 UTC (rev 2942) @@ -185,4 +185,49 @@ } }; + +// much thanks to: +// http://www.ddj.com/cpp/184403766 +// i added lock and unlock +template <typename T> class LockingPtr { +public: + // Constructors/destructors + LockingPtr(volatile T& obj, QMutex& mtx) : pObj_(const_cast<T*>(&obj)), + pMtx_(&mtx) { + mtx.lock(); + locked = true; + } + + ~LockingPtr() { + if(locked) + pMtx_->unlock(); + } + + void lock() { + pMtx_->lock(); + locked = true; + } + + void unlock() { + locked = false; + pMtx_->unlock(); + } + + // Pointer behavior + T& operator*() { + return *pObj_; + } + + T* operator->() { + return pObj_; + } + +private: + bool locked; + T* pObj_; + QMutex* pMtx_; + LockingPtr(const LockingPtr&); + LockingPtr& operator=(const LockingPtr&); +}; + #endif This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |