[IBPP-DISCUSS] thread problem (thread newbie)
IBPP is a C++ client class library for FirebirdSQL
Status: Inactive
Brought to you by:
epocman
From: Sudheer P. <sud...@gm...> - 2006-11-04 04:53:29
|
Hi Friends, I am new to thread programming, in the following piece of code, a crash is occuring at *threadfunc, please help me to rectify the error. Thank you. Sudheer IBPP::Database dbH; void *threadfunc( void *myarg ) { IBPP::Transaction trnH = IBPP::TransactionFactory( dbH, IBPP::amWrite, IBPP::ilReadCommitted, IBPP::lrWait ); trnH->Start(); IBPP::Statement stmH = IBPP::StatementFactory( dbH, trnH ); string lsID; char lcID[10]; wnode *mywork; cnode *mynode; mynode=( cnode * ) myarg; pthread_mutex_lock( &sale_wq.control.mutex ); while ( sale_wq.control.active ) { while ( sale_wq.work.head == NULL && sale_wq.control.active ) { pthread_cond_wait( &sale_wq.control.cond, &sale_wq.control.mutex ); } if ( !sale_wq.control.active ) break; //we got something! mywork = (wnode *) queue_get( &sale_wq.work ); pthread_mutex_unlock( &sale_wq.control.mutex ); try { sprintf( lcID, "%i", mywork->jobnum ); lsID = (string)lcID; stmH->ExecuteImmediate( "UPDATE temp SET processed = 0 WHERE id = " + lsID + " " ); trnH->Commit(); printf( "Thread number %d processing job %d\n", mynode->threadnum, mywork->jobnum ); } catch ( IBPP::Exception& e ) { const char *lcErrMsg = e.ErrorMessage(); trnH->Rollback(); printf( lcErrMsg ); printf( "\n" ); } printf( "Thread number %d processing job %d\n", mynode->threadnum, mywork->jobnum ); delete mywork; pthread_mutex_lock( &sale_wq.control.mutex ); } pthread_mutex_unlock( &sale_wq.control.mutex ); pthread_mutex_lock( &cq.control.mutex ); queue_put( &cq.cleanup,(node *) mynode ); pthread_mutex_unlock( &cq.control.mutex ); pthread_cond_signal( &cq.control.cond ); printf( "thread %d shutting down...\n", mynode->threadnum ); return NULL; } #define NUM_WORKERS 4 int numthreads; void join_threads( void ) { cnode *curnode; while ( numthreads ) { pthread_mutex_lock( &cq.control.mutex ); while ( cq.cleanup.head == NULL ) { pthread_cond_wait( &cq.control.cond, &cq.control.mutex ); } curnode = (cnode *)queue_get( &cq.cleanup ); pthread_mutex_unlock( &cq.control.mutex ); pthread_join( curnode->tid, NULL ); printf( "joined with thread %d\n", curnode->threadnum ); delete curnode; numthreads--; } } int create_threads( void ) { int x; cnode *curnode; for ( x = 0; x < NUM_WORKERS; x++ ) { curnode = new cnode; if ( !curnode ) return 1; curnode->threadnum = x; if ( pthread_create( &curnode->tid, NULL, threadfunc, (void *) curnode ) ) return 1; printf( "created thread %d\n", x ); numthreads++; } return 0; } int main( void ) { try { dbH = IBPP::DatabaseFactory( gsServer, gcDb, gsUser, gsPwd ); dbH->Connect(); } catch ( IBPP::Exception& e ) { const char *lcErrMsg = e.ErrorMessage(); printf( lcErrMsg ); return 0; } int x; wnode *mywork; initialize_structs(); if ( create_threads() ) { printf( "Error starting threads... cleaning up.\n" ); join_threads(); dabort(); } int liID; short lusProcessed; double ldAmount; IBPP::Transaction trnH = IBPP::TransactionFactory( dbH, IBPP::amWrite, IBPP::ilReadCommitted, IBPP::lrWait ); trnH->Start(); IBPP::Statement stmH = IBPP::StatementFactory( dbH, trnH ); try { stmH->Prepare( "SELECT id, amount, processed FROM temp" ); stmH->Execute(); pthread_mutex_lock( &sale_wq.control.mutex ); while ( stmH->Fetch() ) { stmH->Get( 1, liID ); stmH->Get( 2, ldAmount ); stmH->Get( 3, lusProcessed ); mywork = new wnode; if ( !mywork ) { printf( "Can't malloc!\n" ); break; } mywork->jobnum = liID; queue_put( &sale_wq.work, (node *) mywork ); } trnH->Commit(); pthread_mutex_unlock( &sale_wq.control.mutex ); pthread_cond_broadcast( &sale_wq.control.cond ); Sleep( 2 ); } catch ( IBPP::Exception& e ) { const char *lcErrMsg = e.ErrorMessage(); trnH->Rollback(); printf( lcErrMsg ); printf( "\n" ); } printf( "deactivating work queue...\n" ); /* CLEANUP */ join_threads(); } |