[complement-svn] SF.net SVN: complement: [1702] trunk/complement/explore
Status: Pre-Alpha
Brought to you by:
complement
From: <com...@us...> - 2007-08-23 09:02:29
|
Revision: 1702 http://complement.svn.sourceforge.net/complement/?rev=1702&view=rev Author: complement Date: 2007-08-23 02:02:15 -0700 (Thu, 23 Aug 2007) Log Message: ----------- Janus code moved from vtime.h, vtime.cc to janus.h, janus.cc; libjanus: version 0.3.0; * vshostmgr.h, vshostmgr.cc: object for management virtual syncrony processes; * vshostmgr.h, vshostmgr.cc, janus.h, janus.cc, vtime.h, vtime.cc: procedure of entry into group with remote virtual synchrony objects; * ut/vt_remote.cc: test for interprocess virtual synchrony, entry into group. Modified Paths: -------------- trunk/complement/explore/include/janus/vtime.h trunk/complement/explore/lib/janus/ChangeLog trunk/complement/explore/lib/janus/Makefile.inc trunk/complement/explore/lib/janus/ut/Makefile.inc trunk/complement/explore/lib/janus/ut/VTmess_core.cc trunk/complement/explore/lib/janus/ut/vt_dispatch.cc trunk/complement/explore/lib/janus/ut/vt_handler.cc trunk/complement/explore/lib/janus/ut/vt_object.cc trunk/complement/explore/lib/janus/ut/vt_remote.cc trunk/complement/explore/lib/janus/vtime.cc Added Paths: ----------- trunk/complement/explore/include/janus/janus.h trunk/complement/explore/include/janus/vshostmgr.h trunk/complement/explore/lib/janus/janus.cc trunk/complement/explore/lib/janus/vshostmgr.cc Added: trunk/complement/explore/include/janus/janus.h =================================================================== --- trunk/complement/explore/include/janus/janus.h (rev 0) +++ trunk/complement/explore/include/janus/janus.h 2007-08-23 09:02:15 UTC (rev 1702) @@ -0,0 +1,160 @@ +// -*- C++ -*- Time-stamp: <07/08/23 12:36:32 ptr> + +#ifndef __janus_h +#define __janus_h + +#include <ostream> +#include <iterator> + +#include <mt/xmt.h> +#include <stem/Event.h> +#include <stem/EventHandler.h> + +#include <janus/vtime.h> + +#ifdef STLPORT +# include <unordered_map> +# include <unordered_set> +// # include <hash_map> +// # include <hash_set> +// # define __USE_STLPORT_HASH +# define __USE_STLPORT_TR1 +#else +# if defined(__GNUC__) && (__GNUC__ < 4) +# include <ext/hash_map> +# include <ext/hash_set> +# define __USE_STD_HASH +# else +# include <tr1/unordered_map> +# include <tr1/unordered_set> +# define __USE_STD_TR1 +# endif +#endif + +namespace janus { + +class Janus : + public stem::EventHandler, + public vs_base +{ + private: +#ifdef __USE_STLPORT_HASH + typedef std::hash_map<oid_type, detail::vtime_obj_rec> vt_map_type; + typedef std::hash_multimap<group_type, oid_type> gid_map_type; +#endif +#ifdef __USE_STD_HASH + typedef __gnu_cxx::hash_map<oid_type, detail::vtime_obj_rec> vt_map_type; + typedef __gnu_cxx::hash_multimap<group_type, oid_type> gid_map_type; +#endif +#if defined(__USE_STLPORT_TR1) || defined(__USE_STD_TR1) + typedef std::tr1::unordered_map<oid_type, detail::vtime_obj_rec> vt_map_type; + typedef std::tr1::unordered_multimap<group_type, oid_type> gid_map_type; +#endif + + public: + typedef std::iterator_traits<gid_map_type::iterator>::difference_type difference_type; + + enum traceflags { + notrace = 0, + tracenet = 1, + tracedispatch = 2, + tracefault = 4, + tracedelayed = 8, + tracegroup = 0x10 + }; + + Janus() : + _trflags( notrace ), + _trs( 0 ), + _hostmgr( 0 ) + { } + + Janus( const char *info ) : + stem::EventHandler( info ), + _trflags( notrace ), + _trs( 0 ), + _hostmgr( 0 ) + { } + + Janus( stem::addr_type id ) : + stem::EventHandler( id ), + _trflags( notrace ), + _trs( 0 ), + _hostmgr( 0 ) + { } + + Janus( stem::addr_type id, const char *info ) : + stem::EventHandler( id, info ), + _trflags( notrace ), + _trs( 0 ), + _hostmgr( 0 ) + { } + + ~Janus(); + + void JaDispatch( const stem::Event_base<VSmess>& ); + + void VSNewMember( const stem::Event_base<VSsync_rq>& e ); + void VSNewRemoteMemberDirect( const stem::Event_base<VSsync_rq>& e ); + void VSNewRemoteMemberRevert( const stem::Event_base<VSsync_rq>& e ); + + void JaSend( const stem::Event& e, group_type ); + void Subscribe( stem::addr_type, oid_type, group_type ); + void Unsubscribe( oid_type, group_type ); + void Unsubscribe( oid_type ); + void get_gvtime( group_type, stem::addr_type, gvtime_type& ); + void set_gvtime( group_type, stem::addr_type, const gvtime_type& ); + + void settrf( unsigned f ); + void unsettrf( unsigned f ); + void resettrf( unsigned f ); + void cleantrf(); + unsigned trflags() const; + void settrs( std::ostream * ); + + void connect( const char *, int ); + void serve( int ); + size_t vs_known_processes() const; + + difference_type group_size( group_type ) const; + + private: + void check_and_send( detail::vtime_obj_rec&, const stem::Event_base<VSmess>& ); + void check_and_send_delayed( detail::vtime_obj_rec& ); + + vt_map_type vtmap; + gid_map_type grmap; + + xmt::mutex _lock_tr; + unsigned _trflags; + std::ostream *_trs; + + protected: + + class VSHostMgr *_hostmgr; + + friend class VTHandler::Init; + friend class VTHandler; + friend class VSHostMgr; + + private: + + DECLARE_RESPONSE_TABLE( Janus, stem::EventHandler ); +}; + +} // namespace janus + +#ifdef __USE_STLPORT_HASH +# undef __USE_STLPORT_HASH +#endif +#ifdef __USE_STD_HASH +# undef __USE_STD_HASH +#endif +#ifdef __USE_STLPORT_TR1 +# undef __USE_STLPORT_TR1 +#endif +#ifdef __USE_STD_TR1 +# undef __USE_STD_TR1 +#endif + +#endif // __janus_h Added: trunk/complement/explore/include/janus/vshostmgr.h =================================================================== --- trunk/complement/explore/include/janus/vshostmgr.h (rev 0) +++ trunk/complement/explore/include/janus/vshostmgr.h 2007-08-23 09:02:15 UTC (rev 1702) @@ -0,0 +1,82 @@ +// -*- C++ -*- Time-stamp: <07/08/23 12:36:32 ptr> + +#ifndef __vshostmgr_h +#define __vshostmgr_h + +#include <janus/vtime.h> + +#include <stem/NetTransport.h> +#include <stem/Event.h> +#include <sockios/sockmgr.h> + +#include <list> + +#ifdef STLPORT +# include <unordered_map> +# include <unordered_set> +// # include <hash_map> +// # include <hash_set> +// # define __USE_STLPORT_HASH +# define __USE_STLPORT_TR1 +#else +# if defined(__GNUC__) && (__GNUC__ < 4) +# include <ext/hash_map> +# include <ext/hash_set> +# define __USE_STD_HASH +# else +# include <tr1/unordered_map> +# include <tr1/unordered_set> +# define __USE_STD_TR1 +# endif +#endif + +namespace janus { + +class VSHostMgr : + public janus::VTHandler +{ + private: + // typedef std::list<stem::gaddr_type> vshost_container_t; +#ifdef __USE_STLPORT_TR1 + typedef std::tr1::unordered_set<stem::gaddr_type> vshost_container_t; +#endif + + public: + typedef vshost_container_t::size_type size_type; + + VSHostMgr(); + VSHostMgr( stem::addr_type id, const char *info = 0 ); + VSHostMgr( const char *info ); + ~VSHostMgr(); + + // void handler( const stem::Event& ); + void VSNewMember( const stem::Event_base<VSsync_rq>& ); + void VSOutMember( const stem::Event_base<VSsync_rq>& ); + void VSsync_time( const stem::Event_base<VSsync>& ev ); + + void connect( const char *, int ); + void serve( int ); + + size_type vs_known_processes() const + { + xmt::recursive_scoped_lock lk( this->_theHistory_lock ); + size_type tmp = vshost.size(); + return tmp; + } + + void Subscribe( stem::addr_type, oid_type, group_type ); + + private: + typedef std::list<stem::NetTransportMgr *> nmgr_container_t; + typedef std::list<std::sockmgr_stream_MP<stem::NetTransport> *> srv_container_t; + + vshost_container_t vshost; + nmgr_container_t _clients; + srv_container_t _servers; + + // DECLARE_RESPONSE_TABLE( VSHostMgr, janus::VTHandler ); +}; + +} // namespace janus + +#endif // __vshostmgr_h Modified: trunk/complement/explore/include/janus/vtime.h =================================================================== --- trunk/complement/explore/include/janus/vtime.h 2007-08-23 08:49:29 UTC (rev 1701) +++ trunk/complement/explore/include/janus/vtime.h 2007-08-23 09:02:15 UTC (rev 1702) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <07/08/17 21:54:39 ptr> +// -*- C++ -*- Time-stamp: <07/08/23 10:16:54 ptr> #ifndef __vtime_h #define __vtime_h @@ -332,87 +332,19 @@ } // namespace detail -class Janus : - public stem::EventHandler +struct vs_base { - public: - enum traceflags { - notrace = 0, - tracenet = 1, - tracedispatch = 2, - tracefault = 4, - tracedelayed = 8, - tracegroup = 0x10 - }; - - Janus() : - _trflags( notrace ), - _trs( 0 ) - { } - - Janus( const char *info ) : - stem::EventHandler( info ), - _trflags( notrace ), - _trs( 0 ) - { } - - Janus( stem::addr_type id ) : - stem::EventHandler( id ), - _trflags( notrace ), - _trs( 0 ) - { } - - Janus( stem::addr_type id, const char *info ) : - stem::EventHandler( id, info ), - _trflags( notrace ), - _trs( 0 ) - { } - - void JaDispatch( const stem::Event_base<VSmess>& ); - - void JaSend( const stem::Event& e, group_type ); - void Subscribe( stem::addr_type, oid_type, group_type ); - void Unsubscribe( oid_type, group_type ); - void Unsubscribe( oid_type ); - void get_gvtime( group_type, stem::addr_type, gvtime_type& ); - void set_gvtime( group_type, stem::addr_type, const gvtime_type& ); - - void settrf( unsigned f ); - void unsettrf( unsigned f ); - void resettrf( unsigned f ); - void cleantrf(); - unsigned trflags() const; - void settrs( std::ostream * ); - - private: -#ifdef __USE_STLPORT_HASH - typedef std::hash_map<oid_type, detail::vtime_obj_rec> vt_map_type; - typedef std::hash_multimap<group_type, oid_type> gid_map_type; -#endif -#ifdef __USE_STD_HASH - typedef __gnu_cxx::hash_map<oid_type, detail::vtime_obj_rec> vt_map_type; - typedef __gnu_cxx::hash_multimap<group_type, oid_type> gid_map_type; -#endif -#if defined(__USE_STLPORT_TR1) || defined(__USE_STD_TR1) - typedef std::tr1::unordered_map<oid_type, detail::vtime_obj_rec> vt_map_type; - typedef std::tr1::unordered_multimap<group_type, oid_type> gid_map_type; -#endif - - void check_and_send( detail::vtime_obj_rec&, const stem::Event_base<VSmess>& ); - void check_and_send_delayed( detail::vtime_obj_rec& ); - - vt_map_type vtmap; - gid_map_type grmap; - - xmt::mutex _lock_tr; - unsigned _trflags; - std::ostream *_trs; - - DECLARE_RESPONSE_TABLE( Janus, stem::EventHandler ); + enum { + vshosts_group = 0, + first_user_group = 10 + }; }; +class Janus; + class VTHandler : - public stem::EventHandler + public stem::EventHandler, + public vs_base { private: class Init @@ -434,8 +366,8 @@ virtual ~VTHandler(); void JaSend( const stem::Event& e ); - void JoinGroup( group_type grp ) - { _vtdsp->Subscribe( self_id(), oid_type( self_id() ), grp ); } + void JoinGroup( group_type grp ); + virtual void VSNewMember( const stem::Event_base<VSsync_rq>& e ); virtual void VSOutMember( const stem::Event_base<VSsync_rq>& e ); virtual void VSsync_time( const stem::Event_base<VSsync>& ); @@ -451,19 +383,21 @@ protected: void VSNewMember_data( const stem::Event_base<VSsync_rq>&, const std::string& data ); - void get_gvtime( group_type g, gvtime_type& gvt ) - { _vtdsp->get_gvtime( g, self_id(), gvt ); } + void get_gvtime( group_type g, gvtime_type& gvt ); private: - static class Janus *_vtdsp; + static Janus *_vtdsp; + friend class Janus; DECLARE_RESPONSE_TABLE( VTHandler, stem::EventHandler ); }; -#define VS_MESS 0x300 -#define VS_NEW_MEMBER 0x301 -#define VS_OUT_MEMBER 0x302 -#define VS_SYNC_TIME 0x303 +#define VS_MESS 0x300 +#define VS_NEW_MEMBER 0x301 +#define VS_OUT_MEMBER 0x302 +#define VS_SYNC_TIME 0x303 +#define VS_NEW_REMOTE_MEMBER 0x304 +#define VS_NEW_MEMBER_RV 0x305 #ifdef __USE_STLPORT_HASH # undef __USE_STLPORT_HASH Modified: trunk/complement/explore/lib/janus/ChangeLog =================================================================== --- trunk/complement/explore/lib/janus/ChangeLog 2007-08-23 08:49:29 UTC (rev 1701) +++ trunk/complement/explore/lib/janus/ChangeLog 2007-08-23 09:02:15 UTC (rev 1702) @@ -1,3 +1,26 @@ +2007-08-23 Petr Ovtchenkov <pt...@is...> + + * janus.h, janus.cc: Janus code moved from vtime.h, vtime.cc; + + * vtime.h, vtime.cc: Janus code moved to janus.h, janus.cc; + + * ut/vt_object.cc, ut/vt_handler.cc, ut/VTmess_core.cc: idem; + + * ut/vt_dispatch.cc: idem; + + * Makefile.inc: idem; + + * libjanus: version 0.3.0; + + * vshostmgr.h, vshostmgr.cc: object for management virtual syncrony + processes; + + * vshostmgr.h, vshostmgr.cc, janus.h, janus.cc, vtime.h, vtime.cc: + procedure of entry into group with remote virtual synchrony objects; + + * ut/vt_remote.cc: test for interprocess virtual synchrony, + entry into group. + 2007-08-17 Petr Ovtchenkov <ye...@ya...> * vtime.h: use available variant of hash_map/hash_set or Modified: trunk/complement/explore/lib/janus/Makefile.inc =================================================================== --- trunk/complement/explore/lib/janus/Makefile.inc 2007-08-23 08:49:29 UTC (rev 1701) +++ trunk/complement/explore/lib/janus/Makefile.inc 2007-08-23 09:02:15 UTC (rev 1702) @@ -1,7 +1,7 @@ -# -*- makefile -*- Time-stamp: <06/10/10 15:22:33 ptr> +# -*- makefile -*- Time-stamp: <07/08/21 11:08:32 ptr> LIBNAME = janus MAJOR = 0 -MINOR = 2 +MINOR = 3 PATCH = 0 -SRC_CC = vtime.cc +SRC_CC = vtime.cc janus.cc vshostmgr.cc Added: trunk/complement/explore/lib/janus/janus.cc =================================================================== --- trunk/complement/explore/lib/janus/janus.cc (rev 0) +++ trunk/complement/explore/lib/janus/janus.cc 2007-08-23 09:02:15 UTC (rev 1702) @@ -0,0 +1,658 @@ +// -*- C++ -*- Time-stamp: <07/08/23 12:36:32 ptr> + +#include <janus/janus.h> +#include <janus/vshostmgr.h> +#include <stem/EvManager.h> + +// #include <iostream> + +namespace janus { + +using namespace std; +using namespace xmt; +using namespace stem; + +Janus::~Janus() +{ + delete _hostmgr; +} + +void Janus::settrf( unsigned f ) +{ + scoped_lock _x1( _lock_tr ); + _trflags |= f; +} + +void Janus::unsettrf( unsigned f ) +{ + scoped_lock _x1( _lock_tr ); + _trflags &= (0xffffffff & ~f); +} + +void Janus::resettrf( unsigned f ) +{ + scoped_lock _x1( _lock_tr ); + _trflags = f; +} + +void Janus::cleantrf() +{ + scoped_lock _x1( _lock_tr ); + _trflags = 0; +} + +unsigned Janus::trflags() const +{ + scoped_lock _x1( _lock_tr ); + + return _trflags; +} + +void Janus::settrs( std::ostream *s ) +{ + scoped_lock _x1( _lock_tr ); + _trs = s; +} + +void Janus::JaDispatch( const stem::Event_base<VSmess>& m ) +{ + pair<gid_map_type::const_iterator,gid_map_type::const_iterator> range = + grmap.equal_range( m.value().grp ); + + for ( ; range.first != range.second; ++range.first ) { + vt_map_type::iterator i = vtmap.find( range.first->second ); + if ( i == vtmap.end() || i->second.stem_addr() == m.src() ) { // not for nobody and not for self + continue; + } + try { + // check local or remote? i->second.addr + // if remote, forward it to foreign VTDispatcher? + // looks, like local source shouldn't be here? + check_and_send( i->second, m ); + } + catch ( const out_of_range& err ) { + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracefault) ) { + *_trs << err.what() << " " + << __FILE__ << ":" << __LINE__ << endl; + } + } + catch ( ... ) { + } + } + catch ( const domain_error& err ) { + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracefault) ) { + *_trs << err.what() << " " + << __FILE__ << ":" << __LINE__ << endl; + } + } + catch ( ... ) { + } + } + } +} + +void Janus::check_and_send( detail::vtime_obj_rec& vt, const stem::Event_base<VSmess>& m ) +{ + if ( vt.deliver( m.value() ) ) { + stem::Event ev( m.value().code ); + ev.dest(vt.stem_addr()); + ev.src(m.src()); + ev.value() = m.value().mess; +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracedispatch) ) { + *_trs << "Deliver " << m.value() << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + Forward( ev ); + check_and_send_delayed( vt ); + } else { +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracedelayed) ) { + *_trs << "Delayed " << m.value() << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + vt.dpool.push_back( make_pair( xmt::timespec(xmt::timespec::now), new Event_base<VSmess>(m) ) ); // 0 should be timestamp + } +} + +void Janus::check_and_send_delayed( detail::vtime_obj_rec& vt ) +{ + typedef detail::vtime_obj_rec::dpool_t dpool_t; + bool more; + do { + more = false; + for ( dpool_t::iterator j = vt.dpool.begin(); j != vt.dpool.end(); ) { + if ( vt.deliver_delayed( j->second->value() ) ) { + stem::Event evd( j->second->value().code ); + evd.dest(vt.stem_addr()); + evd.src(j->second->src()); + evd.value() = j->second->value().mess; +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracedispatch) ) { + *_trs << "Deliver delayed " << j->second->value() << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + Forward( evd ); + delete j->second; + vt.dpool.erase( j++ ); + more = true; + } else { +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracedispatch) ) { + *_trs << "Remain delayed " << j->second->value() + << "\nReason: "; + vt.trace_deliver( j->second->value(), *_trs ) << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + ++j; + } + } + } while ( more ); +} + +void Janus::JaSend( const stem::Event& e, group_type grp ) +{ + // This method not called from Dispatch, but work on the same level and in the same + // scope, so this lock (from stem::EventHandler) required here: + xmt::recursive_scoped_lock lk( this->_theHistory_lock ); + + const pair<gid_map_type::const_iterator,gid_map_type::const_iterator> range = + grmap.equal_range( grp ); + + for ( gid_map_type::const_iterator o = range.first; o != range.second; ++o ) { + vt_map_type::iterator i = vtmap.find( o->second ); + if ( i != vtmap.end() && i->second.stem_addr() == e.src() ) { // for self + detail::vtime_obj_rec& vt = i->second; + const oid_type& from = o->second; + stem::Event_base<VSmess> m( VS_MESS ); + m.value().src = from; // oid + m.value().code = e.code(); + m.value().mess = e.value(); + m.value().grp = grp; + // m.dest( ??? ); // local VT dispatcher? + m.src( e.src() ); + + // This is like VTDispatch, but VT stamp in every message different, + // in accordance with individual knowlage about object's VT. + + vt.next( from, grp ); // my counter + + for ( gid_map_type::const_iterator g = range.first; g != range.second; ++g ) { + vt_map_type::iterator k = vtmap.find( g->second ); + if ( k == vtmap.end() || k->second.stem_addr() == m.src() ) { // not for nobody and not for self + continue; + } + try { + vt.delta( m.value().gvt, from, g->second, grp ); + + // check local or remote? i->second.addr + // if remote, forward it to foreign VTDispatcher? + try { + /* const transport tr = */ manager()->transport( k->second.stem_addr() ); + gaddr_type ga = manager()->reflect( k->second.stem_addr() ); + if ( ga != gaddr_type() ) { + ga.addr = stem::janus_addr; + addr_type a = manager()->reflect( ga ); + if ( a == badaddr ) { + a = manager()->SubscribeRemote( ga, "janus" ); + } + m.dest( a ); + Forward( m ); + vt.base_advance(g->second); // store last send VT to obj + } + } + catch ( const range_error& ) { + check_and_send( k->second, m ); + vt.base_advance(g->second); // store last send VT to obj + } + } + catch ( const out_of_range& err ) { + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracefault) ) { + *_trs << err.what() << " " + << __FILE__ << ":" << __LINE__ << endl; + } + } + catch ( ... ) { + } + } + catch ( const domain_error& err ) { + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracefault) ) { + *_trs << err.what() << " " + << __FILE__ << ":" << __LINE__ << endl; + } + } + catch ( ... ) { + } + } + } + + return; + } + } + + throw domain_error( "VT object not member of group" ); // Error: not group member +} + +void Janus::Subscribe( stem::addr_type addr, oid_type oid, group_type grp ) +{ + // See comment on top of VTSend above + xmt::recursive_scoped_lock lk( this->_theHistory_lock ); + + pair<gid_map_type::const_iterator,gid_map_type::const_iterator> range = + grmap.equal_range( grp ); + + for ( ; range.first != range.second; ++range.first ) { + vt_map_type::iterator i = vtmap.find( range.first->second ); + if ( i != vtmap.end() ) { + stem::Event_base<VSsync_rq> ev( VS_NEW_MEMBER ); + ev.dest( i->second.stem_addr() ); + ev.src( addr ); + ev.value().grp = grp; + Forward( ev ); +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracegroup) ) { + *_trs << " -> VS_NEW_MEMBER G" << grp << " " + << hex << showbase + << ev.src() << " -> " << ev.dest() << dec << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + } + } + + vtmap[oid].add( addr, grp ); + grmap.insert( make_pair(grp,oid) ); + + // cerr << "**** " << grp << " " << xmt::getpid() << endl; + + if ( /* (grp != vshosts_group) && */ (_hostmgr != 0) ) { + _hostmgr->Subscribe( addr, oid, grp ); + } +} + +void Janus::Unsubscribe( oid_type oid, group_type grp ) +{ + // See comment on top of VTSend above + xmt::recursive_scoped_lock lk( this->_theHistory_lock ); + + pair<gid_map_type::iterator,gid_map_type::iterator> range = + grmap.equal_range( grp ); + + vt_map_type::iterator i = vtmap.find( oid ); + while ( range.first != range.second ) { + if ( range.first->second == oid ) { + grmap.erase( range.first++ ); + } else { + vt_map_type::iterator j = vtmap.find( range.first->second ); + if ( j != vtmap.end() ) { + stem::Event_base<VSsync_rq> ev( VS_OUT_MEMBER ); + ev.dest( j->second.stem_addr() ); + ev.src( i != vtmap.end() ? i->second.stem_addr() : self_id() ); + ev.value().grp = grp; +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracegroup) ) { + *_trs << " -> VS_OUT_MEMBER " + << hex << showbase + << ev.src() << " -> " << ev.dest() << dec << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + + Forward( ev ); + } + ++range.first; + } + } + + if ( i != vtmap.end() ) { + if ( i->second.rm_group( grp ) ) { // no groups more + vtmap.erase( i ); + } + } + + if ( grp != vshosts_group ) { + } +} + +void Janus::Unsubscribe( oid_type oid ) +{ + // See comment on top of JaSend above + xmt::recursive_scoped_lock lk( this->_theHistory_lock ); + + vt_map_type::iterator i = vtmap.find( oid ); + if ( i != vtmap.end() ) { + list<group_type> grp_list; + i->second.groups_list( back_inserter( grp_list ) ); + for ( list<group_type>::const_iterator grp = grp_list.begin(); grp != grp_list.end(); ++grp ) { + + pair<gid_map_type::iterator,gid_map_type::iterator> range = + grmap.equal_range( *grp ); + + while ( range.first != range.second ) { + if ( range.first->second == oid ) { + grmap.erase( range.first++ ); + } else { + vt_map_type::iterator j = vtmap.find( range.first->second ); + if ( j != vtmap.end() ) { + stem::Event_base<VSsync_rq> ev( VS_OUT_MEMBER ); + ev.dest( j->second.stem_addr() ); + ev.src( i != vtmap.end() ? i->second.stem_addr() : self_id() ); + ev.value().grp = *grp; +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracegroup) ) { + *_trs << " -> VS_OUT_MEMBER " + << hex << showbase + << ev.src() << " -> " << ev.dest() << dec << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + + Forward( ev ); + } + ++range.first; + } + } + i->second.rm_group( *grp ); + } + vtmap.erase( i ); + } + + // if ( grp != vshosts_group ) { + // } +} + +void Janus::get_gvtime( group_type grp, stem::addr_type addr, gvtime_type& gvt ) +{ + // See comment on top of JaSend above + xmt::recursive_scoped_lock lk( this->_theHistory_lock ); + + pair<gid_map_type::iterator,gid_map_type::iterator> range = + grmap.equal_range( grp ); + for ( ; range.first != range.second; ++range.first ) { + vt_map_type::iterator i = vtmap.find( range.first->second ); + if ( i != vtmap.end() && i->second.stem_addr() == addr ) { + i->second.get_gvt( gvt ); + return; + } + } + + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracefault) ) { + *_trs << "virtual synchrony object not member of group" << " " << __FILE__ << ":" << __LINE__ << endl; + } + } + catch ( ... ) { + } + + throw domain_error( "virtual synchrony object not member of group" ); // Error: not group member +} + +void Janus::set_gvtime( group_type grp, stem::addr_type addr, const gvtime_type& gvt ) +{ + // See comment on top of JaSend above + xmt::recursive_scoped_lock lk( this->_theHistory_lock ); + + pair<gid_map_type::iterator,gid_map_type::iterator> range = + grmap.equal_range( grp ); + for ( ; range.first != range.second; ++range.first ) { + vt_map_type::iterator i = vtmap.find( range.first->second ); + if ( i != vtmap.end() && i->second.stem_addr() == addr ) { +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracegroup) ) { + *_trs << "Set gvt G" << grp << " " << i->first + << " (" << addr << ") " << gvt << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + i->second.sync( grp, i->first, gvt ); + check_and_send_delayed( i->second ); + return; + } + } + + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracefault) ) { + *_trs << "virtual synchrony object not member of group" << " " << __FILE__ << ":" << __LINE__ << endl; + } + } + catch ( ... ) { + } + + throw domain_error( "virtual synchrony object not member of group" ); // Error: not group member +} + +void Janus::VSNewMember( const stem::Event_base<VSsync_rq>& ev ) +{ + if ( ev.value().grp == vshosts_group ) { + ev.dest( _hostmgr->self_id() ); + Forward( ev ); +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracegroup) ) { + *_trs << "<-> VS_NEW_MEMBER G" << vshosts_group << " " + << hex << showbase + << ev.src() << " -> " << ev.dest() << dec << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + + gaddr_type ga = manager()->reflect( ev.src() ); + addr_type janus_addr = badaddr; + if ( ga != gaddr_type() ) { + ga.addr = stem::janus_addr; + janus_addr = manager()->reflect( ga ); + if ( janus_addr == badaddr ) { + janus_addr = manager()->SubscribeRemote( ga, "janus" ); + } + } + stem::Event_base<VSsync_rq> evr( VS_NEW_MEMBER_RV ); + evr.dest( janus_addr ); + evr.value().grp = vshosts_group; + Send( evr ); +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracegroup) ) { + *_trs << " -> VS_NEW_MEMBER_RV G" << vshosts_group << " " + << hex << showbase + << evr.src() << " -> " << evr.dest() << dec << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + + } +} + +void Janus::VSNewRemoteMemberDirect( const stem::Event_base<VSsync_rq>& ev ) +{ + if ( ev.value().grp != vshosts_group ) { + group_type grp = ev.value().grp; + pair<gid_map_type::const_iterator,gid_map_type::const_iterator> range = grmap.equal_range( grp ); + if ( range.first != range.second ) { // we have local? member within this group + gaddr_type oid = manager()->reflect( ev.src() ); // ???? oid == gaddr + addr_type addr = ev.src(); + gaddr_type ga = manager()->reflect( addr ); + addr_type janus_addr = badaddr; + if ( ga != gaddr_type() ) { + ga.addr = stem::janus_addr; + janus_addr = manager()->reflect( ga ); + if ( janus_addr == badaddr ) { + janus_addr = manager()->SubscribeRemote( ga, "janus" ); + } + } + for ( ; range.first != range.second; ++range.first ) { + vt_map_type::iterator i = vtmap.find( range.first->second ); + if ( i != vtmap.end() ) { + stem::Event_base<VSsync_rq> evs( VS_NEW_MEMBER ); + evs.dest( i->second.stem_addr() ); + evs.src( addr ); + evs.value().grp = grp; + Forward( evs ); +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracegroup) ) { + *_trs << " -> VS_NEW_MEMBER (remote) G" << grp << " " + << hex << showbase + << evs.src() << " -> " << evs.dest() << dec << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + stem::Event_base<VSsync_rq> evr( VS_NEW_MEMBER_RV ); + evr.dest( janus_addr ); + evr.src( i->second.stem_addr() ); + evr.value().grp = grp; + Forward( evr ); +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracegroup) ) { + *_trs << " -> VS_NEW_MEMBER_RV G" << grp << " " + << hex << showbase + << evr.src() << " -> " << evr.dest() << dec << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + } + } + + vtmap[oid].add( addr, grp ); + grmap.insert( make_pair(grp,oid) ); + // cerr << "**** " << grp << " " << xmt::getpid() << endl; + } + } +} + +void Janus::VSNewRemoteMemberRevert( const stem::Event_base<VSsync_rq>& ev ) +{ + if ( ev.value().grp != vshosts_group ) { + group_type grp = ev.value().grp; + pair<gid_map_type::const_iterator,gid_map_type::const_iterator> range = grmap.equal_range( grp ); + if ( range.first != range.second ) { // we have local? member within this group + gaddr_type oid = manager()->reflect( ev.src() ); // ???? oid == gaddr + addr_type addr = ev.src(); + for ( ; range.first != range.second; ++range.first ) { + vt_map_type::iterator i = vtmap.find( range.first->second ); + if ( i != vtmap.end() && ((i->second.stem_addr() & stem::extbit) == 0 )) { + stem::Event_base<VSsync_rq> evs( VS_NEW_MEMBER ); + evs.dest( i->second.stem_addr() ); + evs.src( addr ); + evs.value().grp = grp; + Forward( evs ); +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracegroup) ) { + *_trs << " -> VS_NEW_MEMBER (remote revert) G" << grp << " " + << hex << showbase + << evs.src() << " -> " << evs.dest() << dec << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + } + } + + vtmap[oid].add( addr, grp ); + grmap.insert( make_pair(grp,oid) ); + // cerr << "**** " << grp << " " << xmt::getpid() << endl; + } + } else { + gaddr_type oid = manager()->reflect( ev.src() ); // ???? oid == gaddr + addr_type addr = ev.src(); + + vtmap[oid].add( addr, vshosts_group ); + grmap.insert( make_pair(static_cast<group_type>(vshosts_group),oid) ); + // cerr << "**** " << vshosts_group << " " << xmt::getpid() << endl; + } +} + +void Janus::connect( const char *host, int port ) +{ + _hostmgr->connect( host, port ); +} + +void Janus::serve( int port ) +{ + _hostmgr->serve( port ); +} + +size_t Janus::vs_known_processes() const +{ + return _hostmgr->vs_known_processes(); +} + +Janus::difference_type Janus::group_size( group_type grp ) const +{ + // See comment on top of JaSend above + xmt::recursive_scoped_lock lk( this->_theHistory_lock ); + + pair<gid_map_type::const_iterator,gid_map_type::const_iterator> range = grmap.equal_range( grp ); + + return distance( range.first, range.second ); +} + +DEFINE_RESPONSE_TABLE( Janus ) + EV_Event_base_T_( ST_NULL, VS_MESS, JaDispatch, VSmess ) + EV_Event_base_T_( ST_NULL, VS_NEW_MEMBER, VSNewMember, VSsync_rq ) + EV_Event_base_T_( ST_NULL, VS_NEW_REMOTE_MEMBER, VSNewRemoteMemberDirect, VSsync_rq ) + EV_Event_base_T_( ST_NULL, VS_NEW_MEMBER_RV, VSNewRemoteMemberRevert, VSsync_rq ) +END_RESPONSE_TABLE + +} // namespace janus Modified: trunk/complement/explore/lib/janus/ut/Makefile.inc =================================================================== --- trunk/complement/explore/lib/janus/ut/Makefile.inc 2007-08-23 08:49:29 UTC (rev 1701) +++ trunk/complement/explore/lib/janus/ut/Makefile.inc 2007-08-23 09:02:15 UTC (rev 1702) @@ -3,6 +3,8 @@ PRGNAME = ut_vtime SRC_CC = ../vtime.cc \ + ../janus.cc \ + ../vshostmgr.cc \ unit_test.cc \ vt_operations.cc \ VTmess_core.cc \ Modified: trunk/complement/explore/lib/janus/ut/VTmess_core.cc =================================================================== --- trunk/complement/explore/lib/janus/ut/VTmess_core.cc 2007-08-23 08:49:29 UTC (rev 1701) +++ trunk/complement/explore/lib/janus/ut/VTmess_core.cc 2007-08-23 09:02:15 UTC (rev 1702) @@ -112,7 +112,7 @@ ev.value().gvt[0][t1] = 2; ev.value().gvt[1][t0] = 3; ev.value().gvt[1][t1] = 4; - ev.value().grp = 7; + ev.value().grp = janus::vs_base::first_user_group + 7; ev.value().mess = "data"; h.Send( ev ); @@ -125,7 +125,7 @@ EXAM_CHECK( h.gvt[0][t1] == 2 ); EXAM_CHECK( h.gvt[1][t0] == 3 ); EXAM_CHECK( h.gvt[1][t1] == 4 ); - EXAM_CHECK( h.grp == 7 ); + EXAM_CHECK( h.grp == (janus::vs_base::first_user_group + 7) ); EXAM_CHECK( h.mess == "data" ); ev.value().code = 3; @@ -141,7 +141,7 @@ EXAM_CHECK( h.gvt[0][t1] == 2 ); EXAM_CHECK( h.gvt[1][t0] == 3 ); EXAM_CHECK( h.gvt[1][t1] == 4 ); - EXAM_CHECK( h.grp == 7 ); + EXAM_CHECK( h.grp == (janus::vs_base::first_user_group + 7) ); EXAM_CHECK( h.mess == "more data" ); return EXAM_RESULT; Modified: trunk/complement/explore/lib/janus/ut/vt_dispatch.cc =================================================================== --- trunk/complement/explore/lib/janus/ut/vt_dispatch.cc 2007-08-23 08:49:29 UTC (rev 1701) +++ trunk/complement/explore/lib/janus/ut/vt_dispatch.cc 2007-08-23 09:02:15 UTC (rev 1702) @@ -1,9 +1,11 @@ -// -*- C++ -*- Time-stamp: <07/08/17 10:39:45 ptr> +// -*- C++ -*- Time-stamp: <07/08/21 11:09:27 ptr> #include "vt_operations.h" #include <iostream> + #include <janus/vtime.h> +#include <janus/janus.h> using namespace janus; using namespace std; @@ -79,15 +81,15 @@ const oid_type t1(1); const oid_type t2(2); - dsp.Subscribe( dummy1.self_id(), t1, 0 ); - dsp.Subscribe( dummy2.self_id(), t2, 0 ); + dsp.Subscribe( dummy1.self_id(), t1, janus::vs_base::first_user_group ); + dsp.Subscribe( dummy2.self_id(), t2, janus::vs_base::first_user_group ); stem::Event ev( VT_MESS2 ); ev.src( dummy1.self_id() ); ev.value() = "hello"; - dsp.JaSend( ev, 0 ); + dsp.JaSend( ev, janus::vs_base::first_user_group ); dummy2.wait(); @@ -107,16 +109,16 @@ const oid_type t2(2); const oid_type t3(3); - dsp.Subscribe( dummy1.self_id(), t1, 0 ); - dsp.Subscribe( dummy2.self_id(), t2, 0 ); - dsp.Subscribe( dummy3.self_id(), t3, 0 ); + dsp.Subscribe( dummy1.self_id(), t1, janus::vs_base::first_user_group ); + dsp.Subscribe( dummy2.self_id(), t2, janus::vs_base::first_user_group ); + dsp.Subscribe( dummy3.self_id(), t3, janus::vs_base::first_user_group ); stem::Event ev( VT_MESS2 ); ev.src( dummy1.self_id() ); ev.value() = "hello"; - dsp.JaSend( ev, 0 ); + dsp.JaSend( ev, janus::vs_base::first_user_group ); dummy2.wait(); dummy3.wait(); Modified: trunk/complement/explore/lib/janus/ut/vt_handler.cc =================================================================== --- trunk/complement/explore/lib/janus/ut/vt_handler.cc 2007-08-23 08:49:29 UTC (rev 1701) +++ trunk/complement/explore/lib/janus/ut/vt_handler.cc 2007-08-23 09:02:15 UTC (rev 1702) @@ -54,7 +54,7 @@ cnd.set( false ); gr.set( false ); - JoinGroup( 0 ); + JoinGroup( first_user_group ); } VTDummy::VTDummy( stem::addr_type id ) : @@ -65,7 +65,7 @@ cnd.set( false ); gr.set( false ); - JoinGroup( 0 ); + JoinGroup( first_user_group ); } VTDummy::VTDummy( stem::addr_type id, const char *info ) : @@ -76,7 +76,7 @@ cnd.set( false ); gr.set( false ); - JoinGroup( 0 ); + JoinGroup( first_user_group ); } VTDummy::~VTDummy() @@ -133,7 +133,7 @@ VTDummy dummy2; stem::Event ev( VS_DUMMY_MESS ); - ev.dest( 0 ); // group + ev.dest( janus::vs_base::first_user_group ); // group ev.value() = "hello"; dummy1.JaSend( ev ); @@ -153,7 +153,7 @@ VTDummy dummy3; stem::Event ev( VS_DUMMY_MESS ); - ev.dest( 0 ); // group + ev.dest( janus::vs_base::first_user_group ); // group ev.value() = "hello"; dummy1.JaSend( ev ); @@ -168,7 +168,7 @@ EXAM_CHECK( dummy1.count == 2 ); EXAM_CHECK( dummy1.msg == "" ); - ev.dest( 100 ); // not this group member + ev.dest( janus::vs_base::first_user_group + 100 ); // not this group member try { dummy1.JaSend( ev ); EXAM_ERROR( "exception expected" ); @@ -185,7 +185,7 @@ VTDummy dummy2; stem::Event ev( VS_DUMMY_MESS ); - ev.dest( 0 ); // group + ev.dest( janus::vs_base::first_user_group ); // group ev.value() = "hello"; dummy1.JaSend( ev ); @@ -226,7 +226,7 @@ VTDummy dummy1; stem::Event ev( VS_DUMMY_MESS ); - ev.dest( 0 ); // group + ev.dest( janus::vs_base::first_user_group ); // group ev.value() = "hello"; { @@ -255,7 +255,7 @@ VTDummy dummy2; stem::Event ev( VS_DUMMY_MESS ); - ev.dest( 0 ); // group + ev.dest( janus::vs_base::first_user_group ); // group ev.value() = "hello"; dummy1.JaSend( ev ); @@ -277,7 +277,7 @@ dummy3.wait_greeting(); ev.value() = "hi"; - ev.dest( 0 ); // group + ev.dest( janus::vs_base::first_user_group ); // group dummy1.JaSend( ev ); dummy2.wait(); @@ -303,7 +303,7 @@ dummy2.wait_greeting(); ev.value() = "hello"; - ev.dest( 0 ); // group + ev.dest( janus::vs_base::first_user_group ); // group dummy1.JaSend( ev ); dummy2.wait(); @@ -317,7 +317,7 @@ dummy3.wait_greeting(); ev.value() = "hi"; - ev.dest( 0 ); // group + ev.dest( janus::vs_base::first_user_group ); // group dummy1.JaSend( ev ); dummy3.wait(); @@ -334,7 +334,7 @@ dummy3.wait_greeting(); ev.value() = "more"; - ev.dest( 0 ); // group + ev.dest( janus::vs_base::first_user_group ); // group dummy1.JaSend( ev ); dummy2.wait(); Modified: trunk/complement/explore/lib/janus/ut/vt_object.cc =================================================================== --- trunk/complement/explore/lib/janus/ut/vt_object.cc 2007-08-23 08:49:29 UTC (rev 1701) +++ trunk/complement/explore/lib/janus/ut/vt_object.cc 2007-08-23 09:02:15 UTC (rev 1702) @@ -12,9 +12,9 @@ { detail::vtime_obj_rec ob; - const group_type gr0 = 0; - const group_type gr1 = 1; - const group_type gr2 = 2; + const group_type gr0 = janus::vs_base::first_user_group + 0; + const group_type gr1 = janus::vs_base::first_user_group + 1; + const group_type gr2 = janus::vs_base::first_user_group + 2; const oid_type obj0(0); const oid_type obj1(1); const oid_type obj2(2); Modified: trunk/complement/explore/lib/janus/ut/vt_remote.cc =================================================================== --- trunk/complement/explore/lib/janus/ut/vt_remote.cc 2007-08-23 08:49:29 UTC (rev 1701) +++ trunk/complement/explore/lib/janus/ut/vt_remote.cc 2007-08-23 09:02:15 UTC (rev 1702) @@ -1,30 +1,41 @@ -// -*- C++ -*- Time-stamp: <07/08/17 10:07:52 ptr> +// -*- C++ -*- Time-stamp: <07/08/23 12:43:15 ptr> #include "vt_operations.h" #include <iostream> #include <janus/vtime.h> +#include <janus/janus.h> +#include <janus/vshostmgr.h> #include <stem/EvManager.h> #include <stem/NetTransport.h> +#include <stem/Event.h> +#include <stem/EvPack.h> #include <sockios/sockmgr.h> #include <sys/wait.h> #include <mt/xmt.h> #include <mt/shm.h> +#include <mt/time.h> +#include <list> +#include <sstream> + using namespace std; using namespace stem; using namespace xmt; using namespace janus; +#define VS_DUMMY_MESS 0x1203 +#define VS_DUMMY_GREETING 0x1204 + class YaRemote : public janus::VTHandler { public: YaRemote(); - YaRemote( stem::addr_type id ); - YaRemote( stem::addr_type id, const char *info ); + YaRemote( stem::addr_type id, const char *info = 0 ); + YaRemote( const char *info ); ~YaRemote(); void handler( const stem::Event& ); @@ -51,9 +62,6 @@ DECLARE_RESPONSE_TABLE( YaRemote, janus::VTHandler ); }; -#define VS_DUMMY_MESS 0x1203 -#define VS_DUMMY_GREETING 0x1204 - YaRemote::YaRemote() : VTHandler(), count(0), @@ -61,30 +69,24 @@ { cnd.set( false ); gr.set( false ); - - JoinGroup( 0 ); } -YaRemote::YaRemote( stem::addr_type id ) : - VTHandler( id ), +YaRemote::YaRemote( stem::addr_type id, const char *info ) : + VTHandler( id, info ), count(0), ocount(0) { cnd.set( false ); gr.set( false ); - - JoinGroup( 0 ); } -YaRemote::YaRemote( stem::addr_type id, const char *info ) : - VTHandler( id, info ), +YaRemote::YaRemote( const char *info ) : + VTHandler( info ), count(0), ocount(0) { cnd.set( false ); gr.set( false ); - - JoinGroup( 0 ); } YaRemote::~YaRemote() @@ -101,7 +103,7 @@ void YaRemote::VSNewMember( const stem::Event_base<VSsync_rq>& ev ) { - // cerr << "Hello " << ev.src() << endl; + cerr << "Hello " << xmt::getpid() << endl; ++count; // VTNewMember_data( ev, "" ); @@ -127,7 +129,9 @@ void YaRemote::greeting() { - gr.set( true ); + if ( count > 0 ) { + gr.set( true ); + } } DEFINE_RESPONSE_TABLE( YaRemote ) @@ -137,6 +141,8 @@ int EXAM_IMPL(vtime_operations::remote) { + cerr << "============\n"; + const char fname[] = "/tmp/yanus_test.shm"; xmt::shm_alloc<0> seg; xmt::allocator_shm<xmt::__condition<true>,0> shm_cnd; @@ -151,30 +157,73 @@ b.wait(); - NetTransportMgr mgr; + { + YaRemote obj1( "obj client" ); - addr_type zero = mgr.open( "localhost", 6980 ); + // obj1.manager()->settrf( stem::EvManager::tracenet | stem::EvManager::tracedispatch ); + // obj1.manager()->settrs( &std::cerr ); - EXAM_CHECK_ASYNC( mgr.good() ); + obj1.vtdispatcher()->settrf( janus::Janus::tracenet | janus::Janus::tracedispatch | janus::Janus::tracefault | janus::Janus::tracedelayed | janus::Janus::tracegroup ); + obj1.vtdispatcher()->settrs( &std::cerr ); - YaRemote obj2; + EXAM_CHECK_ASYNC( obj1.vtdispatcher()->group_size(janus::vs_base::vshosts_group) == 1 ); + obj1.vtdispatcher()->connect( "localhost", 6980 ); + + cerr << obj1.vtdispatcher()->group_size(janus::vs_base::vshosts_group) << endl; + +#if 1 + while ( obj1.vtdispatcher()->vs_known_processes() < 2 ) { + xmt::Thread::yield(); + xmt::delay( xmt::timespec( 0, 1000000 ) ); + } +#else + while ( obj1.vtdispatcher()->group_size(janus::vs_base::vshosts_group) < 2 ) { + xmt::Thread::yield(); + xmt::delay( xmt::timespec( 0, 1000000 ) ); + } +#endif + + EXAM_CHECK_ASYNC( obj1.vtdispatcher()->group_size(janus::vs_base::vshosts_group) == 2 ); + cerr << obj1.vtdispatcher()->group_size(janus::vs_base::vshosts_group) << endl; + + obj1.JoinGroup( janus::vs_base::first_user_group ); + + obj1.wait_greeting(); + + EXAM_CHECK_ASYNC( obj1.vtdispatcher()->group_size(janus::vs_base::first_user_group) == 2 ); + + // cerr << "* " << obj1.vtdispatcher()->group_size(janus::vs_base::first_user_group) << endl; + } + exit(0); } catch ( xmt::fork_in_parent& child ) { - sockmgr_stream_MP<NetTransport> srv( 6980 ); + YaRemote obj1( "obj srv" ); - EXAM_REQUIRE( srv.good() ); + // obj1.vtdispatcher()->settrf( janus::Janus::tracenet | janus::Janus::tracedispatch | janus::Janus::tracefault | janus::Janus::tracedelayed | janus::Janus::tracegroup ); + // obj1.vtdispatcher()->settrs( &std::cerr ); + obj1.vtdispatcher()->serve( 6980 ); + + obj1.JoinGroup( janus::vs_base::first_user_group ); + b.wait(); - YaRemote obj1; + // while ( obj1.vtdispatcher()->vs_known_processes() < 2 ) { + // xmt::delay( xmt::timespec( 0, 1000000 ) ); + // } + // stem::Event ev( VS_DUMMY_MESS ); + // ev.dest( janus::vs_base::first_user_group ); // group + // ev.value() = "hello"; + + // obj1.JaSend( ev ); + + // obj1.wait_greeting(); + int stat; EXAM_CHECK( waitpid( child.pid(), &stat, 0 ) == child.pid() ); - - srv.close(); - srv.wait(); } (&b)->~__barrier<true>(); Added: trunk/complement/explore/lib/janus/vshostmgr.cc =================================================================== --- trunk/complement/explore/lib/janus/vshostmgr.cc (rev 0) +++ trunk/complement/explore/lib/janus/vshostmgr.cc 2007-08-23 09:02:15 UTC (rev 1702) @@ -0,0 +1,222 @@ +// -*- C++ -*- Time-stamp: <07/08/23 11:24:18 ptr> + +#include <janus/vshostmgr.h> + +#include <iostream> +#include <janus/vtime.h> +#include <janus/janus.h> + +#include <stem/EvManager.h> +#include <stem/NetTransport.h> +#include <stem/Event.h> +#include <stem/EvPack.h> +#include <sockios/sockmgr.h> + +#include <mt/xmt.h> + +#include <list> +#include <sstream> + +namespace janus { + +using namespace std; +using namespace stem; +using namespace xmt; +using namespace janus; + +VSHostMgr::VSHostMgr() : + VTHandler() +{ + vshost.insert( gaddr_type( stem::janus_addr ) ); + JoinGroup( vshosts_group ); +} + +VSHostMgr::VSHostMgr( stem::addr_type id, const char *info ) : + VTHandler( id, info ) +{ + vshost.insert( gaddr_type( stem::janus_addr ) ); + JoinGroup( vshosts_group ); +} + +VSHostMgr::VSHostMgr( const char *info ) : + VTHandler( info ) +{ + // vtdispatcher()->settrf( janus::Janus::tracenet | janus::Janus::tracedispatch | janus::Janus::tracefault | janus::Janus::tracedelayed | janus::Janus::tracegroup ); + + vshost.insert( gaddr_type( stem::janus_addr ) ); + JoinGroup( vshosts_group ); +} + +VSHostMgr::~VSHostMgr() +{ + while ( !_clients.empty() ) { + _clients.front()->close(); + _clients.front()->join(); + delete _clients.front(); + _clients.pop_front(); + } + while ( !_servers.empty() ) { + _servers.front()->close(); + _servers.front()->wait(); + delete _servers.front(); + _servers.pop_front(); + } +} + +void VSHostMgr::VSNewMember( const stem::Event_base<VSsync_rq>& ev ) +{ + // pack vshost, + stringstream s; + + gaddr_type ga = manager()->reflect( ev.src() ); + if ( ga != gaddr_type() ) { + ga.addr = stem::janus_addr; + } + + vshost.insert( ga ); // address of remote Janus + + stem::__pack_base::__net_pack( s, static_cast<uint32_t>(vshost.size()) ); + for ( vshost_container_t::const_iterator i = vshost.begin(); i != vshost.end(); ++i ) { + i->net_pack( s ); + } + VTHandler::VSNewMember_data( ev, s.str() ); + // VTHandler::VSNewMember( ev ); +} + +void VSHostMgr::VSOutMember( const stem::Event_base<VSsync_rq>& ev ) +{ + // remove host from vshost + gaddr_type ga = manager()->reflect( ev.src() ); + if ( ga != gaddr_type() ) { + ga.addr = stem::janus_addr; + } + + vshost_container_t::iterator i = vshost.find( ga ); + if ( i != vshost.end() ) { + vshost.erase( i ); + } +#ifdef __FIT_VS_TRACE + VTHandler::VSOutMember( ev ); +#endif +} + +void VSHostMgr::VSsync_time( const stem::Event_base<VSsync>& ev ) +{ + // extract ev.value().mess, sync with vshost + stringstream s( ev.value().mess ); + + uint32_t sz; + gaddr_type ga; + + stem::__pack_base::__net_unpack( s, sz ); + while ( sz-- > 0 ) { + ga.net_unpack( s ); + // vshost.push_back( ga ); + vshost.insert( ga ); + } + + VTHandler::VSsync_time(ev); +} + +void VSHostMgr::connect( const char *host, int port ) +{ + _clients.push_back( new NetTransportMgr() ); + + addr_type zero = _clients.back()->open( host, port ); + + gaddr_type ga = manager()->reflect( zero ); + if ( ga != gaddr_type() ) { + ga.addr = stem::janus_addr; + addr_type a = manager()->reflect( ga ); + if ( a == badaddr ) { + a = manager()->SubscribeRemote( ga, "janus" ); + } + stem::Event_base<VSsync_rq> ev( VS_NEW_MEMBER ); + ev.dest( a ); + ev.value().grp = vshosts_group; // special group + Send( ev ); +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(vtdispatcher()->_lock_tr); + if ( vtdispatcher()->_trs != 0 && vtdispatcher()->_trs->good() && (vtdispatcher()->_trflags & Janus::tracenet) ) { + *vtdispatcher()->_trs << "connect " << host << ":" << port << "\n" + << " -> VS_NEW_MEMBER G" << vshosts_group << " " + << hex << showbase + << self_id() << " -> " << ev.dest() << dec << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + } +#ifdef __FIT_VS_TRACE + else { + try { + scoped_lock lk(vtdispatcher()->_lock_tr); + if ( vtdispatcher()->_trs != 0 && vtdispatcher()->_trs->good() && (vtdispatcher()->_trflags & Janus::tracenet) ) { + *vtdispatcher()->_trs << "connect " << host << ":" << port << " fail" << endl; + } + } + catch ( ... ) { + } + } +#endif // __FIT_VS_TRACE +} + +void VSHostMgr::serve( int port ) +{ + _servers.push_back( new sockmgr_stream_MP<NetTransport>( port ) ); +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(vtdispatcher()->_lock_tr); + if ( vtdispatcher()->_trs != 0 && vtdispatcher()->_trs->good() && (vtdispatcher()->_trflags & Janus::tracenet) ) { + *vtdispatcher()->_trs << "serve " << port + << (_servers.back()->good() ? " ok" : " fail" ) + << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE +} + +void VSHostMgr::Subscribe( stem::addr_type addr, oid_type oid, group_type grp ) +{ + try { + manager()->transport( addr ); + } + catch ( const range_error& ) { + // only for local object + stem::Event_base<VSsync_rq> ev( VS_NEW_REMOTE_MEMBER ); + ev.src( addr ); + gaddr_type ga = manager()->reflect( stem::janus_addr ); + + for ( vshost_container_t::const_iterator i = vshost.begin(); i != vshost.end(); ++i ) { + if ( ga != *i ) { + ev.dest( manager()->reflect( *i ) ); + ev.value().grp = grp; + Forward( ev ); +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(vtdispatcher()->_lock_tr); + if ( vtdispatcher()->_trs != 0 && vtdispatcher()->_trs->good() && (vtdispatcher()->_trflags & Janus::tracenet) ) { + *vtdispatcher()->_trs << " -> VS_NEW_REMOTE_MEMBER G" << grp << " " + << hex << showbase + << ev.src() << " -> " << ev.dest() << dec << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + } + } + } +} + +// DEFINE_RESPONSE_TABLE( VSHostMgr ) +// EV_Event_base_T_( ST_NULL, VS_NEW_REMOTE_MEMBER, VSNewRemoteMemberDirect, VSsync_rq ) +// EV_Event_base_T_( ST_NULL, VS_NEW_MEMBER_RV, VSNewRemoteMemberRevert, VSsync_rq ) +// END_RESPONSE_TABLE + + +} // namespace janus Modified: trunk/complement/explore/lib/janus/vtime.cc =================================================================== --- trunk/complement/explore/lib/janus/vtime.cc 2007-08-23 08:49:29 UTC (rev 1701) +++ trunk/complement/explore/lib/janus/vtime.cc 2007-08-23 09:02:15 UTC (rev 1702) @@ -1,10 +1,10 @@ -// -*- C++ -*- Time-stamp: <07/08/17 22:28:55 ptr> +// -*- C++ -*- Time-stamp: <07/08/23 08:47:46 ptr> #include <janus/vtime.h> +#include <janus/janus.h> +#include <janus/vshostmgr.h> -#include <iostream> #include <stdint.h> -#include <stem/EvManager.h> namespace janus { @@ -543,442 +543,7 @@ } // namespace detail -void Janus::settrf( unsigned f ) -{ - scoped_lock _x1( _lock_tr ); - _trflags |= f; -} -void Janus::unsettrf( unsigned f ) -{ - scoped_lock _x1( _lock_tr ); - _trflags &= (0xffffffff & ~f); -} - -void Janus::resettrf( unsigned f ) -{ - scoped_lock _x1( _lock_tr ); - _trflags = f; -} - -void Janus::cleantrf() -{ - scoped_lock _x1( _lock_tr ); - _trflags = 0; -} - -unsigned Janus::trflags() const -{ - scoped_lock _x1( _lock_tr ); - - return _trflags; -} - -void Janus::settrs( std::ostream *s ) -{ - scoped_lock _x1( _lock_tr ); - _trs = s; -} - -void Janus::JaDispatch( const stem::Event_base<VSmess>& m ) -{ - pair<gid_map_type::const_iterator,gid_map_type::const_iterator> range = - grmap.equal_range( m.value().grp ); - - for ( ; range.first != range.second; ++range.first ) { - vt_map_type::iterator i = vtmap.find( range.first->second ); - if ( i == vtmap.end() || i->second.stem_addr() == m.src() ) { // not for nobody and not for self - continue; - } - try { - // check local or remote? i->second.addr - // if remote, forward it to foreign VTDispatcher? - // looks, like local source shouldn't be here? - check_and_send( i->second, m ); - } - catch ( const out_of_range& err ) { - try { - scoped_lock lk(_lock_tr); - if ( _trs != 0 && _trs->good() && (_trflags & tracefault) ) { - *_trs << err.what() << " " - << __FILE__ << ":" << __LINE__ << endl; - } - } - catch ( ... ) { - } - } - catch ( const domain_error& err ) { - try { - scoped_lock lk(_lock_tr); - if ( _trs != 0 && _trs->good() && (_trflags & tracefault) ) { - *_trs << err.what() << " " - << __FILE__ << ":" << __LINE__ << endl; - } - } - catch ( ... ) { - } - } - } -} - -void Janus::check_and_send( detail::vtime_obj_rec& vt, const stem::Event_base<VSmess>& m ) -{ - if ( vt.deliver( m.value() ) ) { - stem::Event ev( m.value().code ); - ev.dest(vt.stem_addr()); - ev.src(m.src()); - ev.value() = m.value().mess; -#ifdef __FIT_VS_TRACE - try { - scoped_lock lk(_lock_tr); - if ( _trs != 0 && _trs->good() && (_trflags & tracedispatch) ) { - *_trs << "Deliver " << m.value() << endl; - } - } - catch ( ... ) { - } -#endif // __FIT_VS_TRACE - Forward( ev ); - check_and_send_delayed( vt ); - } else { -#ifdef __FIT_VS_TRACE - try { - scoped_lock lk(_lock_tr); - if ( _trs != 0 && _trs->good() && (_trflags & tracedelayed) ) { - *_trs << "Delayed " << m.value() << endl; - } - } - catch ( ... ) { - } -#endif // __FIT_VS_TRACE - vt.dpool.push_back( make_pair( xmt::timespec(xmt::timespec::now), new Event_base<VSmess>(m) ) ); // 0 should be timestamp - } -} - -void Janus::check_and_send_delayed( detail::vtime_obj_rec& vt ) -{ - typedef detail::vtime_obj_rec::dpool_t dpool_t; - bool more; - do { - more = false; - for ( dpool_t::iterator j = vt.dpool.begin(); j != vt.dpool.end(); ) { - if ( vt.deliver_delayed( j->second->value() ) ) { - stem::Event evd( j->second->value().code ); - evd.dest(vt.stem_addr()); - evd.src(j->second->src()); - evd.value() = j->second->value().mess; -#ifdef __FIT_VS_TRACE - try { - scoped_lock lk(_lock_tr); - if ( _trs != 0 && _trs->good() && (_trflags & tracedispatch) ) { - *_trs << "Deliver delayed " << j->second->value() << endl; - } - } - catch ( ... ) { - } -#endif // __FIT_VS_TRACE - Forward( evd ); - delete j->second; - vt.dpool.erase( j++ ); - more = true; - } else { -#ifdef __FIT_VS_TRACE - try { - scoped_lock lk(_lock_tr); - if ( _trs != 0 && _trs->good() && (_trflags & tracedispatch) ) { - *_trs << "Remain delayed " << j->second->value() - << "\nReason: "; - vt.trace_deliver( j->second->value(), *_trs ) << endl; - } - } - catch ( ... ) { - } -#endif // __FIT_VS_TRACE - ++j; - } - } - } while ( more ); -} - -void Janus::JaSend( const stem::Event& e, group_type grp ) -{ - // This method not called from Dispatch, but work on the same level and in the same - // scope, so this lock (from stem::EventHandler) required here: - xmt::recursive_scoped_lock lk( this->_theHistory_lock ); - - const pair<gid_map_type::const_iterator,gid_map_type::const_iterator> range = - grmap.equal_range( grp ); - - for ( gid_map_type::const_iterator o = range.first; o != range.second; ++o ) { - vt_map_type::iterator i = vtmap.find( o->second ); - if ( i != vtmap.end() && i->second.stem_addr() == e.src() ) { // for self - detail::vtime_obj_rec& vt = i->second; - const oid_type& from = o->second; - stem::Event_base<VSmess> m( VS_MESS ); - m.value().src = from; // oid - m.value().code = e.code(); - m.value().mess = e.value(); - m.value().grp = grp; - // m.dest( ??? ); // local VT dispatcher? - m.src( e.src() ); - - // This is like VTDispatch, but VT stamp in every message different, - // in accordance with individual knowlage about object's VT. - - vt.next( from, grp ); // my counter - - for ( gid_map_type::const_iterator g = range.first; g != range.second; ++g ) { - vt_map_type::iterator k = vtmap.find( g->second ); - if ( k == vtmap.end() || k->second.stem_addr() == m.src() ) { // not for nobody and not for self - continue; - } - try { - vt.delta( m.value().gvt, from, g->second, grp ); - - ... [truncated message content] |