[complement-svn] SF.net SVN: complement: [1707] trunk/complement/explore
Status: Pre-Alpha
Brought to you by:
complement
From: <com...@us...> - 2007-08-27 15:32:41
|
Revision: 1707 http://complement.svn.sourceforge.net/complement/?rev=1707&view=rev Author: complement Date: 2007-08-27 08:32:34 -0700 (Mon, 27 Aug 2007) Log Message: ----------- janus.h, janus.cc: hide methods, intended for internal protocol usage only; vtime.h, vtime.cc: LeaveGroup added; vshostmgr.h, vshostmgr.cc: trick with finalizer---event after that message queue free from VS_OUT_MEMBER events; ut/unit_test.cc, ut/vt_object.cc, ut/vt_handler.cc, ut/VTmess_core.cc, ut/vt_operations.cc, ut/vt_operations.h, ut/vt_dispatch.cc, ut/vt_remote.cc: move test suite to janus namespace; ut/unit_test.cc, ut/vt_remote.cc: test two groups and interprocess virtual synchrony; Modified Paths: -------------- trunk/complement/explore/include/janus/janus.h trunk/complement/explore/include/janus/vshostmgr.h trunk/complement/explore/include/janus/vtime.h trunk/complement/explore/lib/janus/ChangeLog trunk/complement/explore/lib/janus/janus.cc trunk/complement/explore/lib/janus/ut/VTmess_core.cc trunk/complement/explore/lib/janus/ut/unit_test.cc trunk/complement/explore/lib/janus/ut/vt_dispatch.cc trunk/complement/explore/lib/janus/ut/vt_handler.cc trunk/complement/explore/lib/janus/ut/vt_object.cc trunk/complement/explore/lib/janus/ut/vt_operations.cc trunk/complement/explore/lib/janus/ut/vt_operations.h trunk/complement/explore/lib/janus/ut/vt_remote.cc trunk/complement/explore/lib/janus/vshostmgr.cc trunk/complement/explore/lib/janus/vtime.cc Modified: trunk/complement/explore/include/janus/janus.h =================================================================== --- trunk/complement/explore/include/janus/janus.h 2007-08-27 15:10:53 UTC (rev 1706) +++ trunk/complement/explore/include/janus/janus.h 2007-08-27 15:32:34 UTC (rev 1707) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <07/08/23 12:36:32 ptr> +// -*- C++ -*- Time-stamp: <07/08/25 01:40:20 ptr> #ifndef __janus_h #define __janus_h @@ -41,14 +41,17 @@ #ifdef __USE_STLPORT_HASH typedef std::hash_map<oid_type, detail::vtime_obj_rec> vt_map_type; typedef std::hash_multimap<group_type, oid_type> gid_map_type; + typedef std::hash_set<stem::addr_type> addr_cache_t; #endif #ifdef __USE_STD_HASH typedef __gnu_cxx::hash_map<oid_type, detail::vtime_obj_rec> vt_map_type; typedef __gnu_cxx::hash_multimap<group_type, oid_type> gid_map_type; + typedef __gnu_cxx::hash_set<stem::addr_type> addr_cache_t; #endif #if defined(__USE_STLPORT_TR1) || defined(__USE_STD_TR1) typedef std::tr1::unordered_map<oid_type, detail::vtime_obj_rec> vt_map_type; typedef std::tr1::unordered_multimap<group_type, oid_type> gid_map_type; + typedef std::tr1::unordered_set<stem::addr_type> addr_cache_t; #endif public: @@ -92,18 +95,7 @@ ~Janus(); - void JaDispatch( const stem::Event_base<VSmess>& ); - - void VSNewMember( const stem::Event_base<VSsync_rq>& e ); - void VSNewRemoteMemberDirect( const stem::Event_base<VSsync_rq>& e ); - void VSNewRemoteMemberRevert( const stem::Event_base<VSsync_rq>& e ); - void JaSend( const stem::Event& e, group_type ); - void Subscribe( stem::addr_type, oid_type, group_type ); - void Unsubscribe( oid_type, group_type ); - void Unsubscribe( oid_type ); - void get_gvtime( group_type, stem::addr_type, gvtime_type& ); - void set_gvtime( group_type, stem::addr_type, const gvtime_type& ); void settrf( unsigned f ); void unsettrf( unsigned f ); @@ -119,6 +111,12 @@ difference_type group_size( group_type ) const; private: + void Subscribe( stem::addr_type, const oid_type&, group_type ); + void Unsubscribe( const oid_type&, group_type ); + void Unsubscribe( const oid_type& ); + + void get_gvtime( group_type, stem::addr_type, gvtime_type& ); + void set_gvtime( group_type, stem::addr_type, const gvtime_type& ); void check_and_send( detail::vtime_obj_rec&, const stem::Event_base<VSmess>& ); void check_and_send_delayed( detail::vtime_obj_rec& ); @@ -137,8 +135,18 @@ friend class VTHandler; friend class VSHostMgr; +#ifdef __FIT_EXAM + friend class vtime_operations; +#endif + private: + void JaDispatch( const stem::Event_base<VSmess>& ); + void VSNewMember( const stem::Event_base<VSsync_rq>& e ); + void VSNewRemoteMemberDirect( const stem::Event_base<VSsync_rq>& e ); + void VSNewRemoteMemberRevert( const stem::Event_base<VSsync_rq>& e ); + void VSOutMember( const stem::Event_base<VSsync_rq>& e ); + DECLARE_RESPONSE_TABLE( Janus, stem::EventHandler ); }; Modified: trunk/complement/explore/include/janus/vshostmgr.h =================================================================== --- trunk/complement/explore/include/janus/vshostmgr.h 2007-08-27 15:10:53 UTC (rev 1706) +++ trunk/complement/explore/include/janus/vshostmgr.h 2007-08-27 15:32:34 UTC (rev 1707) @@ -37,9 +37,15 @@ { private: // typedef std::list<stem::gaddr_type> vshost_container_t; -#ifdef __USE_STLPORT_TR1 +#if defined(__USE_STLPORT_TR1) || defined(__USE_STD_TR1) typedef std::tr1::unordered_set<stem::gaddr_type> vshost_container_t; #endif +#ifdef __USE_STD_HASH + typedef __gnu_cxx::hash_set<stem::gaddr_type> vshost_container_t; +#endif +#ifdef __USE_STLPORT_HASH + typedef std::hash_set<stem::gaddr_type> vshost_container_t; +#endif public: typedef vshost_container_t::size_type size_type; @@ -64,7 +70,7 @@ return tmp; } - void Subscribe( stem::addr_type, oid_type, group_type ); + void Subscribe( stem::addr_type, group_type ); private: typedef std::list<stem::NetTransportMgr *> nmgr_container_t; @@ -74,6 +80,28 @@ nmgr_container_t _clients; srv_container_t _servers; + class Finalizer : + public stem::EventHandler + { + public: + Finalizer( const char *info ) : + EventHandler( info ) + { cnd.set( false ); } + + void wait() + { cnd.try_wait(); } + + private: + void final() + { cnd.set( true ); } + + xmt::condition cnd; + + DECLARE_RESPONSE_TABLE( Finalizer, stem::EventHandler ); + }; + + Finalizer finalizer; + // DECLARE_RESPONSE_TABLE( VSHostMgr, janus::VTHandler ); }; Modified: trunk/complement/explore/include/janus/vtime.h =================================================================== --- trunk/complement/explore/include/janus/vtime.h 2007-08-27 15:10:53 UTC (rev 1706) +++ trunk/complement/explore/include/janus/vtime.h 2007-08-27 15:32:34 UTC (rev 1707) @@ -367,6 +367,7 @@ void JaSend( const stem::Event& e ); void JoinGroup( group_type grp ); + void LeaveGroup( group_type grp ); virtual void VSNewMember( const stem::Event_base<VSsync_rq>& e ); virtual void VSOutMember( const stem::Event_base<VSsync_rq>& e ); @@ -381,6 +382,7 @@ { return _vtdsp; } protected: + void Unsubscribe(); void VSNewMember_data( const stem::Event_base<VSsync_rq>&, const std::string& data ); void get_gvtime( group_type g, gvtime_type& gvt ); @@ -398,6 +400,7 @@ #define VS_SYNC_TIME 0x303 #define VS_NEW_REMOTE_MEMBER 0x304 #define VS_NEW_MEMBER_RV 0x305 +#define VS_HOST_MGR_FINAL 0x306 #ifdef __USE_STLPORT_HASH # undef __USE_STLPORT_HASH Modified: trunk/complement/explore/lib/janus/ChangeLog =================================================================== --- trunk/complement/explore/lib/janus/ChangeLog 2007-08-27 15:10:53 UTC (rev 1706) +++ trunk/complement/explore/lib/janus/ChangeLog 2007-08-27 15:32:34 UTC (rev 1707) @@ -1,3 +1,26 @@ +2007-08-27 Petr Ovtchenkov <pt...@is...> + + * janus.h, janus.cc: hide methods, intended for internal + protocol usage only; + + * vtime.h, vtime.cc: LeaveGroup added; + + * vshostmgr.h, vshostmgr.cc: trick with finalizer---event after + that message queue free from VS_OUT_MEMBER events; + + * ut/unit_test.cc: move test suite to janus namespace; + + * ut/vt_object.cc, ut/vt_handler.cc, ut/VTmess_core.cc: + idem; + + * ut/vt_operations.cc, ut/vt_operations.h, ut/vt_dispatch.cc: + idem; + + * ut/vt_remote.cc: idem; + + * ut/unit_test.cc, ut/vt_remote.cc: test two groups and + interprocess virtual synchrony; + 2007-08-23 Petr Ovtchenkov <pt...@is...> * janus.h, janus.cc: Janus code moved from vtime.h, vtime.cc; Modified: trunk/complement/explore/lib/janus/janus.cc =================================================================== --- trunk/complement/explore/lib/janus/janus.cc 2007-08-27 15:10:53 UTC (rev 1706) +++ trunk/complement/explore/lib/janus/janus.cc 2007-08-27 15:32:34 UTC (rev 1707) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <07/08/23 12:36:32 ptr> +// -*- C++ -*- Time-stamp: <07/08/26 12:44:25 ptr> #include <janus/janus.h> #include <janus/vshostmgr.h> @@ -261,7 +261,7 @@ throw domain_error( "VT object not member of group" ); // Error: not group member } -void Janus::Subscribe( stem::addr_type addr, oid_type oid, group_type grp ) +void Janus::Subscribe( stem::addr_type addr, const oid_type& oid, group_type grp ) { // See comment on top of VTSend above xmt::recursive_scoped_lock lk( this->_theHistory_lock ); @@ -298,11 +298,11 @@ // cerr << "**** " << grp << " " << xmt::getpid() << endl; if ( /* (grp != vshosts_group) && */ (_hostmgr != 0) ) { - _hostmgr->Subscribe( addr, oid, grp ); + _hostmgr->Subscribe( addr, grp ); } } -void Janus::Unsubscribe( oid_type oid, group_type grp ) +void Janus::Unsubscribe( const oid_type& oid, group_type grp ) { // See comment on top of VTSend above xmt::recursive_scoped_lock lk( this->_theHistory_lock ); @@ -311,6 +311,8 @@ grmap.equal_range( grp ); vt_map_type::iterator i = vtmap.find( oid ); + addr_cache_t addr_cache; + while ( range.first != range.second ) { if ( range.first->second == oid ) { grmap.erase( range.first++ ); @@ -318,9 +320,28 @@ vt_map_type::iterator j = vtmap.find( range.first->second ); if ( j != vtmap.end() ) { stem::Event_base<VSsync_rq> ev( VS_OUT_MEMBER ); - ev.dest( j->second.stem_addr() ); + stem::addr_type addr = j->second.stem_addr(); + + // if address is foreign, send VS_OUT_MEMBER to foreign janus + if ( (addr & stem::extbit) != 0 ) { + gaddr_type ga = manager()->reflect( addr ); + if ( ga != gaddr_type() ) { + ga.addr = stem::janus_addr; + } + addr = manager()->reflect( ga ); + // send only once, foreign janus will resend to other group members + // in the same process: + if ( addr_cache.find(addr) != addr_cache.end() ) { + ++range.first; + continue; + } else { + addr_cache.insert( addr ); + } + } + ev.dest( addr ); ev.src( i != vtmap.end() ? i->second.stem_addr() : self_id() ); ev.value().grp = grp; + Forward( ev ); #ifdef __FIT_VS_TRACE try { scoped_lock lk(_lock_tr); @@ -333,8 +354,6 @@ catch ( ... ) { } #endif // __FIT_VS_TRACE - - Forward( ev ); } ++range.first; } @@ -346,11 +365,12 @@ } } - if ( grp != vshosts_group ) { - } + // if ( /* (grp != vshosts_group) && */ (_hostmgr != 0) ) { + // _hostmgr->Unsubscribe( oid, grp ); + // } } -void Janus::Unsubscribe( oid_type oid ) +void Janus::Unsubscribe( const oid_type& oid ) { // See comment on top of JaSend above xmt::recursive_scoped_lock lk( this->_theHistory_lock ); @@ -364,6 +384,8 @@ pair<gid_map_type::iterator,gid_map_type::iterator> range = grmap.equal_range( *grp ); + addr_cache_t addr_cache; + while ( range.first != range.second ) { if ( range.first->second == oid ) { grmap.erase( range.first++ ); @@ -371,9 +393,28 @@ vt_map_type::iterator j = vtmap.find( range.first->second ); if ( j != vtmap.end() ) { stem::Event_base<VSsync_rq> ev( VS_OUT_MEMBER ); - ev.dest( j->second.stem_addr() ); + stem::addr_type addr = j->second.stem_addr(); + + // if address is foreign, send VS_OUT_MEMBER to foreign janus + if ( (addr & stem::extbit) != 0 ) { + gaddr_type ga = manager()->reflect( addr ); + if ( ga != gaddr_type() ) { + ga.addr = stem::janus_addr; + } + addr = manager()->reflect( ga ); + // send only once, foreign janus will resend to other group members + // in the same process: + if ( addr_cache.find(addr) != addr_cache.end() ) { + ++range.first; + continue; + } else { + addr_cache.insert( addr ); + } + } + ev.dest( addr ); ev.src( i != vtmap.end() ? i->second.stem_addr() : self_id() ); ev.value().grp = *grp; + Forward( ev ); #ifdef __FIT_VS_TRACE try { scoped_lock lk(_lock_tr); @@ -386,8 +427,6 @@ catch ( ... ) { } #endif // __FIT_VS_TRACE - - Forward( ev ); } ++range.first; } @@ -504,7 +543,7 @@ if ( _trs != 0 && _trs->good() && (_trflags & tracegroup) ) { *_trs << " -> VS_NEW_MEMBER_RV G" << vshosts_group << " " << hex << showbase - << evr.src() << " -> " << evr.dest() << dec << endl; + << self_id() << " -> " << evr.dest() << dec << endl; } } catch ( ... ) { @@ -520,8 +559,7 @@ group_type grp = ev.value().grp; pair<gid_map_type::const_iterator,gid_map_type::const_iterator> range = grmap.equal_range( grp ); if ( range.first != range.second ) { // we have local? member within this group - gaddr_type oid = manager()->reflect( ev.src() ); // ???? oid == gaddr - addr_type addr = ev.src(); + const addr_type addr = ev.src(); gaddr_type ga = manager()->reflect( addr ); addr_type janus_addr = badaddr; if ( ga != gaddr_type() ) { @@ -571,6 +609,8 @@ } } + const gaddr_type oid = manager()->reflect( addr ); // ???? oid == gaddr + vtmap[oid].add( addr, grp ); grmap.insert( make_pair(grp,oid) ); // cerr << "**** " << grp << " " << xmt::getpid() << endl; @@ -584,8 +624,7 @@ group_type grp = ev.value().grp; pair<gid_map_type::const_iterator,gid_map_type::const_iterator> range = grmap.equal_range( grp ); if ( range.first != range.second ) { // we have local? member within this group - gaddr_type oid = manager()->reflect( ev.src() ); // ???? oid == gaddr - addr_type addr = ev.src(); + const addr_type addr = ev.src(); for ( ; range.first != range.second; ++range.first ) { vt_map_type::iterator i = vtmap.find( range.first->second ); if ( i != vtmap.end() && ((i->second.stem_addr() & stem::extbit) == 0 )) { @@ -609,13 +648,14 @@ } } + const gaddr_type oid = manager()->reflect( addr ); // ???? oid == gaddr vtmap[oid].add( addr, grp ); grmap.insert( make_pair(grp,oid) ); // cerr << "**** " << grp << " " << xmt::getpid() << endl; } } else { - gaddr_type oid = manager()->reflect( ev.src() ); // ???? oid == gaddr - addr_type addr = ev.src(); + const addr_type addr = ev.src(); + const gaddr_type oid = manager()->reflect( ev.src() ); // ???? oid == gaddr vtmap[oid].add( addr, vshosts_group ); grmap.insert( make_pair(static_cast<group_type>(vshosts_group),oid) ); @@ -623,6 +663,67 @@ } } +void Janus::VSOutMember( const stem::Event_base<VSsync_rq>& ev ) +{ +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracegroup) ) { + *_trs << "<- VS_OUT_MEMBER G" << ev.value().grp + << hex << showbase + << ev.src() << " -> " << ev.dest() << dec << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + + const gaddr_type oid = manager()->reflect( ev.src() ); // ???? oid == gaddr + const group_type grp = ev.value().grp; + + + pair<gid_map_type::iterator,gid_map_type::iterator> range = + grmap.equal_range( grp ); + + vt_map_type::iterator i = vtmap.find( oid ); + + while ( range.first != range.second ) { + if ( range.first->second == oid ) { + grmap.erase( range.first++ ); + } else { + vt_map_type::iterator j = vtmap.find( range.first->second ); + if ( j != vtmap.end() ) { + stem::addr_type addr = j->second.stem_addr(); + + // send only to local addresses + if ( (addr & stem::extbit) == 0 ) { + ev.dest( addr ); + Forward( ev ); +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracegroup) ) { + *_trs << " -> VS_OUT_MEMBER " + << hex << showbase + << ev.src() << " -> " << ev.dest() << dec << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + } + } + ++range.first; + } + } + + if ( i != vtmap.end() ) { + if ( i->second.rm_group( grp ) ) { // no groups more + vtmap.erase( i ); + } + } +} + void Janus::connect( const char *host, int port ) { _hostmgr->connect( host, port ); @@ -653,6 +754,7 @@ EV_Event_base_T_( ST_NULL, VS_NEW_MEMBER, VSNewMember, VSsync_rq ) EV_Event_base_T_( ST_NULL, VS_NEW_REMOTE_MEMBER, VSNewRemoteMemberDirect, VSsync_rq ) EV_Event_base_T_( ST_NULL, VS_NEW_MEMBER_RV, VSNewRemoteMemberRevert, VSsync_rq ) + EV_Event_base_T_( ST_NULL, VS_OUT_MEMBER, VSOutMember, VSsync_rq ) END_RESPONSE_TABLE } // namespace janus Modified: trunk/complement/explore/lib/janus/ut/VTmess_core.cc =================================================================== --- trunk/complement/explore/lib/janus/ut/VTmess_core.cc 2007-08-27 15:10:53 UTC (rev 1706) +++ trunk/complement/explore/lib/janus/ut/VTmess_core.cc 2007-08-27 15:32:34 UTC (rev 1707) @@ -5,7 +5,8 @@ #include <iostream> #include <janus/vtime.h> -using namespace janus; +namespace janus { + using namespace std; class VTM_handler : @@ -146,3 +147,6 @@ return EXAM_RESULT; } + +} // namespace janus + Modified: trunk/complement/explore/lib/janus/ut/unit_test.cc =================================================================== --- trunk/complement/explore/lib/janus/ut/unit_test.cc 2007-08-27 15:10:53 UTC (rev 1706) +++ trunk/complement/explore/lib/janus/ut/unit_test.cc 2007-08-27 15:32:34 UTC (rev 1707) @@ -6,11 +6,13 @@ int EXAM_IMPL(vtime_test_suite) { + using namespace janus; + exam::test_suite::test_case_type tc[3]; exam::test_suite t( "virtual time operations" ); - vtime_operations vt_oper; + janus::vtime_operations vt_oper; t.add( &vtime_operations::vt_max, vt_oper, "Max", tc[1] = t.add( &vtime_operations::vt_add, vt_oper, "Additions", @@ -20,16 +22,17 @@ t.add( &vtime_operations::VTMess_core, vt_oper, "VTmess core transfer", tc[2] = t.add( &vtime_operations::gvt_add, vt_oper, "Group VT add", tc[1] ) ); - t.add( &vtime_operations::remote, vt_oper, "remote", - t.add( &vtime_operations::VTEntryIntoGroup3, vt_oper, "VTEntryIntoGroup3", - t.add( &vtime_operations::VTEntryIntoGroup2, vt_oper, "VTEntryIntoGroup2", - t.add( &vtime_operations::VTEntryIntoGroup, vt_oper, "VTEntryIntoGroup", - t.add( &vtime_operations::VTSubscription, vt_oper, "VTSubscription", - t.add( &vtime_operations::VTDispatch2, vt_oper, "VTHandler2", - t.add( &vtime_operations::VTDispatch2, vt_oper, "VTHandler1", - t.add( &vtime_operations::VTDispatch2, vt_oper, "VTDispatch2", - t.add( &vtime_operations::VTDispatch1, vt_oper, "VTDispatch1", - t.add( &vtime_operations::vt_object, vt_oper, "VT order", tc[2] )))))))))); + t.add( &vtime_operations::mgroups, vt_oper, "mgroups", + t.add( &vtime_operations::remote, vt_oper, "remote", + t.add( &vtime_operations::VTEntryIntoGroup3, vt_oper, "VTEntryIntoGroup3", + t.add( &vtime_operations::VTEntryIntoGroup2, vt_oper, "VTEntryIntoGroup2", + t.add( &vtime_operations::VTEntryIntoGroup, vt_oper, "VTEntryIntoGroup", + t.add( &vtime_operations::VTSubscription, vt_oper, "VTSubscription", + t.add( &vtime_operations::VTDispatch2, vt_oper, "VTHandler2", + t.add( &vtime_operations::VTDispatch2, vt_oper, "VTHandler1", + t.add( &vtime_operations::VTDispatch2, vt_oper, "VTDispatch2", + t.add( &vtime_operations::VTDispatch1, vt_oper, "VTDispatch1", + t.add( &vtime_operations::vt_object, vt_oper, "VT order", tc[2] )))))))))) ); return t.girdle(); } Modified: trunk/complement/explore/lib/janus/ut/vt_dispatch.cc =================================================================== --- trunk/complement/explore/lib/janus/ut/vt_dispatch.cc 2007-08-27 15:10:53 UTC (rev 1706) +++ trunk/complement/explore/lib/janus/ut/vt_dispatch.cc 2007-08-27 15:32:34 UTC (rev 1707) @@ -7,7 +7,8 @@ #include <janus/vtime.h> #include <janus/janus.h> -using namespace janus; +namespace janus { + using namespace std; class Dummy : @@ -129,3 +130,6 @@ return EXAM_RESULT; } + +} // namespace janus + Modified: trunk/complement/explore/lib/janus/ut/vt_handler.cc =================================================================== --- trunk/complement/explore/lib/janus/ut/vt_handler.cc 2007-08-27 15:10:53 UTC (rev 1706) +++ trunk/complement/explore/lib/janus/ut/vt_handler.cc 2007-08-27 15:32:34 UTC (rev 1707) @@ -7,7 +7,8 @@ #include <stem/EvManager.h> -using namespace janus; +namespace janus { + using namespace std; class VTDummy : @@ -353,3 +354,5 @@ return EXAM_RESULT; } + +} // namespace janus Modified: trunk/complement/explore/lib/janus/ut/vt_object.cc =================================================================== --- trunk/complement/explore/lib/janus/ut/vt_object.cc 2007-08-27 15:10:53 UTC (rev 1706) +++ trunk/complement/explore/lib/janus/ut/vt_object.cc 2007-08-27 15:32:34 UTC (rev 1707) @@ -5,7 +5,8 @@ #include <iostream> #include <janus/vtime.h> -using namespace janus; +namespace janus { + using namespace std; int EXAM_IMPL(vtime_operations::vt_object) @@ -186,3 +187,5 @@ return EXAM_RESULT; } + +} // namespace janus Modified: trunk/complement/explore/lib/janus/ut/vt_operations.cc =================================================================== --- trunk/complement/explore/lib/janus/ut/vt_operations.cc 2007-08-27 15:10:53 UTC (rev 1706) +++ trunk/complement/explore/lib/janus/ut/vt_operations.cc 2007-08-27 15:32:34 UTC (rev 1707) @@ -5,7 +5,8 @@ #include <iostream> #include <janus/vtime.h> -using namespace janus; +namespace janus { + using namespace std; int EXAM_IMPL(vtime_operations::vt_compare) @@ -319,3 +320,5 @@ return EXAM_RESULT; } + +} // namespace janus Modified: trunk/complement/explore/lib/janus/ut/vt_operations.h =================================================================== --- trunk/complement/explore/lib/janus/ut/vt_operations.h 2007-08-27 15:10:53 UTC (rev 1706) +++ trunk/complement/explore/lib/janus/ut/vt_operations.h 2007-08-27 15:32:34 UTC (rev 1707) @@ -5,6 +5,8 @@ #include <exam/suite.h> +namespace janus { + struct vtime_operations { int EXAM_DECL(vt_compare); @@ -30,6 +32,9 @@ int EXAM_DECL(VTEntryIntoGroup3); int EXAM_DECL(remote); + int EXAM_DECL(mgroups); }; +} // namespace janus + #endif // __vt_operations_h Modified: trunk/complement/explore/lib/janus/ut/vt_remote.cc =================================================================== --- trunk/complement/explore/lib/janus/ut/vt_remote.cc 2007-08-27 15:10:53 UTC (rev 1706) +++ trunk/complement/explore/lib/janus/ut/vt_remote.cc 2007-08-27 15:32:34 UTC (rev 1707) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <07/08/23 21:31:56 ptr> +// -*- C++ -*- Time-stamp: <07/08/26 12:54:05 ptr> #include "vt_operations.h" @@ -21,13 +21,16 @@ #include <list> #include <sstream> +namespace janus { + using namespace std; using namespace stem; using namespace xmt; -using namespace janus; -#define VS_DUMMY_MESS 0x1203 -#define VS_DUMMY_GREETING 0x1204 +#define VS_DUMMY_MESS 0x1203 +#define VS_DUMMY_GREETING 0x1204 +#define VS_DUMMY_GREETING2 0x1205 +#define VS_DUMMY_MESS2 0x1206 class YaRemote : public janus::VTHandler @@ -39,12 +42,16 @@ ~YaRemote(); void handler( const stem::Event& ); + void handler2( const stem::Event& ); void VSNewMember( const stem::Event_base<VSsync_rq>& ); void VSOutMember( const stem::Event_base<VSsync_rq>& ); void greeting(); + void greeting2(); void wait(); + void wait2(); + std::string msg; int count; int ocount; @@ -55,9 +62,17 @@ gr.set( false ); } + void wait_greeting2() + { + gr2.try_wait(); + gr2.set( false ); + } + private: xmt::condition cnd; + xmt::condition cnd2; xmt::condition gr; + xmt::condition gr2; DECLARE_RESPONSE_TABLE( YaRemote, janus::VTHandler ); }; @@ -68,7 +83,9 @@ ocount(0) { cnd.set( false ); + cnd2.set( false ); gr.set( false ); + gr2.set( false ); } YaRemote::YaRemote( stem::addr_type id, const char *info ) : @@ -77,7 +94,9 @@ ocount(0) { cnd.set( false ); + cnd2.set( false ); gr.set( false ); + gr2.set( false ); } YaRemote::YaRemote( const char *info ) : @@ -86,7 +105,9 @@ ocount(0) { cnd.set( false ); + cnd2.set( false ); gr.set( false ); + gr2.set( false ); } YaRemote::~YaRemote() @@ -101,6 +122,13 @@ cnd.set( true ); } +void YaRemote::handler2( const stem::Event& ev ) +{ + msg = ev.value(); + + cnd2.set( true ); +} + void YaRemote::VSNewMember( const stem::Event_base<VSsync_rq>& ev ) { // cerr << "Hello " << xmt::getpid() << endl; @@ -109,14 +137,19 @@ // VTNewMember_data( ev, "" ); VTHandler::VSNewMember( ev ); - stem::EventVoid gr_ev( VS_DUMMY_GREETING ); - gr_ev.dest( ev.src() ); - Send( gr_ev ); + if ( ev.value().grp == janus::vs_base::first_user_group ) { + stem::EventVoid gr_ev( VS_DUMMY_GREETING ); + gr_ev.dest( ev.src() ); + Send( gr_ev ); + } else if ( ev.value().grp == (janus::vs_base::first_user_group + 1) ) { + stem::EventVoid gr_ev( VS_DUMMY_GREETING2 ); + gr_ev.dest( ev.src() ); + Send( gr_ev ); + } } void YaRemote::VSOutMember( const stem::Event_base<VSsync_rq>& ev ) { - cerr << "VSOutMember" << endl; ++ocount; } @@ -127,14 +160,28 @@ cnd.set( false ); } +void YaRemote::wait2() +{ + cnd2.try_wait(); + + cnd2.set( false ); +} + void YaRemote::greeting() { gr.set( true ); } +void YaRemote::greeting2() +{ + gr2.set( true ); +} + DEFINE_RESPONSE_TABLE( YaRemote ) EV_EDS( ST_NULL, VS_DUMMY_MESS, handler ) + EV_EDS( ST_NULL, VS_DUMMY_MESS2, handler2 ) EV_VOID( ST_NULL, VS_DUMMY_GREETING, greeting ) + EV_VOID( ST_NULL, VS_DUMMY_GREETING2, greeting2 ) END_RESPONSE_TABLE int EXAM_IMPL(vtime_operations::remote) @@ -154,50 +201,60 @@ b.wait(); { - YaRemote obj1( "obj client" ); + VTHandler obj0; // this need to keep VSHostMgr after YaRemote exit + // to check exit from group with another process + { + YaRemote obj1( "obj client" ); - // obj1.manager()->settrf( stem::EvManager::tracenet | stem::EvManager::tracedispatch ); - // obj1.manager()->settrs( &std::cerr ); + // obj1.manager()->settrf( stem::EvManager::tracenet | stem::EvManager::tracedispatch ); + // obj1.manager()->settrs( &std::cerr ); - // obj1.vtdispatcher()->settrf( janus::Janus::tracenet | janus::Janus::tracedispatch | janus::Janus::tracefault | janus::Janus::tracedelayed | janus::Janus::tracegroup ); - // obj1.vtdispatcher()->settrs( &std::cerr ); + // obj1.vtdispatcher()->settrf( janus::Janus::tracenet | janus::Janus::tracedispatch | janus::Janus::tracefault | janus::Janus::tracedelayed | janus::Janus::tracegroup ); + // obj1.vtdispatcher()->settrs( &std::cerr ); - EXAM_CHECK_ASYNC( obj1.vtdispatcher()->group_size(janus::vs_base::vshosts_group) == 1 ); + EXAM_CHECK_ASYNC( obj1.vtdispatcher()->group_size(janus::vs_base::vshosts_group) == 1 ); - obj1.vtdispatcher()->connect( "localhost", 6980 ); + obj1.vtdispatcher()->connect( "localhost", 6980 ); - // cerr << obj1.vtdispatcher()->group_size(janus::vs_base::vshosts_group) << endl; + // cerr << obj1.vtdispatcher()->group_size(janus::vs_base::vshosts_group) << endl; - while ( obj1.vtdispatcher()->vs_known_processes() < 2 ) { - xmt::Thread::yield(); - xmt::delay( xmt::timespec( 0, 1000000 ) ); - } + while ( obj1.vtdispatcher()->vs_known_processes() < 2 ) { + xmt::Thread::yield(); + xmt::delay( xmt::timespec( 0, 1000000 ) ); + } - /*******************************************************************\ - * This variant is wrong, because of group_size don't guarantee - * that information in the object is relevant (i.e. VSsync happens); - * for example, in case below group_size already 2, but no janus string - * stored yet. + /*******************************************************************\ + * This variant is wrong, because of group_size don't guarantee + * that information in the object is relevant (i.e. VSsync happens); + * for example, in case below group_size already 2, but no janus string + * stored yet. - while ( obj1.vtdispatcher()->group_size(janus::vs_base::vshosts_group) < 2 ) { - xmt::Thread::yield(); - xmt::delay( xmt::timespec( 0, 1000000 ) ); - } - \********************************************************************/ + while ( obj1.vtdispatcher()->group_size(janus::vs_base::vshosts_group) < 2 ) { + xmt::Thread::yield(); + xmt::delay( xmt::timespec( 0, 1000000 ) ); + } + \********************************************************************/ - // cerr << obj1.vtdispatcher()->vs_known_processes() << endl; + // cerr << obj1.vtdispatcher()->vs_known_processes() << endl; - EXAM_CHECK_ASYNC( obj1.vtdispatcher()->group_size(janus::vs_base::vshosts_group) == 2 ); - // cerr << obj1.vtdispatcher()->group_size(janus::vs_base::vshosts_group) << endl; + EXAM_CHECK_ASYNC( obj1.vtdispatcher()->group_size(janus::vs_base::vshosts_group) == 2 ); + // cerr << obj1.vtdispatcher()->group_size(janus::vs_base::vshosts_group) << endl; - obj1.JoinGroup( janus::vs_base::first_user_group ); + obj1.JoinGroup( janus::vs_base::first_user_group ); - obj1.wait_greeting(); + obj1.wait_greeting(); - EXAM_CHECK_ASYNC( obj1.vtdispatcher()->group_size(janus::vs_base::first_user_group) == 2 ); - EXAM_CHECK_ASYNC( obj1.count == 1 ); - // cerr << "* " << obj1.vtdispatcher()->group_size(janus::vs_base::first_user_group) << endl; - obj1.wait(); + EXAM_CHECK_ASYNC( obj1.vtdispatcher()->group_size(janus::vs_base::first_user_group) == 2 ); + EXAM_CHECK_ASYNC( obj1.count == 1 ); + // cerr << "* " << obj1.vtdispatcher()->group_size(janus::vs_base::first_user_group) << endl; + obj1.wait(); + + // obj1.manager()->settrf( stem::EvManager::tracenet | stem::EvManager::tracedispatch ); + // obj1.manager()->settrs( &std::cerr ); + } + // obj1 here away, but in another process (remote) still exist object in + // first_user_group, that's why 1 here: + EXAM_CHECK_ASYNC( obj0.vtdispatcher()->group_size(janus::vs_base::first_user_group) == 1); } exit(0); @@ -226,15 +283,19 @@ obj1.JaSend( ev ); - EXAM_CHECK( obj1.count == 1 ); + EXAM_CHECK( obj1.vtdispatcher()->group_size(janus::vs_base::first_user_group) == 2 ); + EXAM_CHECK( obj1.count == 1 ); + // obj1.manager()->settrf( stem::EvManager::tracenet | stem::EvManager::tracedispatch ); + // obj1.manager()->settrs( &std::cerr ); + int stat; EXAM_CHECK( waitpid( child.pid(), &stat, 0 ) == child.pid() ); - // cerr << obj1.vtdispatcher()->group_size(janus::vs_base::first_user_group) << endl; - // cerr << obj1.vtdispatcher()->group_size(janus::vs_base::vshosts_group) << endl; + EXAM_CHECK( obj1.vtdispatcher()->group_size(janus::vs_base::first_user_group) == 1 ); + + EXAM_CHECK( obj1.vtdispatcher()->group_size(janus::vs_base::vshosts_group) == 1 ); // cerr << obj1.vtdispatcher()->vs_known_processes() << endl; - // cerr << obj1.vtdispatcher()->group_size(janus::vs_base::first_user_group) << endl; } (&b)->~__barrier<true>(); @@ -250,3 +311,109 @@ return EXAM_RESULT; } +int EXAM_IMPL(vtime_operations::mgroups) +{ + const char fname[] = "/tmp/yanus_test.shm"; + xmt::shm_alloc<0> seg; + xmt::allocator_shm<xmt::__condition<true>,0> shm_cnd; + xmt::allocator_shm<xmt::__barrier<true>,0> shm_b; + + try { + seg.allocate( fname, 4*4096, xmt::shm_base::create | xmt::shm_base::exclusive, 0600 ); + xmt::__barrier<true>& b = *new ( shm_b.allocate( 1 ) ) xmt::__barrier<true>(); + + try { + xmt::fork(); + + b.wait(); + + { + VTHandler obj0; // this need to keep VSHostMgr after YaRemote exit + // to check exit from group with another process + { + YaRemote obj1( "obj client" ); + + obj1.vtdispatcher()->connect( "localhost", 6980 ); + + while ( obj1.vtdispatcher()->vs_known_processes() < 2 ) { + xmt::Thread::yield(); + xmt::delay( xmt::timespec( 0, 1000000 ) ); + } + + obj1.JoinGroup( janus::vs_base::first_user_group ); + obj1.JoinGroup( janus::vs_base::first_user_group + 1); + + obj1.wait_greeting(); + obj1.wait_greeting2(); + + EXAM_CHECK_ASYNC( obj1.vtdispatcher()->group_size(janus::vs_base::first_user_group) == 2 ); + EXAM_CHECK_ASYNC( obj1.vtdispatcher()->group_size(janus::vs_base::first_user_group + 1) == 2 ); + EXAM_CHECK_ASYNC( obj1.count == 2 ); + + stem::Event ev( VS_DUMMY_MESS2 ); + ev.dest( janus::vs_base::first_user_group + 1 ); + ev.value() = "hello"; + + obj1.JaSend( ev ); + + obj1.wait(); + + // obj1.manager()->settrf( stem::EvManager::tracenet | stem::EvManager::tracedispatch ); + // obj1.manager()->settrs( &std::cerr ); + } + + EXAM_CHECK_ASYNC( obj0.vtdispatcher()->group_size(janus::vs_base::first_user_group) == 1); + EXAM_CHECK_ASYNC( obj0.vtdispatcher()->group_size(janus::vs_base::first_user_group + 1) == 1); + } + + exit(0); + } + catch ( xmt::fork_in_parent& child ) { + YaRemote obj1( "obj srv" ); + + obj1.vtdispatcher()->serve( 6980 ); + + obj1.JoinGroup( janus::vs_base::first_user_group ); + obj1.JoinGroup( janus::vs_base::first_user_group + 1); + + b.wait(); + + obj1.wait_greeting(); + obj1.wait_greeting2(); + + stem::Event ev( VS_DUMMY_MESS ); + ev.dest( janus::vs_base::first_user_group ); + ev.value() = "hello"; + + obj1.JaSend( ev ); + + EXAM_CHECK( obj1.vtdispatcher()->group_size(janus::vs_base::first_user_group) == 2 ); + EXAM_CHECK( obj1.vtdispatcher()->group_size(janus::vs_base::first_user_group + 1) == 2 ); + EXAM_CHECK( obj1.count == 2 ); + + obj1.wait2(); + + int stat; + EXAM_CHECK( waitpid( child.pid(), &stat, 0 ) == child.pid() ); + + EXAM_CHECK( obj1.vtdispatcher()->group_size(janus::vs_base::first_user_group) == 1 ); + EXAM_CHECK( obj1.vtdispatcher()->group_size(janus::vs_base::first_user_group + 1) == 1 ); + // cerr << obj1.vtdispatcher()->group_size(janus::vs_base::first_user_group + 1) << endl; + + EXAM_CHECK( obj1.vtdispatcher()->group_size(janus::vs_base::vshosts_group) == 1 ); + } + + (&b)->~__barrier<true>(); + shm_b.deallocate( &b, 1 ); + } + catch ( const xmt::shm_bad_alloc& err ) { + EXAM_ERROR( err.what() ); + } + + seg.deallocate(); + unlink( fname ); + + return EXAM_RESULT; +} + +} // namespace janus Modified: trunk/complement/explore/lib/janus/vshostmgr.cc =================================================================== --- trunk/complement/explore/lib/janus/vshostmgr.cc 2007-08-27 15:10:53 UTC (rev 1706) +++ trunk/complement/explore/lib/janus/vshostmgr.cc 2007-08-27 15:32:34 UTC (rev 1707) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <07/08/23 11:24:18 ptr> +// -*- C++ -*- Time-stamp: <07/08/26 12:56:20 ptr> #include <janus/vshostmgr.h> @@ -25,21 +25,24 @@ using namespace janus; VSHostMgr::VSHostMgr() : - VTHandler() + VTHandler(), + finalizer( "vshostmgr aux" ) { vshost.insert( gaddr_type( stem::janus_addr ) ); JoinGroup( vshosts_group ); } VSHostMgr::VSHostMgr( stem::addr_type id, const char *info ) : - VTHandler( id, info ) + VTHandler( id, info ), + finalizer( "vshostmgr aux" ) { vshost.insert( gaddr_type( stem::janus_addr ) ); JoinGroup( vshosts_group ); } VSHostMgr::VSHostMgr( const char *info ) : - VTHandler( info ) + VTHandler( info ), + finalizer( "vshostmgr aux" ) { // vtdispatcher()->settrf( janus::Janus::tracenet | janus::Janus::tracedispatch | janus::Janus::tracefault | janus::Janus::tracedelayed | janus::Janus::tracegroup ); @@ -49,18 +52,38 @@ VSHostMgr::~VSHostMgr() { - while ( !_clients.empty() ) { - _clients.front()->close(); - _clients.front()->join(); - delete _clients.front(); - _clients.pop_front(); + if ( !_clients.empty() || !_servers.empty() ) { + VTHandler::Unsubscribe(); // before channels closed! + + stem::EventVoid ev( VS_HOST_MGR_FINAL ); + + ev.dest( finalizer.self_id() ); + Send( ev ); + + finalizer.wait(); + + // give the chance to deliver VS_OUT_MEMBER message to remotes... + // Do you know better solution, because this one is ugly? + for ( int i = 0; i < 5; ++i ) { + xmt::Thread::yield(); + xmt::delay( xmt::timespec( 0, 100000 ) ); + } + + // shutdown clients... + while ( !_clients.empty() ) { + _clients.front()->close(); + _clients.front()->join(); + delete _clients.front(); + _clients.pop_front(); + } + // ... and servers + while ( !_servers.empty() ) { + _servers.front()->close(); + _servers.front()->wait(); + delete _servers.front(); + _servers.pop_front(); + } } - while ( !_servers.empty() ) { - _servers.front()->close(); - _servers.front()->wait(); - delete _servers.front(); - _servers.pop_front(); - } } void VSHostMgr::VSNewMember( const stem::Event_base<VSsync_rq>& ev ) @@ -180,7 +203,7 @@ #endif // __FIT_VS_TRACE } -void VSHostMgr::Subscribe( stem::addr_type addr, oid_type oid, group_type grp ) +void VSHostMgr::Subscribe( stem::addr_type addr, group_type grp ) { try { manager()->transport( addr ); @@ -199,7 +222,7 @@ #ifdef __FIT_VS_TRACE try { scoped_lock lk(vtdispatcher()->_lock_tr); - if ( vtdispatcher()->_trs != 0 && vtdispatcher()->_trs->good() && (vtdispatcher()->_trflags & Janus::tracenet) ) { + if ( vtdispatcher()->_trs != 0 && vtdispatcher()->_trs->good() && (vtdispatcher()->_trflags & Janus::tracegroup) ) { *vtdispatcher()->_trs << " -> VS_NEW_REMOTE_MEMBER G" << grp << " " << hex << showbase << ev.src() << " -> " << ev.dest() << dec << endl; @@ -218,5 +241,9 @@ // EV_Event_base_T_( ST_NULL, VS_NEW_MEMBER_RV, VSNewRemoteMemberRevert, VSsync_rq ) // END_RESPONSE_TABLE +DEFINE_RESPONSE_TABLE( VSHostMgr::Finalizer ) + EV_VOID( ST_NULL, VS_HOST_MGR_FINAL, final ) +END_RESPONSE_TABLE + } // namespace janus Modified: trunk/complement/explore/lib/janus/vtime.cc =================================================================== --- trunk/complement/explore/lib/janus/vtime.cc 2007-08-27 15:10:53 UTC (rev 1706) +++ trunk/complement/explore/lib/janus/vtime.cc 2007-08-27 15:32:34 UTC (rev 1707) @@ -621,16 +621,25 @@ VTHandler::~VTHandler() { - _vtdsp->Unsubscribe( oid_type( self_id() ) ); + Unsubscribe(); ((Init *)Init_buf)->~Init(); } +void VTHandler::Unsubscribe() +{ + _vtdsp->Unsubscribe( oid_type( self_id() ) ); +} + void VTHandler::JoinGroup( group_type grp ) { _vtdsp->Subscribe( self_id(), oid_type( self_id() ), grp ); } +void VTHandler::LeaveGroup( group_type grp ) +{ + _vtdsp->Unsubscribe( oid_type( self_id() ), grp ); +} void VTHandler::VSNewMember( const stem::Event_base<VSsync_rq>& ev ) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |