From: <ri...@us...> - 2006-08-13 04:49:08
|
Revision: 1451 Author: rilson Date: 2006-08-12 21:46:59 -0700 (Sat, 12 Aug 2006) ViewCVS: http://svn.sourceforge.net/osdldbt/?rev=1451&view=rev Log Message: ----------- Added 'final' version of the DMSUT interface class. Added flag/code to set the duration of the test in DriverCustomerMain. Modified Paths: -------------- trunk/dbt5/scripts/mix_analyzer.pl trunk/dbt5/src/BrokerageHouse/BrokerageHouse.cpp trunk/dbt5/src/DriverCustomer/DriverCustomer.cpp trunk/dbt5/src/DriverCustomer/DriverCustomerMain.cpp trunk/dbt5/src/Makefile.am trunk/dbt5/src/TestTransactions/TestTxn.cpp trunk/dbt5/src/include/BrokerageHouse.h trunk/dbt5/src/include/CESUT.h trunk/dbt5/src/include/CommonStructs.h trunk/dbt5/src/include/DMSUT.h trunk/dbt5/src/include/DriverCustomer.h trunk/dbt5/src/include/dbt5error.h trunk/dbt5/src/include/transactions.h trunk/dbt5/src/interfaces/CESUT.cpp trunk/dbt5/src/interfaces/DMSUTtest.cpp trunk/dbt5/src/interfaces/MEESUTtest.cpp trunk/dbt5/src/interfaces/TxnHarnessSendToMarketTest.cpp Added Paths: ----------- trunk/dbt5/src/include/DMSUTtest.h trunk/dbt5/src/interfaces/DMSUT.cpp Modified: trunk/dbt5/scripts/mix_analyzer.pl =================================================================== --- trunk/dbt5/scripts/mix_analyzer.pl 2006-08-12 06:45:41 UTC (rev 1450) +++ trunk/dbt5/scripts/mix_analyzer.pl 2006-08-13 04:46:59 UTC (rev 1451) @@ -60,7 +60,7 @@ $transaction{ '7' } = "Security Detail "; $transaction{ '8' } = "Market Feed "; $transaction{ '9' } = "Market Watch "; -$transaction{ 'D' } = "Data Maintenance "; +$transaction{ '10' } = "Data Maintenance "; my @xtran = ( "to_tran", "tr_tran", "tl_tran", "tu_tran", "ts_tran", @@ -201,7 +201,7 @@ $current_transaction_count{ '7' } = 0; $current_transaction_count{ '8' } = 0; $current_transaction_count{ '9' } = 0; -$current_transaction_count{ 'D' } = 0; +$current_transaction_count{ '10' } = 0; $rollback_count{ '0' } = 0; $rollback_count{ '1' } = 0; @@ -213,7 +213,7 @@ $rollback_count{ '7' } = 0; $rollback_count{ 
'8' } = 0; $rollback_count{ '9' } = 0; -$rollback_count{ 'D' } = 0; +$rollback_count{ '10' } = 0; # # Transaction counts for the steady state portion of the test. @@ -228,7 +228,7 @@ $transaction_count{ '7' } = 0; $transaction_count{ '8' } = 0; $transaction_count{ '9' } = 0; -$transaction_count{ 'D' } = 0; +$transaction_count{ '10' } = 0; # # Read the data directly from the log file and handle it on the fly. @@ -262,7 +262,7 @@ . "$current_transaction_count{ '7' } " . "$current_transaction_count{ '8' } " . "$current_transaction_count{ '9' } " - . "$current_transaction_count{ 'D' }\n"; + . "$current_transaction_count{ '10' }\n"; ++$elapsed_time; $previous_time = $current_time; @@ -280,7 +280,7 @@ $current_transaction_count{ '7' } = 0; $current_transaction_count{ '8' } = 0; $current_transaction_count{ '9' } = 0; - $current_transaction_count{ 'D' } = 0; + $current_transaction_count{ '10' } = 0; } # @@ -383,12 +383,12 @@ } ++$mw_distribution{ $time }; print MW_FILE "$x_time $response_time\n"; - } elsif ( $word[ 1 ] eq 'D' ) { + } elsif ( $word[ 1 ] eq '10' ) { unless ($steady_state_start_time == 0) { - ++$transaction_count{ 'D' }; - $transaction_response_time{ 'D' } += $response_time; + ++$transaction_count{ '10' }; + $transaction_response_time{ '10' } += $response_time; push @data_maintenance_response_time, $response_time; - ++$current_transaction_count{ 'D' }; + ++$current_transaction_count{ '10' }; } ++$dm_distribution{ $time }; print DM_FILE "$x_time $response_time\n"; @@ -412,8 +412,8 @@ ++$rollback_count{ '8' } unless ($steady_state_start_time == 0); } elsif ( $word[ 1 ] eq '9R' ) { ++$rollback_count{ '9' } unless ($steady_state_start_time == 0); - } elsif ( $word[ 1 ] eq 'DR' ) { - ++$rollback_count{ 'D' } unless ($steady_state_start_time == 0); + } elsif ( $word[ 1 ] eq '10R' ) { + ++$rollback_count{ '10' } unless ($steady_state_start_time == 0); } elsif ( $word[ 1 ] eq 'E' ) { ++$errors; ++$error_count{ $word[ 3 ] }; @@ -745,7 +745,7 @@ my 
$security_detail90index = $transaction_count{'7'} * 0.90; my $market_feed90index = $transaction_count{'8'} * 0.90; my $market_watch90index = $transaction_count{'9'} * 0.90; -my $data_maintenance90index = $transaction_count{'D'} * 0.90; +my $data_maintenance90index = $transaction_count{'10'} * 0.90; my %response90th; @@ -845,9 +845,9 @@ $floor = floor($data_maintenance90index); $ceil = ceil($data_maintenance90index); if ($floor == $ceil) { - $response90th{'D'} = $data_maintenance_response_time[$data_maintenance90index]; + $response90th{'10'} = $data_maintenance_response_time[$data_maintenance90index]; } else { - $response90th{'D'} = ($data_maintenance_response_time[$floor] + + $response90th{'10'} = ($data_maintenance_response_time[$floor] + $data_maintenance_response_time[$ceil]) / 2; } @@ -857,7 +857,7 @@ printf(" Response Time (s)\n"); printf(" Transaction %% Average : 90th %% Total Rollbacks %%\n"); printf("----------------- ----- --------------------- ----------- --------------- -----\n"); -foreach my $idx ('0', '1', '2', '3', '4','5','6','7','8','9','D') { +foreach my $idx ('0', '1', '2', '3', '4','5','6','7','8','9','10') { if ($transaction_count{$idx} == 0) { printf("%12s 0.00 N/A 0 0 0.00\n", $transaction{$idx}); } else { Modified: trunk/dbt5/src/BrokerageHouse/BrokerageHouse.cpp =================================================================== --- trunk/dbt5/src/BrokerageHouse/BrokerageHouse.cpp 2006-08-12 06:45:41 UTC (rev 1450) +++ trunk/dbt5/src/BrokerageHouse/BrokerageHouse.cpp 2006-08-13 04:46:59 UTC (rev 1451) @@ -161,6 +161,22 @@ delete pTradeUpdate; break; } + case DATA_MAINTENANCE: + { + CDataMaintenance* pDataMaintenance = new CDataMaintenance(pDBConnection); + iRet = pThrParam->pBrokerageHouse->RunDataMaintenance( + &(pMessage->TxnInput.DataMaintenanceTxnInput), *pDataMaintenance ); + delete pDataMaintenance; + break; + } + case TRADE_CLEANUP: + { + CTradeCleanup* pTradeCleanup = new CTradeCleanup(pDBConnection); + iRet = 
pThrParam->pBrokerageHouse->RunTradeCleanup( + &(pMessage->TxnInput.TradeCleanupTxnInput), *pTradeCleanup ); + delete pTradeCleanup; + break; + } default: { cout<<"wrong txn type"<<endl; @@ -302,7 +318,15 @@ return( TxnOutput.status ); } +// Run Trade Cleanup transaction +INT32 CBrokerageHouse::RunTradeCleanup( PTradeCleanupTxnInput pTxnInput, CTradeCleanup &TradeCleanup ) +{ + TTradeCleanupTxnOutput TxnOutput; + TradeCleanup.DoTxn( pTxnInput, &TxnOutput ); + return( TxnOutput.status ); +} + // Run Market Feed transaction INT32 CBrokerageHouse::RunMarketFeed( PMarketFeedTxnInput pTxnInput, CMarketFeed &MarketFeed ) { Modified: trunk/dbt5/src/DriverCustomer/DriverCustomer.cpp =================================================================== --- trunk/dbt5/src/DriverCustomer/DriverCustomer.cpp 2006-08-12 06:45:41 UTC (rev 1450) +++ trunk/dbt5/src/DriverCustomer/DriverCustomer.cpp 2006-08-13 04:46:59 UTC (rev 1451) @@ -10,20 +10,23 @@ using namespace TPCE; +// global variables pthread_t* g_tid = NULL; +int stop_time = 0; void* TPCE::CustomerWorkerThread(void* data) { PCustomerThreadParam pThrParam = reinterpret_cast<PCustomerThreadParam>(data); + ostringstream osErr; struct timespec ts, rem; ts.tv_sec = (time_t) (pThrParam->pDriverCustomer->m_iPacingDelay / 1000); ts.tv_nsec = (long) (pThrParam->pDriverCustomer->m_iPacingDelay % 1000) * 1000000; - while (true) + do { - cout<<"thread = "<<pthread_self()<<"==> "; + //cout<<"thread = "<<pthread_self()<<"==> "; pThrParam->pDriverCustomer->m_pCCE->DoTxn(); // wait for pacing delay -- this delays happens after the mix logging @@ -31,14 +34,17 @@ if (errno == EINTR) { memcpy(&ts, &rem, sizeof(timespec)); } else { - ostringstream osErr; osErr<<"pacing delay time invalid "<<ts.tv_sec<<" s "<<ts.tv_nsec<<" ns"<<endl; pThrParam->pDriverCustomer->LogErrorMessage(osErr.str()); break; } } - } + } while (time(NULL) < stop_time); + osErr<<"User thread # "<<pthread_self()<<" terminated."<<endl; + 
pThrParam->pDriverCustomer->LogErrorMessage(osErr.str()); + + delete pThrParam; return NULL; } @@ -83,46 +89,79 @@ m_iPacingDelay(iPacingDelay) { m_pLog = new CEGenLogger(eDriverEGenLoader, 0, "DriverCustomer.log", &m_fmt); - + m_pDriverCETxnSettings = new TDriverCETxnSettings; m_InputFiles.Initialize(eDriverEGenLoader, iConfiguredCustomerCount, iActiveCustomerCount, szInDir); m_fLog.open("DriverCustomer_Error.log", ios::out); m_fMix.open(CE_MIX_LOG_NAME, ios::out); + // initialize CESUT interface - m_pCCESUT = new CCESUT(iBHlistenPort, &m_fLog, &m_fMix); + m_pCCESUT = new CCESUT(iBHlistenPort, &m_fLog, &m_fMix, &m_LogLock, &m_MixLock); - m_pDriverCETxnSettings = new TDriverCETxnSettings; - // initialize CE - Customer Emulator m_pCCE = new CCE( m_pCCESUT, m_pLog, m_InputFiles, iConfiguredCustomerCount, iActiveCustomerCount, iScaleFactor, iDaysOfInitialTrades, UniqueId, m_pDriverCETxnSettings ); + + // initialize DMSUT interface + m_pCDMSUT = new CDMSUT(iBHlistenPort, &m_fLog, &m_fMix, &m_LogLock, &m_MixLock); + + // initialize DM - Data Maintenance + m_pCDM = new CDM( m_pCDMSUT, m_pLog, m_InputFiles, iConfiguredCustomerCount, iActiveCustomerCount, + iScaleFactor, iDaysOfInitialTrades, UniqueId ); } // Destructor CDriverCustomer::~CDriverCustomer() { + delete m_pCDM; + delete m_pCDMSUT; delete m_pCCE; + delete m_pCCESUT; + + m_fMix.close(); + m_fLog.close(); + delete m_pDriverCETxnSettings; - delete m_pCCESUT; delete m_pLog; - m_fLog.close(); - m_fMix.close(); } // RunTest -void CDriverCustomer::RunTest(int iSleep) +void CDriverCustomer::RunTest(int iSleep, int iTestDuration) { + g_tid = (pthread_t*) malloc(sizeof(pthread_t) * m_iUsers); + + // time to sleep between thread creation struct timespec ts, rem; - ts.tv_sec = (time_t) (iSleep / 1000); ts.tv_nsec = (long) (iSleep % 1000) * 1000000; - PCustomerThreadParam pThrParam; + // Calculate when the test should stop.
+ int threads_start_time = (int) ((double) iSleep / 1000.0 * (double) m_iUsers); + stop_time = time(NULL) + iTestDuration + threads_start_time; - g_tid = (pthread_t*) malloc(sizeof(pthread_t) * m_iUsers); + CDateTime dtAux; + dtAux.SetToCurrent(); + cout<<"Test is starting at "<<dtAux.ToStr(02)<<endl + <<"Duration of ramp-up: "<<threads_start_time<<" seconds"<<endl; + + dtAux.AddMinutes((iTestDuration + threads_start_time)/60); + cout<<"Test will stop at "<<dtAux.ToStr(02)<<endl; + + // before starting the test run Trade-Cleanup transaction + cout<<endl<<"Running Trade-Cleanup transaction before starting the test..."<<endl; + m_pCDM->DoCleanupTxn(); + cout<<"Trade-Cleanup transaction completed."<<endl<<endl; + LogErrorMessage(">> Start of ramp-up.\n"); - for (int i = 1; i <= m_iUsers; i++) + + //start thread that run the Data Maintenance transaction + EntryDMWorkerThread(this); + + // parameter for the new thread + PCustomerThreadParam pThrParam; + + for (int i = 1; i <= m_iUsers; i++) // i=0 is Data-Maintenance { pThrParam = new TCustomerThreadParam; memset(pThrParam, 0, sizeof(TCustomerThreadParam)); // zero the structure @@ -143,14 +182,15 @@ } } + // mark end of ramp-up m_MixLock.ClaimLock(); m_fMix<<(int)time(NULL)<<",START"<<endl; m_MixLock.ReleaseLock(); - LogErrorMessage(">> End of ramp-up.\n"); + LogErrorMessage(">> End of ramp-up.\n\n"); // wait until all threads quit - for (int i = 1; i <= m_iUsers; i++) + for (int i = 0; i <= m_iUsers; i++) // 0 represents the Data-Maintenance thread { if (pthread_join(g_tid[i], NULL) != 0) { @@ -159,6 +199,57 @@ } } + +// DM worker thread +void* TPCE::DMWorkerThread(void* data) +{ + PCustomerThreadParam pThrParam = reinterpret_cast<PCustomerThreadParam>(data); + + do + { + pThrParam->pDriverCustomer->m_pCDM->DoTxn(); + sleep(60); // Data-Maintenance runs once a minute + } while (time(NULL) < stop_time); + + pThrParam->pDriverCustomer->LogErrorMessage("Data-Maintenance thread terminated.\n"); + delete pThrParam; + + 
return NULL; +} + +// entry point for worker thread +void TPCE::EntryDMWorkerThread(CDriverCustomer* ptr) +{ + PCustomerThreadParam pThrParam = new TCustomerThreadParam; + memset(pThrParam, 0, sizeof(TCustomerThreadParam)); // zero the structure + pThrParam->pDriverCustomer = ptr; + + pthread_attr_t threadAttribute; // thread attribute + + try + { + int status = pthread_attr_init(&threadAttribute); // initialize the attribute object + if (status != 0) + { + throw new CThreadErr( CThreadErr::ERR_THREAD_ATTR_INIT ); + } + + // create the thread in the joinable state + status = pthread_create(&g_tid[0], &threadAttribute, &DMWorkerThread, reinterpret_cast<void*>(pThrParam)); + + if (status != 0) + { + throw new CThreadErr( CThreadErr::ERR_THREAD_CREATE ); + } + pThrParam->pDriverCustomer->LogErrorMessage(">> Data-Maintenance thread started.\n"); + } + catch(CThreadErr *pErr) + { + pThrParam->pDriverCustomer->LogErrorMessage("Data-Maintenance Thread not created"); + delete pErr; + } +} + // LogErrorMessage void CDriverCustomer::LogErrorMessage( const string sErr ) { Modified: trunk/dbt5/src/DriverCustomer/DriverCustomerMain.cpp =================================================================== --- trunk/dbt5/src/DriverCustomer/DriverCustomerMain.cpp 2006-08-12 06:45:41 UTC (rev 1450) +++ trunk/dbt5/src/DriverCustomer/DriverCustomerMain.cpp 2006-08-13 04:46:59 UTC (rev 1451) @@ -19,6 +19,7 @@ TIdent iActiveCustomerCount = iDefaultLoadUnitSize; // total number of customers in the database int iScaleFactor = 500; // # of customers for 1 TRTPS int iDaysOfInitialTrades = 300; +int iTestDuration = 0; int iSleep = 1000; // msec between thread creation int iUsers = 0; // # users int iPacingDelay = 0; @@ -42,8 +43,9 @@ <<" -f number "<<iScaleFactor<<"\t\t # of customers for 1 NOTPS"<<endl <<" -d number "<<iDaysOfInitialTrades<<"\t\t # of Days of Initial Trades"<<endl <<" -l number "<<iLoadUnitSize<<"\t\t # of customers in one load unit"<<endl - <<" -s number "<<iSleep<<"\t\t # 
of msec between customer creation"<<endl - <<" -u number "<<iUsers<<"\t\t # of Users"<<endl + <<" -t number Duration of the test (seconds)"<<endl + <<" -s number "<<iSleep<<"\t\t # of msec between thread creation"<<endl + <<" -u number # of Users"<<endl <<" -p number "<<iPacingDelay<<"\t\t # of msec to wait after the current txn and"<<endl <<"\t\t\t\t before the next txn"<<endl <<" -g number Unique input for automatic seed generation"<<endl; @@ -117,6 +119,9 @@ case 'p': sscanf(vp, "%d", &iPacingDelay); break; + case 't': + sscanf(vp, "%d", &iTestDuration); + break; default: Usage(); cout<<"Error: Unrecognized option: "<<sp<<endl; @@ -177,17 +182,24 @@ // UniqueId must be assigned if (UniqueId == 0) { - cerr << "UniqueID number must be specified."<<endl; + cerr << "unique id number must be specified."<<endl; bRet = false; } // iUsers must be assigned if (iUsers == 0) { - cerr << "iUsers number must be specified."<<endl; + cerr << "number of users must be specified."<<endl; bRet = false; } + // iTestDuration must be assigned + if (iTestDuration == 0) + { + cerr << "the duration of the test must be specified."<<endl; + bRet = false; + } + return bRet; } @@ -217,7 +229,8 @@ cout<<"\tScale Factor:\t\t\t"<<iScaleFactor<<endl; cout<<"\t#Days of initial trades:\t"<<iDaysOfInitialTrades<<endl; cout<<"\tLoad unit size:\t\t\t"<<iLoadUnitSize<<endl; - cout<<"\tSleep between customers:\t"<<iSleep<<endl; + cout<<"\tSleep between Users:\t"<<iSleep<<endl; + cout<<"\tTest duration (sec):\t\t"<<iTestDuration<<endl; cout<<"\tUnique ID:\t\t\t"<<UniqueId<<endl; cout<<"\t# of Users:\t\t\t"<<iUsers<<endl; cout<<"\tPacing Delay (msec):\t\t"<<iPacingDelay<<endl<<endl; @@ -227,7 +240,7 @@ CDriverCustomer DriverCustomer(szInDir, iConfiguredCustomerCount, iActiveCustomerCount, iScaleFactor, iDaysOfInitialTrades, UniqueId, iBHlistenPort, iUsers, iPacingDelay); - DriverCustomer.RunTest(iSleep); + DriverCustomer.RunTest(iSleep, iTestDuration); } catch (CBaseErr *pErr) Modified: 
trunk/dbt5/src/Makefile.am =================================================================== --- trunk/dbt5/src/Makefile.am 2006-08-12 06:45:41 UTC (rev 1450) +++ trunk/dbt5/src/Makefile.am 2006-08-13 04:46:59 UTC (rev 1451) @@ -1,5 +1,5 @@ # Makefile -# 2006 Rilson Nascimento +# 2006 Mark Wong & Rilson Nascimento AUTOMAKE_OPTIONS = no-dependencies @@ -18,6 +18,6 @@ BrokerageHouseMain_SOURCES = $(EGEN)/DateTime.cpp $(INTF)/CSocket.cpp $(INTF)/TxnHarnessSendToMarket.cpp BrokerageHouse/BrokerageHouseMain.cpp $(TXN)/DBConnection.cpp $(TXN)/TxnBaseDB.cpp $(TXN)/MarketFeedDB.cpp $(TXN)/TradeCleanupDB.cpp $(TXN)/DataMaintenanceDB.cpp $(TXN)/MarketWatchDB.cpp $(TXN)/SecurityDetailDB.cpp $(TXN)/BrokerVolumeDB.cpp $(TXN)/CustomerPositionDB.cpp $(TXN)/TradeUpdateDB.cpp $(TXN)/TradeLookupDB.cpp $(TXN)/TradeResultDB.cpp $(TXN)/TradeOrderDB.cpp $(TXN)/TradeStatusDB.cpp BrokerageHouse/BrokerageHouse.cpp -DriverCustomerMain_SOURCES = DriverCustomer/DriverCustomer.cpp DriverCustomer/DriverCustomerMain.cpp $(EGEN)/CETxnMixGenerator.cpp $(EGEN)/CETxnInputGenerator.cpp $(EGEN)/ReadRowFunctions.cpp $(EGEN)/InputFlatFilesStructure.cpp $(EGEN)/error.cpp $(EGEN)/AddressTable.cpp $(EGEN)/EGenVersion.cpp $(EGEN)/CustomerTable.cpp $(EGEN)/CustomerSelection.cpp $(EGEN)/Random.cpp $(EGEN)/Person.cpp $(EGEN)/DateTime.cpp $(EGEN)/CE.cpp $(INTF)/CESUT.cpp $(INTF)/CSocket.cpp +DriverCustomerMain_SOURCES = DriverCustomer/DriverCustomer.cpp DriverCustomer/DriverCustomerMain.cpp $(EGEN)/CETxnMixGenerator.cpp $(EGEN)/CETxnInputGenerator.cpp $(EGEN)/ReadRowFunctions.cpp $(EGEN)/InputFlatFilesStructure.cpp $(EGEN)/error.cpp $(EGEN)/AddressTable.cpp $(EGEN)/EGenVersion.cpp $(EGEN)/CustomerTable.cpp $(EGEN)/CustomerSelection.cpp $(EGEN)/Random.cpp $(EGEN)/Person.cpp $(EGEN)/DateTime.cpp $(EGEN)/CE.cpp $(INTF)/CESUT.cpp $(INTF)/CSocket.cpp $(INTF)/DMSUT.cpp $(EGEN)/DM.cpp DriverMarketMain_SOURCES = $(EGEN)/ReadRowFunctions.cpp $(EGEN)/EGenVersion.cpp $(EGEN)/error.cpp $(INTF)/CSocket.cpp 
$(EGEN)/Random.cpp $(EGEN)/DateTime.cpp DriverMarket/DriverMarket.cpp DriverMarket/DriverMarketMain.cpp $(EGEN)/WheelTime.cpp $(EGEN)/MEESecurity.cpp $(EGEN)/MEEPriceBoard.cpp $(EGEN)/MEETickerTape.cpp $(EGEN)/MEETradingFloor.cpp $(EGEN)/MEE.cpp $(INTF)/MEESUT.cpp Modified: trunk/dbt5/src/TestTransactions/TestTxn.cpp =================================================================== --- trunk/dbt5/src/TestTransactions/TestTxn.cpp 2006-08-12 06:45:41 UTC (rev 1450) +++ trunk/dbt5/src/TestTransactions/TestTxn.cpp 2006-08-13 04:46:59 UTC (rev 1451) @@ -4,6 +4,8 @@ #include <transactions.h> #include <CETxnInputGenerator.h> #include <cstdlib> +#include <TxnHarnessSendToMarketTest.h> +#include <DMSUTtest.h> using namespace TPCE; @@ -356,7 +358,7 @@ // Initialize DM - Data Maintenance class // DM is used by Data-Maintenance and Trade-Cleanup transactions - CDMSUTtest m_CDMSUT( &m_Conn ); // Data-Maintenance SUT interface (provided by us) + CDMSUTtest m_CDMSUT( &m_Conn ); // Data-Maintenance SUT interface (provided by us) CDM m_CDM( &m_CDMSUT, &log, inputFiles, iDefaultLoadUnitSize, iDefaultLoadUnitSize, 10, 500, 1 ); // provided by TPC Modified: trunk/dbt5/src/include/BrokerageHouse.h =================================================================== --- trunk/dbt5/src/include/BrokerageHouse.h 2006-08-12 06:45:41 UTC (rev 1450) +++ trunk/dbt5/src/include/BrokerageHouse.h 2006-08-13 04:46:59 UTC (rev 1451) @@ -33,6 +33,7 @@ INT32 RunBrokerVolume( PBrokerVolumeTxnInput pTxnInput, CBrokerVolume &BrokerVolume ); INT32 RunCustomerPosition( PCustomerPositionTxnInput pTxnInput, CCustomerPosition &CustomerPosition ); INT32 RunDataMaintenance( PDataMaintenanceTxnInput pTxnInput, CDataMaintenance &DataMaintenance ); + INT32 RunTradeCleanup( PTradeCleanupTxnInput pTxnInput, CTradeCleanup &TradeCleanup ); INT32 RunMarketWatch( PMarketWatchTxnInput pTxnInput, CMarketWatch &MarketWatch ); INT32 RunMarketFeed( PMarketFeedTxnInput pTxnInput, CMarketFeed &MarketFeed ); INT32 
RunSecurityDetail( PSecurityDetailTxnInput pTxnInput, CSecurityDetail &SecurityDetail ); Modified: trunk/dbt5/src/include/CESUT.h =================================================================== --- trunk/dbt5/src/include/CESUT.h 2006-08-12 06:45:41 UTC (rev 1450) +++ trunk/dbt5/src/include/CESUT.h 2006-08-13 04:46:59 UTC (rev 1451) @@ -18,8 +18,8 @@ class CCESUT : public CCESUTInterface { int m_iBHlistenPort; - CSyncLock m_LogLock; - CSyncLock m_MixLock; + CSyncLock* m_pLogLock; + CSyncLock* m_pMixLock; ofstream* m_pfLog; // error log file ofstream* m_pfMix; // mix log file @@ -29,7 +29,7 @@ public: void LogErrorMessage(const string sErr); - CCESUT(const int iListenPort, ofstream* pflog, ofstream* pfmix); + CCESUT(const int iListenPort, ofstream* pflog, ofstream* pfmix, CSyncLock* pLogLock, CSyncLock* pMixLock); ~CCESUT(void); virtual bool BrokerVolume( PBrokerVolumeTxnInput pTxnInput ); // return whether it was successful Modified: trunk/dbt5/src/include/CommonStructs.h =================================================================== --- trunk/dbt5/src/include/CommonStructs.h 2006-08-12 06:45:41 UTC (rev 1450) +++ trunk/dbt5/src/include/CommonStructs.h 2006-08-13 04:46:59 UTC (rev 1451) @@ -45,6 +45,7 @@ TTradeStatusTxnInput TradeStatusTxnInput; TTradeUpdateTxnInput TradeUpdateTxnInput; TDataMaintenanceTxnInput DataMaintenanceTxnInput; + TTradeCleanupTxnInput TradeCleanupTxnInput; } TxnInput; } *PMsgDriverBrokerage; Modified: trunk/dbt5/src/include/DMSUT.h =================================================================== --- trunk/dbt5/src/include/DMSUT.h 2006-08-12 06:45:41 UTC (rev 1450) +++ trunk/dbt5/src/include/DMSUT.h 2006-08-13 04:46:59 UTC (rev 1451) @@ -4,7 +4,7 @@ * * 2006 Rilson Nascimento * - * 17 July 2006 + * 12 August 2006 */ #ifndef DM_SUT_H @@ -15,15 +15,23 @@ namespace TPCE { -class CDMSUTtest : public CDMSUTInterface +class CDMSUT : public CDMSUTInterface { -protected: - CDBConnection* m_pDBConnection; + int m_iBHlistenPort; + 
CSyncLock* m_pLogLock; + CSyncLock* m_pMixLock; + ofstream* m_pfLog; // error log file + ofstream* m_pfMix; // mix log file +private: + void ConnectRunTxnAndLog(PMsgDriverBrokerage pRequest); + public: - CDMSUTtest(CDBConnection *pDBConn); - ~CDMSUTtest(); + void LogErrorMessage(const string sErr); + CDMSUT(const int iListenPort, ofstream* pflog, ofstream* pfmix, CSyncLock* pLogLock, CSyncLock* pMixLock); + ~CDMSUT(void); + virtual bool DataMaintenance( PDataMaintenanceTxnInput pTxnInput ); // return whether it was successful virtual bool TradeCleanup( PTradeCleanupTxnInput pTxnInput ); // return whether it was successful }; Added: trunk/dbt5/src/include/DMSUTtest.h =================================================================== --- trunk/dbt5/src/include/DMSUTtest.h (rev 0) +++ trunk/dbt5/src/include/DMSUTtest.h 2006-08-13 04:46:59 UTC (rev 1451) @@ -0,0 +1,33 @@ +/* + * DMSUTtest.h + * DM - SUT Interface class + * + * 2006 Rilson Nascimento + * + * 17 July 2006 + */ + +#ifndef DM_SUT_TEST_H +#define DM_SUT_TEST_H + +#include "DMSUTInterface.h" + +namespace TPCE +{ + +class CDMSUTtest : public CDMSUTInterface +{ +protected: + CDBConnection* m_pDBConnection; + +public: + CDMSUTtest(CDBConnection *pDBConn); + ~CDMSUTtest(); + + virtual bool DataMaintenance( PDataMaintenanceTxnInput pTxnInput ); // return whether it was successful + virtual bool TradeCleanup( PTradeCleanupTxnInput pTxnInput ); // return whether it was successful +}; + +} // namespace TPCE + +#endif // DM_SUT_TEST_H Modified: trunk/dbt5/src/include/DriverCustomer.h =================================================================== --- trunk/dbt5/src/include/DriverCustomer.h 2006-08-12 06:45:41 UTC (rev 1450) +++ trunk/dbt5/src/include/DriverCustomer.h 2006-08-13 04:46:59 UTC (rev 1451) @@ -19,9 +19,11 @@ int m_iPacingDelay; CLogFormatTab m_fmt; CEGenLogger* m_pLog; + CInputFiles m_InputFiles; CCESUT* m_pCCESUT; - CInputFiles m_InputFiles; CCE* m_pCCE; + CDMSUT* m_pCDMSUT; + CDM* m_pCDM;
PDriverCETxnSettings m_pDriverCETxnSettings; CSyncLock m_LogLock; CSyncLock m_MixLock; @@ -34,13 +36,15 @@ friend void* TPCE::CustomerWorkerThread(void* data); friend void TPCE::EntryCustomerWorkerThread(void* data, int i); // entry point for driver worker thread + friend void* TPCE::DMWorkerThread(void* data); + friend void TPCE::EntryDMWorkerThread(CDriverCustomer* ptr); public: CDriverCustomer(char* szInDir, TIdent iConfiguredCustomerCount, TIdent iActiveCustomerCount, INT32 iScaleFactor, INT32 iDaysOfInitialTrades, UINT32 UniqueId, int iBHlistenPort, int iUsers, int iPacingDelay); ~CDriverCustomer(); - void RunTest(int iSleep); + void RunTest(int iSleep, int iTestDuration); }; //parameter structure for the threads Modified: trunk/dbt5/src/include/dbt5error.h =================================================================== --- trunk/dbt5/src/include/dbt5error.h 2006-08-12 06:45:41 UTC (rev 1450) +++ trunk/dbt5/src/include/dbt5error.h 2006-08-13 04:46:59 UTC (rev 1451) @@ -6,12 +6,11 @@ * 07 August 2006 */ -//#include <transactions.h> - #ifndef DBT5_ERR_H #define DBT5_ERR_H #include <error.h> +#include <string> namespace TPCE { @@ -24,7 +23,7 @@ #define CE_MIX_LOG_NAME "ce_mix.log" #define MEE_MIX_LOG_NAME "mee_mix.log" -static string PGSQL_SERIALIZE_ERROR = "ERROR: could not serialize access due to concurrent update"; +static std::string PGSQL_SERIALIZE_ERROR = "ERROR: could not serialize access due to concurrent update"; class CSocketErr : public CBaseErr { Modified: trunk/dbt5/src/include/transactions.h =================================================================== --- trunk/dbt5/src/include/transactions.h 2006-08-12 06:45:41 UTC (rev 1450) +++ trunk/dbt5/src/include/transactions.h 2006-08-13 04:46:59 UTC (rev 1451) @@ -25,7 +25,6 @@ #include <CommonStructs.h> #include <SyncLockInterface.h> #include <TxnHarnessSendToMarketInterface.h> -#include <TxnHarnessSendToMarketTest.h> #include <TxnHarnessSendToMarket.h> #include <DBConnection.h> @@ -44,6 +43,7 
@@ #include <DM.h> #include <DMSUT.h> #include <DataMaintenanceDB.h> +#include <TradeCleanupDB.h> #include <MEE.h> #include <MEESUT.h> Modified: trunk/dbt5/src/interfaces/CESUT.cpp =================================================================== --- trunk/dbt5/src/interfaces/CESUT.cpp 2006-08-12 06:45:41 UTC (rev 1450) +++ trunk/dbt5/src/interfaces/CESUT.cpp 2006-08-13 04:46:59 UTC (rev 1451) @@ -13,8 +13,10 @@ using namespace TPCE; // Constructor -CCESUT::CCESUT(const int iListenPort, ofstream* pflog, ofstream* pfmix) +CCESUT::CCESUT(const int iListenPort, ofstream* pflog, ofstream* pfmix, CSyncLock* pLogLock, CSyncLock* pMixLock) : m_iBHlistenPort(iListenPort), + m_pLogLock(pLogLock), + m_pMixLock(pMixLock), m_pfLog(pflog), m_pfMix(pfmix) { @@ -64,7 +66,7 @@ // ERR_TYPE_WRONGTXN // logging - m_MixLock.ClaimLock(); + m_pMixLock->ClaimLock(); if (Reply.iStatus == CBaseTxnErr::SUCCESS) { *(m_pfMix)<<(int)time(NULL)<<","<<pRequest->TxnType<< ","<<(TxnTime.MSec()/1000.0)<<","<<(int)pthread_self()<<endl; @@ -78,7 +80,7 @@ *(m_pfMix)<<(int)time(NULL)<<","<<"E"<<","<<(TxnTime.MSec()/1000.0)<<","<<(int)pthread_self()<<endl; } m_pfMix->flush(); - m_MixLock.ReleaseLock(); + m_pMixLock->ReleaseLock(); } catch(CSocketErr *pErr) @@ -97,7 +99,7 @@ // Broker Volume bool CCESUT::BrokerVolume( PBrokerVolumeTxnInput pTxnInput ) { - cout<<"Broker Volume requested"<<endl; + //cout<<"Broker Volume requested"<<endl; TMsgDriverBrokerage Request; memset(&Request, 0, sizeof(TMsgDriverBrokerage)); @@ -112,7 +114,7 @@ // Customer Position bool CCESUT::CustomerPosition( PCustomerPositionTxnInput pTxnInput ) { - cout<<"Customer Position requested"<<endl; + //cout<<"Customer Position requested"<<endl; TMsgDriverBrokerage Request; memset(&Request, 0, sizeof(TMsgDriverBrokerage)); @@ -127,7 +129,7 @@ // Market Watch bool CCESUT::MarketWatch( PMarketWatchTxnInput pTxnInput ) { - cout<<"Market Watch requested"<<endl; + //cout<<"Market Watch requested"<<endl; TMsgDriverBrokerage Request; 
memset(&Request, 0, sizeof(TMsgDriverBrokerage)); @@ -142,7 +144,7 @@ // Security Detail bool CCESUT::SecurityDetail( PSecurityDetailTxnInput pTxnInput ) { - cout<<"Security Detail requested"<<endl; + //cout<<"Security Detail requested"<<endl; TMsgDriverBrokerage Request; memset(&Request, 0, sizeof(TMsgDriverBrokerage)); @@ -157,7 +159,7 @@ // Trade Lookup bool CCESUT::TradeLookup( PTradeLookupTxnInput pTxnInput ) { - cout<<"Trade Lookup requested"<<endl; + //cout<<"Trade Lookup requested"<<endl; TMsgDriverBrokerage Request; memset(&Request, 0, sizeof(TMsgDriverBrokerage)); @@ -172,7 +174,7 @@ // Trade Status bool CCESUT::TradeStatus( PTradeStatusTxnInput pTxnInput ) { - cout<<"Trade Status requested"<<endl; + //cout<<"Trade Status requested"<<endl; TMsgDriverBrokerage Request; memset(&Request, 0, sizeof(TMsgDriverBrokerage)); @@ -187,7 +189,7 @@ // Trade Order bool CCESUT::TradeOrder( PTradeOrderTxnInput pTxnInput, INT32 iTradeType, bool bExecutorIsAccountOwner ) { - cout<<"Trade Order requested"<<endl; + //cout<<"Trade Order requested"<<endl; TMsgDriverBrokerage Request; memset(&Request, 0, sizeof(TMsgDriverBrokerage)); @@ -202,7 +204,7 @@ // Trade Update bool CCESUT::TradeUpdate( PTradeUpdateTxnInput pTxnInput ) { - cout<<"Trade Update requested"<<endl; + //cout<<"Trade Update requested"<<endl; TMsgDriverBrokerage Request; memset(&Request, 0, sizeof(TMsgDriverBrokerage)); @@ -217,9 +219,9 @@ // LogErrorMessage void CCESUT::LogErrorMessage( const string sErr ) { - m_LogLock.ClaimLock(); + m_pLogLock->ClaimLock(); cout<<sErr; *(m_pfLog)<<sErr; m_pfLog->flush(); - m_LogLock.ReleaseLock(); + m_pLogLock->ReleaseLock(); } Added: trunk/dbt5/src/interfaces/DMSUT.cpp =================================================================== --- trunk/dbt5/src/interfaces/DMSUT.cpp (rev 0) +++ trunk/dbt5/src/interfaces/DMSUT.cpp 2006-08-13 04:46:59 UTC (rev 1451) @@ -0,0 +1,137 @@ +/* + * DMSUT.cpp + * + * 2006 Rilson Nascimento + * + * 12 August 2006 + */ + +#include 
<transactions.h> + +char* addr2 = "localhost"; + +using namespace TPCE; + +// constructor +CDMSUT::CDMSUT(const int iListenPort, ofstream* pflog, ofstream* pfmix, CSyncLock* pLogLock, CSyncLock* pMixLock) +: m_iBHlistenPort(iListenPort), + m_pLogLock(pLogLock), + m_pMixLock(pMixLock), + m_pfLog(pflog), + m_pfMix(pfmix) +{ +} + +// destructor +CDMSUT::~CDMSUT() +{ +} + +// Connect, send to, and receive reply from, Brokerage House & logging +void CDMSUT::ConnectRunTxnAndLog(PMsgDriverBrokerage pRequest) +{ + TMsgBrokerageDriver Reply; // reply message from BrokerageHouse + memset(&Reply, 0, sizeof(Reply)); + + CDateTime StartTime, EndTime, TxnTime; // to time the transaction + CSocket sock; + + try + { + // connect to BrokerageHouse + sock.Connect(addr2, m_iBHlistenPort); + + // record txn start time -- please, see TPC-E specification clause 6.2.1.3 + StartTime.SetToCurrent(); + + // send and wait for response + sock.Send(reinterpret_cast<void*>(pRequest), sizeof(TMsgDriverBrokerage)); + sock.Receive( reinterpret_cast<void*>(&Reply), sizeof(Reply)); + + // record txn end time + EndTime.SetToCurrent(); + + // close connection + sock.CloseAccSocket(); + + // calculate txn response time + TxnTime.Set(0); // clear time + TxnTime.Add(0, (int)((EndTime - StartTime) * MsPerSecond)); // add ms + + // Errors: + // CBaseTxnErr::SUCCESS + // CBaseTxnErr::ROLLBACK (trade-order) + // CBaseTxnErr::UNAUTHORIZED_EXECUTOR (trade-order) + // ERR_TYPE_PQXX + // ERR_TYPE_WRONGTXN + + // logging + m_pMixLock->ClaimLock(); + if (Reply.iStatus == CBaseTxnErr::SUCCESS) + { + *(m_pfMix)<<(int)time(NULL)<<","<<pRequest->TxnType<< ","<<(TxnTime.MSec()/1000.0)<<","<<(int)pthread_self()<<endl; + } + else if (Reply.iStatus == CBaseTxnErr::ROLLBACK) + { + *(m_pfMix)<<(int)time(NULL)<<","<<pRequest->TxnType<<"R"<<","<<(TxnTime.MSec()/1000.0)<<","<<(int)pthread_self()<<endl; + } + else + { + *(m_pfMix)<<(int)time(NULL)<<","<<"E"<<","<<(TxnTime.MSec()/1000.0)<<","<<(int)pthread_self()<<endl; + } + 
m_pfMix->flush(); + m_pMixLock->ReleaseLock(); + + } + catch(CSocketErr *pErr) + { + sock.CloseAccSocket(); // close connection + *(m_pfMix)<<(int)time(NULL)<<","<<"E"<<","<<"000"<<","<<(int)pthread_self()<<endl; + + ostringstream osErr; + osErr<<endl<<"Error: "<<pErr->ErrorText() + <<" at "<<"CCESUT::ConnectRunTxnAndLog"<<endl; + LogErrorMessage(osErr.str()); + delete pErr; + } +} + +// Data Maintenance +bool CDMSUT::DataMaintenance( PDataMaintenanceTxnInput pTxnInput ) +{ + cout<<"Data Maintenance requested"<<endl; + + TMsgDriverBrokerage Request; + memset(&Request, 0, sizeof(TMsgDriverBrokerage)); + + Request.TxnType = DATA_MAINTENANCE; + memcpy( &(Request.TxnInput.DataMaintenanceTxnInput), pTxnInput, sizeof( TDataMaintenanceTxnInput )); + + ConnectRunTxnAndLog(&Request); + return true; +} + +// Trade Cleanup +bool CDMSUT::TradeCleanup( PTradeCleanupTxnInput pTxnInput ) +{ + cout<<"Trade Cleanup requested"<<endl; + + TMsgDriverBrokerage Request; + memset(&Request, 0, sizeof(TMsgDriverBrokerage)); + + Request.TxnType = TRADE_CLEANUP; + memcpy( &(Request.TxnInput.TradeCleanupTxnInput), pTxnInput, sizeof( TTradeCleanupTxnInput )); + + ConnectRunTxnAndLog(&Request); + return true; +} + +// LogErrorMessage +void CDMSUT::LogErrorMessage( const string sErr ) +{ + m_pLogLock->ClaimLock(); + cout<<sErr; + *(m_pfLog)<<sErr; + m_pfLog->flush(); + m_pLogLock->ReleaseLock(); +} Modified: trunk/dbt5/src/interfaces/DMSUTtest.cpp =================================================================== --- trunk/dbt5/src/interfaces/DMSUTtest.cpp 2006-08-12 06:45:41 UTC (rev 1450) +++ trunk/dbt5/src/interfaces/DMSUTtest.cpp 2006-08-13 04:46:59 UTC (rev 1451) @@ -7,6 +7,7 @@ */ #include <transactions.h> +#include <DMSUTtest.h> using namespace TPCE; Modified: trunk/dbt5/src/interfaces/MEESUTtest.cpp =================================================================== --- trunk/dbt5/src/interfaces/MEESUTtest.cpp 2006-08-12 06:45:41 UTC (rev 1450) +++ 
trunk/dbt5/src/interfaces/MEESUTtest.cpp 2006-08-13 04:46:59 UTC (rev 1451) @@ -8,6 +8,7 @@ #include <transactions.h> #include <MEESUTtest.h> +#include <TxnHarnessSendToMarketTest.h> using namespace TPCE; Modified: trunk/dbt5/src/interfaces/TxnHarnessSendToMarketTest.cpp =================================================================== --- trunk/dbt5/src/interfaces/TxnHarnessSendToMarketTest.cpp 2006-08-12 06:45:41 UTC (rev 1450) +++ trunk/dbt5/src/interfaces/TxnHarnessSendToMarketTest.cpp 2006-08-13 04:46:59 UTC (rev 1451) @@ -8,6 +8,7 @@ #include <transactions.h> #include <MEESUTtest.h> +#include <TxnHarnessSendToMarketTest.h> using namespace TPCE; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |