From: Karel G. <kg...@us...> - 2001-11-24 19:45:29
|
Update of /cvsroot/micomt/mico/orb In directory usw-pr-cvs1:/tmp/cvs-serv31239/orb Modified Files: fast_array.cc iop.cc mt_dispatcher.cc mt_manager.cc orb.cc orb_all.cc os-thread.cc Log Message: - fixed compilation of final build - fixed --enable-threads configure option - fixed server scalability problem with thread-pool concurrency model which uses separate reader/writer threads for each GIOP connection Index: fast_array.cc =================================================================== RCS file: /cvsroot/micomt/mico/orb/fast_array.cc,v retrieving revision 1.13 retrieving revision 1.14 diff -C2 -r1.13 -r1.14 *** fast_array.cc 2001/10/26 20:00:00 1.13 --- fast_array.cc 2001/11/24 19:45:26 1.14 *************** *** 118,122 **** _first_free = n; } ! #ifdef __M_DEBUG objs[n].obj = NULL; objs[n].prev = 0; --- 118,122 ---- _first_free = n; } ! #ifdef MTDEBUG objs[n].obj = NULL; objs[n].prev = 0; *************** *** 124,128 **** } ! #ifdef __M_DEBUG void MICO::__void_array::print_stats() { if (MICO::Logger::IsLogged (MICO::Logger::Support)) { --- 124,128 ---- } ! #ifdef MTDEBUG void MICO::__void_array::print_stats() { if (MICO::Logger::IsLogged (MICO::Logger::Support)) { *************** *** 171,173 **** } ! #endif // __M_DEBUG --- 171,173 ---- } ! #endif // MTDEBUG Index: iop.cc =================================================================== RCS file: /cvsroot/micomt/mico/orb/iop.cc,v retrieving revision 1.72 retrieving revision 1.73 diff -C2 -r1.72 -r1.73 *** iop.cc 2001/10/26 20:00:00 1.72 --- iop.cc 2001/11/24 19:45:26 1.73 *************** *** 2133,2138 **** /******************************* InputHandler ***************************/ ! MICO::InputHandler::InputHandler(): PassiveOperation() { ! __NAME( name( "InputHandler" ) ); if (MICO::Logger::IsLogged (MICO::Logger::GIOP)) { --- 2133,2138 ---- /******************************* InputHandler ***************************/ ! MICO::InputHandler::InputHandler(): PassiveOperation() ! { __NAME( name( "InputHandler" ) ); if (MICO::Logger::IsLogged (MICO::Logger::GIOP)) { *************** *** 2184,2193 **** /******************************* GIOPConn *******************************/ - MICO::GIOPConn::GIOPConn (CORBA::Dispatcher *disp, CORBA::Transport *transp, GIOPConnCallback *cb, GIOPCodec *codec, CORBA::Long tmout, CORBA::ULong max_size) ! : _inctx (codec, new CORBA::Buffer()) { _disp = disp; --- 2184,2202 ---- /******************************* GIOPConn *******************************/ + #ifdef HAVE_THREADS + MICO::GIOPConn::GIOPConn (CORBA::Dispatcher *disp, CORBA::Transport *transp, + GIOPConnCallback *cb, GIOPCodec *codec, + CORBA::Long tmout, CORBA::ULong max_size, + CORBA::Boolean __reader_thread, + CORBA::Boolean __writer_thread) + : _inctx (codec, new CORBA::Buffer()), _M_use_reader_thread(__reader_thread), + _M_use_writer_thread(__writer_thread) + #else MICO::GIOPConn::GIOPConn (CORBA::Dispatcher *disp, CORBA::Transport *transp, GIOPConnCallback *cb, GIOPCodec *codec, CORBA::Long tmout, CORBA::ULong max_size) ! : _inctx (codec, new CORBA::Buffer()) ! #endif { _disp = disp; *************** *** 2213,2218 **** #ifdef HAVE_THREADS ! _writer = new GIOPConnWriter( this ); ! _reader = new GIOPConnReader( this ); #endif --- 2222,2229 ---- #ifdef HAVE_THREADS ! if (_M_use_reader_thread) ! _reader = new GIOPConnReader(this); ! if (_M_use_writer_thread) ! _writer = new GIOPConnWriter(this); #endif *************** *** 2249,2254 **** //REMEMBER: terminate tells the thread to exit // it does NOT cancel the thread ! _writer->init_shutdown(); ! _reader->init_shutdown(); _transp->close(); --- 2260,2268 ---- //REMEMBER: terminate tells the thread to exit // it does NOT cancel the thread ! if (_M_use_writer_thread) ! _writer->init_shutdown(); ! if (_M_use_reader_thread) ! _reader->init_shutdown(); ! _transp->close(); *************** *** 2256,2264 **** _transp->wselect (_disp, 0); ! _writer->finalize_shutdown(); ! _reader->finalize_shutdown(); ! delete _writer; ! delete _reader; ! if (MICO::Logger::IsLogged (MICO::Logger::GIOP)) { MICOMT::AutoDebugLock __lock; --- 2270,2281 ---- _transp->wselect (_disp, 0); ! if (_M_use_writer_thread) { ! _writer->finalize_shutdown(); ! delete _writer; ! } ! if (_M_use_reader_thread) { ! _reader->finalize_shutdown(); ! delete _reader; ! } if (MICO::Logger::IsLogged (MICO::Logger::GIOP)) { MICOMT::AutoDebugLock __lock; *************** *** 2295,2300 **** this->ref(); this->state_change( MICOMT::StateRefCnt::Active ); ! _writer->start(); ! _reader->start(); } --- 2312,2319 ---- this->ref(); this->state_change( MICOMT::StateRefCnt::Active ); ! if (_M_use_writer_thread) ! _writer->start(); ! if (_M_use_reader_thread) ! _reader->start(); } *************** *** 2312,2316 **** << " conn: " << this << endl << " ev: GIOPConnCallback::InputReady" << endl ! << " t_mod: " << MICO::MTManager::get_thread_model() << endl << " pool: " << MICO::MTManager::thread_pool() << endl << " conn: " << MICO::MTManager::thread_per_connection() << endl --- 2331,2335 ---- << " conn: " << this << endl << " ev: GIOPConnCallback::InputReady" << endl ! << " t_mod: " << MICO::MTManager::concurrency_model() << endl << " pool: " << MICO::MTManager::thread_pool() << endl << " conn: " << MICO::MTManager::thread_per_connection() << endl *************** *** 2318,2327 **** } ! if (MICO::MTManager::thread_pool() ! || MICO::MTManager::thread_per_request()) { ! MICO::msg_type *msg; msg = new msg_type(new GIOPConnMsg(this, this->input(), GIOPConnCallback::InputReady)); - _reader->send_msg( MICO::Operation::DeCode, msg ); return TRUE; --- 2337,2351 ---- } ! if (MICO::MTManager::thread_pool()) { ! MICO::msg_type *msg; ! msg = new msg_type(new GIOPConnMsg(this, this->input(), ! GIOPConnCallback::InputReady)); ! _tpm->get_thread_pool(MICO::Operation::DeCode).put_msg(0, msg); ! return TRUE; ! } ! if (MICO::MTManager::thread_per_request()) { ! MICO::msg_type *msg; msg = new msg_type(new GIOPConnMsg(this, this->input(), GIOPConnCallback::InputReady)); _reader->send_msg( MICO::Operation::DeCode, msg ); return TRUE; *************** *** 2680,2685 **** if ( this->state() == MICOMT::StateRefCnt::Active ) { - - msg = new msg_type( b ); if (MICO::Logger::IsLogged (MICO::Logger::GIOP)) { --- 2704,2707 ---- *************** *** 2691,2698 **** if (MICO::MTManager::thread_pool()) { ! _writer->send_msg( msg ); } else { ! _writer->process(msg); } } else { --- 2713,2723 ---- if (MICO::MTManager::thread_pool()) { ! //_writer->send_msg( msg ); ! this->output_handler(b); } else { ! // msg = new msg_type( b ); ! // _writer->process(msg); ! this->output_handler(b); } } else { *************** *** 3034,3038 **** } else #endif // 1 ! { conn = (*i).second; #ifdef HAVE_THREADS --- 3059,3063 ---- } else #endif // 1 ! { conn = (*i).second; #ifdef HAVE_THREADS *************** *** 3087,3090 **** --- 3112,3125 ---- return 0; } + #ifdef HAVE_THREADS + CORBA::Boolean __use_reader_thread = TRUE; + CORBA::Boolean __use_writer_thread = FALSE; + conn = + new GIOPConn (Dispatcher(), t, this, + new GIOPCodec (new CDRDecoder, new CDREncoder, + version), + 0L /* no tmout */, _max_message_size, + __use_reader_thread, __use_writer_thread); + #else conn = new GIOPConn (Dispatcher(), t, this, *************** *** 3092,3095 **** --- 3127,3131 ---- version), 0L /* no tmout */, _max_message_size); + #endif _conns[t->peer()] = conn; #ifdef HAVE_THREADS *************** *** 3965,3976 **** } - #ifndef HAVE_THREADS CORBA::Dispatcher* MICO::IIOPProxy::Dispatcher() { return _orb->dispatcher(); } - #endif // HAVE_THREADS /************************* IIOPServerInvokeRec ************************/ --- 4001,4014 ---- } CORBA::Dispatcher* MICO::IIOPProxy::Dispatcher() { + #ifndef HAVE_THREADS return _orb->dispatcher(); + #else + return MICO::GIOPConnMgr::Dispatcher(); + #endif } /************************* IIOPServerInvokeRec ************************/ *************** *** 5135,5138 **** --- 5173,5182 ---- break; } + #ifdef HAVE_THREADS + CORBA::Boolean __use_reader_thread = TRUE; + CORBA::Boolean __use_writer_thread = FALSE; + if (MICO::MTManager::thread_pool()) + __use_reader_thread = FALSE; + GIOPConn *conn = new GIOPConn (Dispatcher(), t, this, *************** *** 5140,5145 **** new CDREncoder, _iiop_ver), 0L /* no tmout */, _max_message_size); ! // this and kill_conn are the only reasons to lock _conns --- 5184,5197 ---- new CDREncoder, _iiop_ver), + 0L /* no tmout */, _max_message_size, + __use_reader_thread, __use_writer_thread); + #else + GIOPConn *conn = + new GIOPConn (Dispatcher(), t, this, + new GIOPCodec (new CDRDecoder, + new CDREncoder, + _iiop_ver), 0L /* no tmout */, _max_message_size); ! #endif // this and kill_conn are the only reasons to lock _conns *************** *** 5164,5173 **** } - #ifndef HAVE_THREADS CORBA::Dispatcher* MICO::IIOPServer::Dispatcher() { return _orb->dispatcher(); } - #endif --- 5216,5232 ---- } CORBA::Dispatcher* MICO::IIOPServer::Dispatcher() { + #ifndef HAVE_THREADS return _orb->dispatcher(); + #else + if (MICO::MTManager::thread_pool()) { + return _orb->dispatcher(); + } + else { + return MICO::GIOPConnMgr::Dispatcher(); + } + #endif } Index: mt_dispatcher.cc =================================================================== RCS file: /cvsroot/micomt/mico/orb/mt_dispatcher.cc,v retrieving revision 1.20 retrieving revision 1.21 diff -C2 -r1.20 -r1.21 *** mt_dispatcher.cc 2001/10/26 20:00:01 1.20 --- mt_dispatcher.cc 2001/11/24 19:45:26 1.21 *************** *** 27,32 **** */ - #define MTDEBUG - #define MICO_CONF_IMR #include <CORBA-SMALL.h> --- 27,30 ---- Index: mt_manager.cc =================================================================== RCS file: /cvsroot/micomt/mico/orb/mt_manager.cc,v retrieving revision 1.41 retrieving revision 1.42 diff -C2 -r1.41 -r1.42 *** mt_manager.cc 2001/10/26 20:00:02 1.41 --- mt_manager.cc 2001/11/24 19:45:26 1.42 *************** *** 410,430 **** } ! CORBA::Long MICO::MTManager::_S_thread_model = MICO::MTManager::ThreadPool; void ! MICO::MTManager::set_thread_model(MICO::MTManager::ThreadModel __model) { if (MICO::Logger::IsLogged (MICO::Logger::Thread)) { MICOMT::AutoDebugLock __lock; MICO::Logger::Stream (MICO::Logger::Thread) ! << "Using " << __model << " as a thread model of whole orb." << endl; } ! _S_thread_model = __model; } CORBA::Long ! MICO::MTManager::get_thread_model() { ! return MICO::MTManager::_S_thread_model; } --- 410,430 ---- } ! CORBA::Long MICO::MTManager::_S_concurrency_model = MICO::MTManager::_S_thread_pool; void ! MICO::MTManager::concurrency_model(MICO::MTManager::ConcurrencyModel __model) { if (MICO::Logger::IsLogged (MICO::Logger::Thread)) { MICOMT::AutoDebugLock __lock; MICO::Logger::Stream (MICO::Logger::Thread) ! << "Using " << __model << " as a concurrency model of whole orb." << endl; } ! _S_concurrency_model = __model; } CORBA::Long ! MICO::MTManager::concurrency_model() { ! return MICO::MTManager::_S_concurrency_model; } *************** *** 432,436 **** MICO::MTManager::thread_pool() { ! return _S_thread_model == ThreadPool; } --- 432,436 ---- MICO::MTManager::thread_pool() { ! return _S_concurrency_model == _S_thread_pool; } *************** *** 438,442 **** MICO::MTManager::thread_per_connection() { ! return _S_thread_model == ThreadPerConnection; } --- 438,442 ---- MICO::MTManager::thread_per_connection() { ! return _S_concurrency_model == _S_thread_per_connection; } *************** *** 444,448 **** MICO::MTManager::thread_per_request() { ! return _S_thread_model == ThreadPerRequest; } --- 444,448 ---- MICO::MTManager::thread_per_request() { ! return _S_concurrency_model == _S_thread_per_request; } Index: orb.cc =================================================================== RCS file: /cvsroot/micomt/mico/orb/orb.cc,v retrieving revision 1.45 retrieving revision 1.46 diff -C2 -r1.45 -r1.46 *** orb.cc 2001/11/04 18:34:53 1.45 --- orb.cc 2001/11/24 19:45:26 1.46 *************** *** 2300,2312 **** #ifdef HAVE_THREADS if (rec && rec->active() ) { ! assert( rec->callback() ); ! return (rec->callback()->waitfor(this, id, CORBA::ORBCallback::Result, tmout)); } return TRUE; #else if (tmout == 0) { ! if (!rec || rec->completed()) { ! return TRUE; ! } } --- 2300,2312 ---- #ifdef HAVE_THREADS if (rec && rec->active() ) { ! assert( rec->callback() ); ! return (rec->callback()->waitfor(this, id, CORBA::ORBCallback::Result, tmout)); } return TRUE; #else if (tmout == 0) { ! if (!rec || rec->completed()) { ! return TRUE; ! } } *************** *** 2314,2325 **** while (42) { ! if (!rec || rec->completed()) ! return TRUE; ! if (t.done()) ! return FALSE; ! _disp->run (FALSE); ! rec = get_invoke (id); } ! #endif } --- 2314,2325 ---- while (42) { ! if (!rec || rec->completed()) ! return TRUE; ! if (t.done()) ! return FALSE; ! _disp->run (FALSE); ! rec = get_invoke (id); } ! #endif // HAVE_THREADS } *************** *** 2837,2849 **** orb_instance->resource_manager ().request_limit (RequestLimit); if (__thread_pool) { ! MICO::MTManager::set_thread_model(MICO::MTManager::ThreadPool); cerr << "Using thread-pool concurrency model." << endl; } if (__thread_per_connection) { ! MICO::MTManager::set_thread_model(MICO::MTManager::ThreadPerConnection); cerr << "Using thread-per-connection concurrency model." << endl; } if (__thread_per_request) { ! MICO::MTManager::set_thread_model(MICO::MTManager::ThreadPerRequest); cerr << "Using thread-per-request concurrency model." << endl; } --- 2837,2854 ---- orb_instance->resource_manager ().request_limit (RequestLimit); if (__thread_pool) { ! MICO::MTManager::concurrency_model(MICO::MTManager::_S_thread_pool); cerr << "Using thread-pool concurrency model." << endl; + // this is a hack because MICO::SelectDispatcher::_isblocking + // is private static and method block is not static + MICO::SelectDispatcher* __disp = new MICO::SelectDispatcher; + __disp->block(FALSE); + delete __disp; } if (__thread_per_connection) { ! MICO::MTManager::concurrency_model(MICO::MTManager::_S_thread_per_connection); cerr << "Using thread-per-connection concurrency model." << endl; } if (__thread_per_request) { ! MICO::MTManager::concurrency_model(MICO::MTManager::_S_thread_per_request); cerr << "Using thread-per-request concurrency model." << endl; } Index: orb_all.cc =================================================================== RCS file: /cvsroot/micomt/mico/orb/orb_all.cc,v retrieving revision 1.1.1.1 retrieving revision 1.2 diff -C2 -r1.1.1.1 -r1.2 *** orb_all.cc 1999/11/03 23:20:53 1.1.1.1 --- orb_all.cc 2001/11/24 19:45:26 1.2 *************** *** 18,21 **** --- 18,24 ---- #include "except.cc" #include "transport.cc" + #include "transport/tcp.cc" + #include "transport/udp.cc" + #include "transport/unix.cc" #include "dispatch.cc" #include "typecode.cc" *************** *** 43,44 **** --- 46,68 ---- #include "dynany.cc" #include "ssliop.cc" + + // address.cc and others define MICO_CONF_NO_POA + #undef MICO_CONF_NO_POA + #include "ir.cc" + #include "ir_base.cc" + #include "imr.cc" + + #include "time_base.cc" + #include "mtdebug.cc" + #include "fast_array.cc" + + #ifdef HAVE_THREADS + #include "os-thread.cc" + #include "Operation.cc" + #include "message.cc" + #include "mt_dispatcher.cc" + #include "mt_manager.cc" + #include "rt_corba.cc" + #include "rt_corba_impl.cc" + #include "rt_mico_impl.cc" + #endif Index: os-thread.cc =================================================================== RCS file: /cvsroot/micomt/mico/orb/os-thread.cc,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -r1.9 -r1.10 *** os-thread.cc 2001/10/26 20:00:02 1.9 --- os-thread.cc 2001/11/24 19:45:26 1.10 *************** *** 39,47 **** // - #define MICO_DEBUG_MODULE MICO::Logger::Thread - #define MICO_CONF_IMR - //#include <pthread.h> #include <CORBA-SMALL.h> #ifndef _WINDOWS --- 39,44 ---- |