Thread: [complement-svn] SF.net SVN: complement: [1678] trunk/complement/explore/test/virtual_time (Page 2)
Status: Pre-Alpha
Brought to you by:
complement
From: <com...@us...> - 2007-08-13 07:02:39
|
Revision: 1678 http://complement.svn.sourceforge.net/complement/?rev=1678&view=rev Author: complement Date: 2007-08-13 00:02:36 -0700 (Mon, 13 Aug 2007) Log Message: ----------- entrance into group looks fine Modified Paths: -------------- trunk/complement/explore/test/virtual_time/test/Makefile trunk/complement/explore/test/virtual_time/test/unit_test.cc trunk/complement/explore/test/virtual_time/test/vt_handler.cc trunk/complement/explore/test/virtual_time/test/vt_operations.cc trunk/complement/explore/test/virtual_time/test/vt_operations.h trunk/complement/explore/test/virtual_time/vtime.cc trunk/complement/explore/test/virtual_time/vtime.h Modified: trunk/complement/explore/test/virtual_time/test/Makefile =================================================================== --- trunk/complement/explore/test/virtual_time/test/Makefile 2007-08-08 10:09:58 UTC (rev 1677) +++ trunk/complement/explore/test/virtual_time/test/Makefile 2007-08-13 07:02:36 UTC (rev 1678) @@ -1,4 +1,4 @@ -# -*- Makefile -*- Time-stamp: <07/02/21 15:30:59 ptr> +# -*- Makefile -*- Time-stamp: <07/08/08 22:18:48 ptr> SRCROOT := ../../.. COMPILER_NAME := gcc @@ -16,8 +16,15 @@ release-shared: PROJECT_LIBS = -lxmt -lsockios -lstem -lexam dbg-shared: PROJECT_LIBS = -lxmtg -lsockiosg -lstemg -lexamg +ifndef WITHOUT_STLPORT stldbg-shared: PROJECT_LIBS = -lxmtstlg -lsockiosstlg -lstemstlg -lexamstlg +endif +dbg-shared: DEFS += -D__FIT_VS_TRACE +ifndef WITHOUT_STLPORT +stldbg-shared: DEFS += -D__FIT_VS_TRACE +endif + LDLIBS = ${PROJECT_LIBS} check: all-shared Modified: trunk/complement/explore/test/virtual_time/test/unit_test.cc =================================================================== --- trunk/complement/explore/test/virtual_time/test/unit_test.cc 2007-08-08 10:09:58 UTC (rev 1677) +++ trunk/complement/explore/test/virtual_time/test/unit_test.cc 2007-08-13 07:02:36 UTC (rev 1678) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <07/07/26 09:41:24 ptr> +// -*- C++ -*- Time-stamp: <07/08/11 01:19:11 ptr> #include "vt_operations.h" @@ -20,12 +20,15 @@ t.add( &vtime_operations::VTMess_core, vt_oper, "VTmess core transfer", tc[2] = t.add( &vtime_operations::gvt_add, vt_oper, "Group VT add", tc[1] ) ); - t.add( &vtime_operations::VTSubscription, vt_oper, "VTSubscription", - t.add( &vtime_operations::VTDispatch2, vt_oper, "VTHandler2", - t.add( &vtime_operations::VTDispatch2, vt_oper, "VTHandler1", - t.add( &vtime_operations::VTDispatch2, vt_oper, "VTDispatch2", - t.add( &vtime_operations::VTDispatch1, vt_oper, "VTDispatch1", - t.add( &vtime_operations::vt_object, vt_oper, "VT order", tc[2] ) ) ) ) ) ); + t.add( &vtime_operations::VTEntryIntoGroup3, vt_oper, "VTEntryIntoGroup3", + t.add( &vtime_operations::VTEntryIntoGroup2, vt_oper, "VTEntryIntoGroup2", + t.add( &vtime_operations::VTEntryIntoGroup, vt_oper, "VTEntryIntoGroup", + t.add( &vtime_operations::VTSubscription, vt_oper, "VTSubscription", + t.add( &vtime_operations::VTDispatch2, vt_oper, "VTHandler2", + t.add( &vtime_operations::VTDispatch2, vt_oper, "VTHandler1", + t.add( &vtime_operations::VTDispatch2, vt_oper, "VTDispatch2", + t.add( &vtime_operations::VTDispatch1, vt_oper, "VTDispatch1", + t.add( &vtime_operations::vt_object, vt_oper, "VT order", tc[2] ) ) ) ) ) ) ) ) ); return t.girdle(); } Modified: trunk/complement/explore/test/virtual_time/test/vt_handler.cc =================================================================== --- trunk/complement/explore/test/virtual_time/test/vt_handler.cc 2007-08-08 10:09:58 UTC (rev 1677) +++ trunk/complement/explore/test/virtual_time/test/vt_handler.cc 2007-08-13 07:02:36 UTC (rev 1678) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <07/07/26 09:53:24 ptr> +// -*- C++ -*- Time-stamp: <07/08/11 23:21:59 ptr> #include "vt_operations.h" @@ -7,6 +7,8 @@ #include <iostream> #include <vtime.h> +#include <stem/EvManager.h> + using namespace vt; using namespace std; @@ -20,21 +22,31 @@ ~VTDummy(); void handler( const stem::Event& ); - void VTNewMember( const stem::Event& ); - void VTOutMember( const stem::Event& ); + void VSNewMember( const stem::Event_base<VSsync_rq>& ); + void VSOutMember( const stem::Event_base<VSsync_rq>& ); + void greeting(); + void wait(); std::string msg; int count; int ocount; + void wait_greeting() + { + gr.try_wait(); + gr.set( false ); + } + private: xmt::condition cnd; + xmt::condition gr; DECLARE_RESPONSE_TABLE( VTDummy, vt::VTHandler ); }; -#define VT_MESS3 0x1203 +#define VS_DUMMY_MESS 0x1203 +#define VS_DUMMY_GREETING 0x1204 VTDummy::VTDummy() : VTHandler(), @@ -42,6 +54,7 @@ ocount(0) { cnd.set( false ); + gr.set( false ); } VTDummy::VTDummy( stem::addr_type id ) : @@ -50,6 +63,7 @@ ocount(0) { cnd.set( false ); + gr.set( false ); } VTDummy::VTDummy( stem::addr_type id, const char *info ) : @@ -58,6 +72,7 @@ ocount(0) { cnd.set( false ); + gr.set( false ); } VTDummy::~VTDummy() @@ -72,13 +87,20 @@ cnd.set( true ); } -void VTDummy::VTNewMember( const stem::Event& ev ) +void VTDummy::VSNewMember( const stem::Event_base<VSsync_rq>& ev ) { - // cerr << "Hello" << endl; + // cerr << "Hello " << ev.src() << endl; ++count; + + // VTNewMember_data( ev, "" ); + VTHandler::VSNewMember( ev ); + + stem::EventVoid gr_ev( VS_DUMMY_GREETING ); + gr_ev.dest( ev.src() ); + Send( gr_ev ); } -void VTDummy::VTOutMember( const stem::Event& ev ) +void VTDummy::VSOutMember( const stem::Event_base<VSsync_rq>& ev ) { // cerr << "Hello" << endl; ++ocount; @@ -91,8 +113,14 @@ cnd.set( false ); } +void VTDummy::greeting() +{ + gr.set( true ); +} + DEFINE_RESPONSE_TABLE( VTDummy ) - EV_EDS( ST_NULL, VT_MESS3, handler ) + EV_EDS( ST_NULL, VS_DUMMY_MESS, handler ) + EV_VOID( ST_NULL, VS_DUMMY_GREETING, greeting ) END_RESPONSE_TABLE int EXAM_IMPL(vtime_operations::VTHandler1) @@ -100,7 +128,7 @@ VTDummy dummy1; VTDummy dummy2; - stem::Event ev( VT_MESS3 ); + stem::Event ev( VS_DUMMY_MESS ); ev.dest( 0 ); // group ev.value() = "hello"; @@ -120,7 +148,7 @@ VTDummy dummy2; VTDummy dummy3; - stem::Event ev( VT_MESS3 ); + stem::Event ev( VS_DUMMY_MESS ); ev.dest( 0 ); // group ev.value() = "hello"; @@ -152,7 +180,7 @@ VTDummy dummy1; VTDummy dummy2; - stem::Event ev( VT_MESS3 ); + stem::Event ev( VS_DUMMY_MESS ); ev.dest( 0 ); // group ev.value() = "hello"; @@ -171,7 +199,7 @@ // dummy3.wait(); // EXAM_CHECK( dummy3.msg == "hi" ); - EXAM_CHECK( dummy3.msg == "" ); // dummy3 don't see, due to VTS + // EXAM_CHECK( dummy3.msg == "" ); // dummy3 don't see, due to VTS EXAM_CHECK( dummy2.msg == "hi" ); EXAM_CHECK( dummy1.msg == "" ); } @@ -189,3 +217,135 @@ return EXAM_RESULT; } +int EXAM_IMPL(vtime_operations::VTEntryIntoGroup) +{ + VTDummy dummy1; + + stem::Event ev( VS_DUMMY_MESS ); + ev.dest( 0 ); // group + ev.value() = "hello"; + + { + // dummy1.manager()->settrf( /* stem::EvManager::tracenet | */ stem::EvManager::tracedispatch ); + // dummy1.manager()->settrs( &std::cerr ); + + VTDummy dummy3; + + dummy3.wait_greeting(); + + ev.value() = "hi"; + dummy1.VTSend( ev ); + + dummy3.wait(); + + EXAM_CHECK( dummy3.msg == "hi" ); + EXAM_CHECK( dummy1.msg == "" ); + } + + return EXAM_RESULT; +} + +int EXAM_IMPL(vtime_operations::VTEntryIntoGroup2) +{ + VTDummy dummy1; + VTDummy dummy2; + + stem::Event ev( VS_DUMMY_MESS ); + ev.dest( 0 ); // group + ev.value() = "hello"; + + dummy1.VTSend( ev ); + + dummy2.wait(); + EXAM_CHECK( dummy2.msg == "hello" ); + + { + // cerr << (void *)&dummy1 << " dummy1\n" + // << (void *)&dummy2 << " dummy2\n"; + // dummy1.manager()->settrf( /* stem::EvManager::tracenet | */ stem::EvManager::tracedispatch | stem::EvManager::tracefault ); + // dummy1.manager()->settrs( &std::cerr ); + + // dummy1.vtdispatcher()->settrf( VTDispatcher::tracedispatch | VTDispatcher::tracefault | VTDispatcher::tracedelayed | VTDispatcher::tracegroup ); + // dummy1.vtdispatcher()->settrs( &std::cerr ); + + VTDummy dummy3; + + dummy3.wait_greeting(); + + ev.value() = "hi"; + ev.dest( 0 ); // group + dummy1.VTSend( ev ); + + dummy2.wait(); + dummy3.wait(); + + EXAM_CHECK( dummy2.msg == "hi" ); + EXAM_CHECK( dummy3.msg == "hi" ); + EXAM_CHECK( dummy1.msg == "" ); + } + + return EXAM_RESULT; +} + +int EXAM_IMPL(vtime_operations::VTEntryIntoGroup3) +{ + VTDummy dummy1; + + stem::Event ev( VS_DUMMY_MESS ); + + { + VTDummy dummy2; + + dummy2.wait_greeting(); + + ev.value() = "hello"; + ev.dest( 0 ); // group + dummy1.VTSend( ev ); + + dummy2.wait(); + EXAM_CHECK( dummy2.msg == "hello" ); + EXAM_CHECK( dummy1.msg == "" ); + } + + { + VTDummy dummy3; + + dummy3.wait_greeting(); + + ev.value() = "hi"; + ev.dest( 0 ); // group + dummy1.VTSend( ev ); + + dummy3.wait(); + + EXAM_CHECK( dummy3.msg == "hi" ); + EXAM_CHECK( dummy1.msg == "" ); + } + + { + VTDummy dummy2; + VTDummy dummy3; + + dummy2.wait_greeting(); + dummy3.wait_greeting(); + + ev.value() = "more"; + ev.dest( 0 ); // group + dummy1.VTSend( ev ); + + dummy2.wait(); + dummy3.wait(); + + EXAM_CHECK( dummy2.msg == "more" ); + EXAM_CHECK( dummy3.msg == "more" ); + EXAM_CHECK( dummy1.msg == "" ); + + dummy2.VTSend( ev ); + + dummy1.wait(); + } + + EXAM_CHECK( dummy1.msg == "more" ); + + return EXAM_RESULT; +} Modified: trunk/complement/explore/test/virtual_time/test/vt_operations.cc =================================================================== --- trunk/complement/explore/test/virtual_time/test/vt_operations.cc 2007-08-08 10:09:58 UTC (rev 1677) +++ trunk/complement/explore/test/virtual_time/test/vt_operations.cc 2007-08-13 07:02:36 UTC (rev 1678) @@ -52,6 +52,8 @@ EXAM_CHECK( !(vt1 <= vt2) ); EXAM_CHECK( !(vt2 <= vt1) ); + + return EXAM_RESULT; } int EXAM_IMPL(vtime_operations::vt_add) Modified: trunk/complement/explore/test/virtual_time/test/vt_operations.h =================================================================== --- trunk/complement/explore/test/virtual_time/test/vt_operations.h 2007-08-08 10:09:58 UTC (rev 1677) +++ trunk/complement/explore/test/virtual_time/test/vt_operations.h 2007-08-13 07:02:36 UTC (rev 1678) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <07/07/26 09:40:39 ptr> +// -*- C++ -*- Time-stamp: <07/08/11 01:18:35 ptr> #ifndef __vt_operations_h #define __vt_operations_h @@ -25,6 +25,9 @@ int EXAM_DECL(VTHandler2); int EXAM_DECL(VTSubscription); + int EXAM_DECL(VTEntryIntoGroup); + int EXAM_DECL(VTEntryIntoGroup2); + int EXAM_DECL(VTEntryIntoGroup3); }; #endif // __vt_operations_h Modified: trunk/complement/explore/test/virtual_time/vtime.cc =================================================================== --- trunk/complement/explore/test/virtual_time/vtime.cc 2007-08-08 10:09:58 UTC (rev 1677) +++ trunk/complement/explore/test/virtual_time/vtime.cc 2007-08-13 07:02:36 UTC (rev 1678) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <07/07/27 09:49:24 ptr> +// -*- C++ -*- Time-stamp: <07/08/11 01:10:59 ptr> #include "vtime.h" @@ -103,40 +103,80 @@ } } +void VSsync_rq::pack( std::ostream& s ) const +{ + __pack( s, grp ); + __pack( s, mess ); +} + +void VSsync_rq::net_pack( std::ostream& s ) const +{ + __net_pack( s, grp ); + __net_pack( s, mess ); +} + +void VSsync_rq::unpack( std::istream& s ) +{ + __unpack( s, grp ); + __unpack( s, mess ); +} + +void VSsync_rq::net_unpack( std::istream& s ) +{ + __net_unpack( s, grp ); + __net_unpack( s, mess ); +} + +void VSsync::pack( std::ostream& s ) const +{ + gvt.pack( s ); + VSsync_rq::pack( s ); +} + +void VSsync::net_pack( std::ostream& s ) const +{ + gvt.net_pack( s ); + VSsync_rq::net_pack( s ); +} + +void VSsync::unpack( std::istream& s ) +{ + gvt.unpack( s ); + VSsync_rq::unpack( s ); +} + +void VSsync::net_unpack( std::istream& s ) +{ + gvt.net_unpack( s ); + VSsync_rq::net_unpack( s ); +} + void VTmess::pack( std::ostream& s ) const { __pack( s, code ); src.pack( s ); // __pack( s, src ); - gvt.pack( s ); - __pack( s, grp ); - __pack( s, mess ); + VSsync::pack( s ); } void VTmess::net_pack( std::ostream& s ) const { __net_pack( s, code ); src.net_pack( s ); // __net_pack( s, src ); - gvt.net_pack( s ); - __net_pack( s, grp ); - __net_pack( s, mess ); + VSsync::net_pack( s ); } void VTmess::unpack( std::istream& s ) { __unpack( s, code ); src.unpack( s ); // __unpack( s, src ); - gvt.unpack( s ); - __unpack( s, grp ); - __unpack( s, mess ); + VSsync::unpack( s ); } void VTmess::net_unpack( std::istream& s ) { __net_unpack( s, code ); src.net_unpack( s ); // __net_unpack( s, src ); - gvt.net_unpack( s ); - __net_unpack( s, grp ); - __net_unpack( s, mess ); + VSsync::net_unpack( s ); } bool operator <=( const vtime_type& l, const vtime_type& r ) @@ -314,7 +354,6 @@ lvt[m.src] += m.gvt.gvt; lvt[m.src][m.grp][m.src] = vt.gvt[m.grp][m.src] + 1; sup( vt.gvt[m.grp], lvt[m.src][m.grp] ); - // cout << vt.gvt << endl; return true; } @@ -327,7 +366,6 @@ lvt[m.src] += m.gvt.gvt; lvt[m.src][m.grp][m.src] = vt.gvt[m.grp][m.src] + 1; sup( vt.gvt[m.grp], lvt[m.src][m.grp] ); - // cout << vt.gvt << endl; return true; } @@ -343,10 +381,6 @@ gvtime gvt( m.gvt ); if ( (vt.gvt[m.grp][m.src] + 1) != gvt[m.grp][m.src] ) { - // cerr << "1" << endl; - // cerr << vt.gvt[m.grp][m.src] << "\n" - // << gvt[m.grp][m.src] - // << endl; if ( (vt.gvt[m.grp][m.src] + 1) > gvt[m.grp][m.src] ) { throw out_of_range( "duplicate or wrong VT message" ); } @@ -357,16 +391,12 @@ xvt[m.src] = 0; // force exclude message originator, it checked above if ( !(xvt <= vt[m.grp]) ) { - // cerr << "2" << endl; - // cerr << xvt << "\n\n" << vt[m.grp] << endl; return false; } // check side casuality (via groups other then message's group) for ( groups_container_type::const_iterator l = groups.begin(); l != groups.end(); ++l ) { if ( (*l != m.grp) && !((lvt[m.src][*l] + gvt[*l]) <= vt[*l]) ) { - // cerr << "3" << endl; - // cerr << "group " << *l << xvt << "\n\n" << vt[*l] << endl; return false; } } @@ -374,6 +404,38 @@ return true; } +ostream& vtime_obj_rec::trace_deliver( const VTmess& m, ostream& o ) +{ + if ( groups.find( m.grp ) == groups.end() ) { + return o << "VT object not member of group"; + } + + gvtime gvt( m.gvt ); + + if ( (vt.gvt[m.grp][m.src] + 1) != gvt[m.grp][m.src] ) { + if ( (vt.gvt[m.grp][m.src] + 1) > gvt[m.grp][m.src] ) { + return o << "duplicate or wrong VT message, " << vt.gvt << " vs " << gvt; + } + return o << "counter violation, " << vt.gvt << " vs " << gvt; + } + + vtime xvt = lvt[m.src][m.grp] + gvt[m.grp]; + xvt[m.src] = 0; // force exclude message originator, it checked above + + if ( !(xvt <= vt[m.grp]) ) { + return o << "casuality violation, " << xvt << " vs " << vt[m.grp]; + } + + // check side casuality (via groups other then message's group) + for ( groups_container_type::const_iterator l = groups.begin(); l != groups.end(); ++l ) { + if ( (*l != m.grp) && !((lvt[m.src][*l] + gvt[*l]) <= vt[*l]) ) { + return o << "side casuality violation, " << (lvt[m.src][*l] + gvt[*l]) << " vs " << vt[*l]; + } + } + + return o << "should be delivered"; +} + bool vtime_obj_rec::order_correct_delayed( const VTmess& m ) { gvtime gvt( m.gvt ); @@ -399,7 +461,7 @@ return true; } -void vtime_obj_rec::delta( gvtime& vtstamp, oid_type from, oid_type to, group_type grp ) +void vtime_obj_rec::delta( gvtime& vtstamp, const oid_type& from, const oid_type& to, group_type grp ) { vtstamp.gvt = vt.gvt - svt[to]; // delta vtstamp[grp][from] = vt.gvt[grp][from]; // my counter, as is, not delta @@ -449,7 +511,7 @@ return groups.empty() ? true : false; } -void vtime_obj_rec::rm_member( oid_type oid ) +void vtime_obj_rec::rm_member( const oid_type& oid ) { delta_vtime_type::iterator i = lvt.find( oid ); @@ -464,8 +526,59 @@ } } +void vtime_obj_rec::sync( group_type g, const oid_type& oid, const gvtime_type& gvt ) +{ + lvt[oid] = gvt; + gvtime_type::const_iterator i = gvt.find( g ); + if ( i != gvt.end() ) { + sup( vt.gvt[g], i->second.vt ); + // vtime_type::const_iterator j = i->second.vt.find( oid ); + // if ( j != i->second.vt.end() ) { + // vt.gvt[g][oid] = j->second; + // cerr << "**** " << gvt << endl; + // } + } +} + } // namespace detail +void VTDispatcher::settrf( unsigned f ) +{ + scoped_lock _x1( _lock_tr ); + _trflags |= f; +} + +void VTDispatcher::unsettrf( unsigned f ) +{ + scoped_lock _x1( _lock_tr ); + _trflags &= (0xffffffff & ~f); +} + +void VTDispatcher::resettrf( unsigned f ) +{ + scoped_lock _x1( _lock_tr ); + _trflags = f; +} + +void VTDispatcher::cleantrf() +{ + scoped_lock _x1( _lock_tr ); + _trflags = 0; +} + +unsigned VTDispatcher::trflags() const +{ + scoped_lock _x1( _lock_tr ); + + return _trflags; +} + +void VTDispatcher::settrs( std::ostream *s ) +{ + scoped_lock _x1( _lock_tr ); + _trs = s; +} + void VTDispatcher::VTDispatch( const stem::Event_base<VTmess>& m ) { pair<gid_map_type::const_iterator,gid_map_type::const_iterator> range = @@ -482,48 +595,110 @@ // looks, like local source shouldn't be here? check_and_send( i->second, m ); } - catch ( const out_of_range& ) { + catch ( const out_of_range& err ) { + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracefault) ) { + *_trs << err.what() << " " + << __FILE__ << ":" << __LINE__ << endl; + } + } + catch ( ... ) { + } } - catch ( const domain_error& ) { + catch ( const domain_error& err ) { + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracefault) ) { + *_trs << err.what() << " " + << __FILE__ << ":" << __LINE__ << endl; + } + } + catch ( ... ) { + } } } } void VTDispatcher::check_and_send( detail::vtime_obj_rec& vt, const stem::Event_base<VTmess>& m ) { - typedef detail::vtime_obj_rec::dpool_t dpool_t; - - // detail::vtime_obj_rec& vt = i->second; - if ( vt.deliver( m.value() ) ) { stem::Event ev( m.value().code ); ev.dest(vt.stem_addr()); ev.src(m.src()); ev.value() = m.value().mess; +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracedispatch) ) { + *_trs << "Deliver " << m.value() << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE Forward( ev ); - bool more; - do { - more = false; - for ( dpool_t::iterator j = vt.dpool.begin(); j != vt.dpool.end(); ) { - if ( vt.deliver_delayed( j->second->value() ) ) { - stem::Event evd( j->second->value().code ); - evd.dest(vt.stem_addr()); - ev.src(m.src()); - evd.value() = j->second->value().mess; - Forward( evd ); - delete j->second; - vt.dpool.erase( j++ ); - more = true; - } else { - ++j; - } + check_and_send_delayed( vt ); + } else { +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracedelayed) ) { + *_trs << "Delayed " << m.value() << endl; } - } while ( more ); - } else { + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE vt.dpool.push_back( make_pair( 0, new Event_base<VTmess>(m) ) ); // 0 should be timestamp } } +void VTDispatcher::check_and_send_delayed( detail::vtime_obj_rec& vt ) +{ + typedef detail::vtime_obj_rec::dpool_t dpool_t; + bool more; + do { + more = false; + for ( dpool_t::iterator j = vt.dpool.begin(); j != vt.dpool.end(); ) { + if ( vt.deliver_delayed( j->second->value() ) ) { + stem::Event evd( j->second->value().code ); + evd.dest(vt.stem_addr()); + evd.src(j->second->src()); + evd.value() = j->second->value().mess; +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracedispatch) ) { + *_trs << "Deliver delayed " << j->second->value() << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + Forward( evd ); + delete j->second; + vt.dpool.erase( j++ ); + more = true; + } else { +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracedispatch) ) { + *_trs << "Remain delayed " << j->second->value() + << "\nReason: "; + vt.trace_deliver( j->second->value(), *_trs ) << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + ++j; + } + } + } while ( more ); +} + void VTDispatcher::VTSend( const stem::Event& e, group_type grp ) { // This method not called from Dispatch, but work on the same level and in the same @@ -538,7 +713,7 @@ if ( i != vtmap.end() && i->second.stem_addr() == e.src() ) { // for self detail::vtime_obj_rec& vt = i->second; const oid_type& from = o->second; - stem::Event_base<VTmess> m( MESS ); + stem::Event_base<VTmess> m( VS_MESS ); m.value().src = from; // oid m.value().code = e.code(); m.value().mess = e.value(); @@ -565,9 +740,27 @@ vt.base_advance(g->second); // store last send VT to obj } - catch ( const out_of_range& ) { + catch ( const out_of_range& err ) { + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracefault) ) { + *_trs << err.what() << " " + << __FILE__ << ":" << __LINE__ << endl; + } + } + catch ( ... ) { + } } - catch ( const domain_error& ) { + catch ( const domain_error& err ) { + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracefault) ) { + *_trs << err.what() << " " + << __FILE__ << ":" << __LINE__ << endl; + } + } + catch ( ... ) { + } } } @@ -589,9 +782,20 @@ for ( ; range.first != range.second; ++range.first ) { vt_map_type::iterator i = vtmap.find( range.first->second ); if ( i != vtmap.end() ) { - stem::Event ev( VTS_NEW_MEMBER ); + stem::Event_base<VSsync_rq> ev( VS_NEW_MEMBER ); ev.dest( i->second.stem_addr() ); ev.src( addr ); + ev.value().grp = grp; +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracegroup) ) { + *_trs << "VS_NEW_MEMBER " << ev.src() << " -> " << ev.dest() << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE Forward( ev ); } } @@ -615,9 +819,21 @@ } else { vt_map_type::iterator j = vtmap.find( range.first->second ); if ( j != vtmap.end() ) { - stem::Event ev( VTS_OUT_MEMBER ); + stem::Event_base<VSsync_rq> ev( VS_OUT_MEMBER ); ev.dest( j->second.stem_addr() ); ev.src( i != vtmap.end() ? i->second.stem_addr() : self_id() ); + ev.value().grp = grp; +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracegroup) ) { + *_trs << "VS_OUT_MEMBER " << ev.src() << " -> " << ev.dest() << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + Forward( ev ); } ++range.first; @@ -631,8 +847,74 @@ } } +void VTDispatcher::get_gvtime( group_type grp, stem::addr_type addr, gvtime_type& gvt ) +{ + // See comment on top of VTSend above + xmt::recursive_scoped_lock lk( this->_theHistory_lock ); + + pair<gid_map_type::iterator,gid_map_type::iterator> range = + grmap.equal_range( grp ); + for ( ; range.first != range.second; ++range.first ) { + vt_map_type::iterator i = vtmap.find( range.first->second ); + if ( i != vtmap.end() && i->second.stem_addr() == addr ) { + i->second.get_gvt( gvt ); + return; + } + } + + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracefault) ) { + *_trs << "VT object not member of group" << " " << __FILE__ << ":" << __LINE__ << endl; + } + } + catch ( ... ) { + } + + throw domain_error( "VT object not member of group" ); // Error: not group member +} + +void VTDispatcher::set_gvtime( group_type grp, stem::addr_type addr, const gvtime_type& gvt ) +{ + // See comment on top of VTSend above + xmt::recursive_scoped_lock lk( this->_theHistory_lock ); + + pair<gid_map_type::iterator,gid_map_type::iterator> range = + grmap.equal_range( grp ); + for ( ; range.first != range.second; ++range.first ) { + vt_map_type::iterator i = vtmap.find( range.first->second ); + if ( i != vtmap.end() && i->second.stem_addr() == addr ) { +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracegroup) ) { + *_trs << "Set gvt G" << grp << " " << i->first + << " (" << addr << ") " << gvt << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + i->second.sync( grp, i->first, gvt ); + check_and_send_delayed( i->second ); + return; + } + } + + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracefault) ) { + *_trs << "VT object not member of group" << " " << __FILE__ << ":" << __LINE__ << endl; + } + } + catch ( ... ) { + } + + throw domain_error( "VT object not member of group" ); // Error: not group member +} + DEFINE_RESPONSE_TABLE( VTDispatcher ) - EV_Event_base_T_( ST_NULL, MESS, VTDispatch, VTmess ) + EV_Event_base_T_( ST_NULL, VS_MESS, VTDispatch, VTmess ) END_RESPONSE_TABLE char *Init_buf[128]; @@ -723,17 +1005,45 @@ ((Init *)Init_buf)->~Init(); } -void VTHandler::VTNewMember( const stem::Event& ) +void VTHandler::VSNewMember( const stem::Event_base<VSsync_rq>& ev ) { + stem::Event_base<VSsync> out_ev( VS_SYNC_TIME ); + out_ev.dest( ev.src() ); + out_ev.value().grp = ev.value().grp; + get_gvtime( ev.value().grp, out_ev.value().gvt.gvt ); + + Send( out_ev ); } -void VTHandler::VTOutMember( const stem::Event& ) +void VTHandler::VSNewMember_data( const stem::Event_base<VSsync_rq>& ev, const string& data ) { + stem::Event_base<VSsync> out_ev( VS_SYNC_TIME ); + out_ev.dest( ev.src() ); + out_ev.value().grp = ev.value().grp; + get_gvtime( ev.value().grp, out_ev.value().gvt.gvt ); + out_ev.value().mess = data; + + Send( out_ev ); } +void VTHandler::VSOutMember( const stem::Event_base<VSsync_rq>& ) +{ +} + +void VTHandler::VSsync_time( const stem::Event_base<VSsync>& ev ) +{ + try { + // sync data from ev.value().mess + _vtdsp->set_gvtime( ev.value().grp, self_id(), ev.value().gvt.gvt ); + } + catch ( const domain_error& ) { + } +} + DEFINE_RESPONSE_TABLE( VTHandler ) - EV_EDS( ST_NULL, VTS_NEW_MEMBER, VTNewMember ) - EV_EDS( ST_NULL, VTS_OUT_MEMBER, VTOutMember ) + EV_Event_base_T_( ST_NULL, VS_NEW_MEMBER, VSNewMember, VSsync_rq ) + EV_Event_base_T_( ST_NULL, VS_OUT_MEMBER, VSOutMember, VSsync_rq ) + EV_Event_base_T_( ST_NULL, VS_SYNC_TIME, VSsync_time, VSsync ) END_RESPONSE_TABLE } // namespace vt @@ -742,16 +1052,18 @@ ostream& operator <<( ostream& o, const vt::vtime_type::value_type& v ) { - return o << "(" << v.first << "," << v.second << ")\n"; + return o << "(" << v.first << "," << v.second << ")"; } ostream& operator <<( ostream& o, const vt::vtime_type& v ) { - o << "[\n"; for ( vt::vtime_type::const_iterator i = v.begin(); i != v.end(); ++i ) { - o << " " << *i; + if ( i != v.begin() ) { + o << ", "; + } + o << *i; } - return o << " ]\n"; + return o; } ostream& operator <<( ostream& o, const vt::vtime& v ) @@ -759,17 +1071,38 @@ ostream& operator <<( ostream& o, const vt::gvtime_type::value_type& v ) { - o << "group " << v.first << ": " << v.second.vt; + o << v.first << ": " << v.second.vt; } ostream& operator <<( ostream& o, const vt::gvtime_type& v ) { o << "{\n"; for ( vt::gvtime_type::const_iterator i = v.begin(); i != v.end(); ++i ) { - o << " " << *i; + o << "\t" << *i << "\n"; } return o << "}\n"; } +ostream& operator <<( ostream& o, const vt::gvtime& v ) +{ + return o << v.gvt; +} + +ostream& operator <<( ostream& o, const vt::VSsync& m ) +{ + // ios_base::fmtflags f = o.flags( ios_base::hex ); + o << "G" << m.grp << " " << m.gvt; + + return o; +} + +ostream& operator <<( ostream& o, const vt::VTmess& m ) +{ + ios_base::fmtflags f = o.flags( ios_base::hex | ios_base::showbase ); + o << "C" << m.code << dec << " " << m.src << " " << static_cast<const vt::VSsync&>(m); + o.flags( f ); + return o; +} + } // namespace std Modified: trunk/complement/explore/test/virtual_time/vtime.h =================================================================== --- trunk/complement/explore/test/virtual_time/vtime.h 2007-08-08 10:09:58 UTC (rev 1677) +++ trunk/complement/explore/test/virtual_time/vtime.h 2007-08-13 07:02:36 UTC (rev 1678) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <07/07/27 09:53:24 ptr> +// -*- C++ -*- Time-stamp: <07/08/11 01:05:47 ptr> #ifndef __vtime_h #define __vtime_h @@ -147,7 +147,7 @@ mutable gvtime_type gvt; }; -struct VTmess : +struct VSsync_rq : public stem::__pack_base { void pack( std::ostream& s ) const; @@ -155,28 +155,59 @@ void unpack( std::istream& s ); void net_unpack( std::istream& s ); - VTmess() : - code(0), - src(), - gvt(), + VSsync_rq() : grp(0), mess() { } - VTmess( const VTmess& _gvt ) : - code( _gvt.code ), - src( _gvt.src ), - gvt( _gvt.gvt ), + VSsync_rq( const VSsync_rq& _gvt ) : grp( _gvt.grp ), mess( _gvt.mess ) { } - stem::code_type code; - oid_type src; - gvtime gvt; group_type grp; std::string mess; }; +struct VSsync : + public VSsync_rq +{ + void pack( std::ostream& s ) const; + void net_pack( std::ostream& s ) const; + void unpack( std::istream& s ); + void net_unpack( std::istream& s ); + + VSsync() + { } + VSsync( const VSsync& _gvt ) : + VSsync_rq( _gvt ), + gvt( _gvt.gvt ) + { } + + gvtime gvt; +}; + +struct VTmess : + public VSsync +{ + void pack( std::ostream& s ) const; + void net_pack( std::ostream& s ) const; + void unpack( std::istream& s ); + void net_unpack( std::istream& s ); + + VTmess() : + code(0), + src() + { } + VTmess( const VTmess& _gvt ) : + VSsync( _gvt ), + code( _gvt.code ), + src( _gvt.src ) + { } + + stem::code_type code; + oid_type src; +}; + namespace detail { class vtime_obj_rec @@ -187,18 +218,22 @@ void add( stem::addr_type a, group_type g ) { addr = a; groups.insert(g); } bool rm_group( group_type ); - void rm_member( oid_type ); + void rm_member( const oid_type& ); stem::addr_type stem_addr() const { return addr; } bool deliver( const VTmess& ev ); bool deliver_delayed( const VTmess& ev ); - void next( oid_type from, group_type grp ) + std::ostream& trace_deliver( const VTmess& m, std::ostream& o ); + void next( const oid_type& from, group_type grp ) { ++vt.gvt[grp][from]; /* increment my VT counter */ } - void delta( gvtime& vtstamp, oid_type from, oid_type to, group_type grp ); - void base_advance( oid_type to ) + void delta( gvtime& vtstamp, const oid_type& from, const oid_type& to, group_type grp ); + void base_advance( const oid_type& to ) { svt[to] = vt.gvt; /* store last sent VT to obj */ } + void get_gvt( gvtime_type& gvt ) const + { gvt = vt.gvt; } + void sync( group_type, const oid_type&, const gvtime_type& ); #ifdef __FIT_EXAM const gvtime_type::data_type& operator[]( const gvtime_type::key_type k ) const @@ -235,19 +270,36 @@ public stem::EventHandler { public: - VTDispatcher() + enum traceflags { + notrace = 0, + tracenet = 1, + tracedispatch = 2, + tracefault = 4, + tracedelayed = 8, + tracegroup = 0x10 + }; + + VTDispatcher() : + _trflags( notrace ), + _trs( 0 ) { } VTDispatcher( const char *info ) : - stem::EventHandler( info ) + stem::EventHandler( info ), + _trflags( notrace ), + _trs( 0 ) { } VTDispatcher( stem::addr_type id ) : - stem::EventHandler( id ) + stem::EventHandler( id ), + _trflags( notrace ), + _trs( 0 ) { } VTDispatcher( stem::addr_type id, const char *info ) : - stem::EventHandler( id, info ) + stem::EventHandler( id, info ), + _trflags( notrace ), + _trs( 0 ) { } void VTDispatch( const stem::Event_base<VTmess>& ); @@ -255,16 +307,30 @@ void VTSend( const stem::Event& e, group_type ); void Subscribe( stem::addr_type, oid_type, group_type ); void Unsubscribe( oid_type, group_type ); + void get_gvtime( group_type, stem::addr_type, gvtime_type& ); + void set_gvtime( group_type, stem::addr_type, const gvtime_type& ); + void settrf( unsigned f ); + void unsettrf( unsigned f ); + void resettrf( unsigned f ); + void cleantrf(); + unsigned trflags() const; + void settrs( std::ostream * ); + private: typedef std::hash_map<oid_type, detail::vtime_obj_rec> vt_map_type; typedef std::hash_multimap<group_type, oid_type> gid_map_type; void check_and_send( detail::vtime_obj_rec&, const stem::Event_base<VTmess>& ); + void check_and_send_delayed( detail::vtime_obj_rec& ); vt_map_type vtmap; gid_map_type grmap; + xmt::mutex _lock_tr; + unsigned _trflags; + std::ostream *_trs; + DECLARE_RESPONSE_TABLE( VTDispatcher, stem::EventHandler ); }; @@ -291,23 +357,34 @@ virtual ~VTHandler(); void VTSend( const stem::Event& e ); - virtual void VTNewMember( const stem::Event& e ); - virtual void VTOutMember( const stem::Event& e ); + virtual void VSNewMember( const stem::Event_base<VSsync_rq>& e ); + virtual void VSOutMember( const stem::Event_base<VSsync_rq>& e ); + virtual void VSsync_time( const stem::Event_base<VSsync>& ); + template <class D> void VTSend( const stem::Event_base<D>& e ) { VTHandler::VTSend( stem::Event_convert<D>()( e ) ); } + static VTDispatcher *vtdispatcher() + { return _vtdsp; } + protected: + void VSNewMember_data( const stem::Event_base<VSsync_rq>&, const std::string& data ); + + void get_gvtime( group_type g, gvtime_type& gvt ) + { _vtdsp->get_gvtime( g, self_id(), gvt ); } + private: static class VTDispatcher *_vtdsp; DECLARE_RESPONSE_TABLE( VTHandler, stem::EventHandler ); }; -#define MESS 0x300 -#define VTS_NEW_MEMBER 0x301 -#define VTS_OUT_MEMBER 0x302 +#define VS_MESS 0x300 +#define VS_NEW_MEMBER 0x301 +#define VS_OUT_MEMBER 0x302 +#define VS_SYNC_TIME 0x303 } // namespace vt @@ -318,6 +395,9 @@ ostream& operator <<( ostream&, const vt::vtime& ); ostream& operator <<( ostream&, const vt::gvtime_type::value_type& ); ostream& operator <<( ostream&, const vt::gvtime_type& ); +ostream& operator <<( ostream&, const vt::gvtime& ); +ostream& operator <<( ostream& o, const vt::VSsync& ); +ostream& operator <<( ostream& o, const vt::VTmess& ); } // namespace std This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <com...@us...> - 2007-08-13 15:52:07
|
Revision: 1679 http://complement.svn.sourceforge.net/complement/?rev=1679&view=rev Author: complement Date: 2007-08-13 08:52:01 -0700 (Mon, 13 Aug 2007) Log Message: ----------- more real group number pass Modified Paths: -------------- trunk/complement/explore/test/virtual_time/test/vt_handler.cc trunk/complement/explore/test/virtual_time/vtime.cc trunk/complement/explore/test/virtual_time/vtime.h Modified: trunk/complement/explore/test/virtual_time/test/vt_handler.cc =================================================================== --- trunk/complement/explore/test/virtual_time/test/vt_handler.cc 2007-08-13 07:02:36 UTC (rev 1678) +++ trunk/complement/explore/test/virtual_time/test/vt_handler.cc 2007-08-13 15:52:01 UTC (rev 1679) @@ -55,6 +55,8 @@ { cnd.set( false ); gr.set( false ); + + JoinGroup( 0 ); } VTDummy::VTDummy( stem::addr_type id ) : @@ -64,6 +66,8 @@ { cnd.set( false ); gr.set( false ); + + JoinGroup( 0 ); } VTDummy::VTDummy( stem::addr_type id, const char *info ) : @@ -73,6 +77,8 @@ { cnd.set( false ); gr.set( false ); + + JoinGroup( 0 ); } VTDummy::~VTDummy() Modified: trunk/complement/explore/test/virtual_time/vtime.cc =================================================================== --- trunk/complement/explore/test/virtual_time/vtime.cc 2007-08-13 07:02:36 UTC (rev 1678) +++ trunk/complement/explore/test/virtual_time/vtime.cc 2007-08-13 15:52:01 UTC (rev 1679) @@ -847,6 +847,52 @@ } } +void VTDispatcher::Unsubscribe( oid_type oid ) +{ + // See comment on top of VTSend above + xmt::recursive_scoped_lock lk( this->_theHistory_lock ); + + vt_map_type::iterator i = vtmap.find( oid ); + if ( i != vtmap.end() ) { + list<group_type> grp_list; + i->second.groups_list( back_inserter( grp_list ) ); + for ( list<group_type>::const_iterator grp = grp_list.begin(); grp != grp_list.end(); ++grp ) { + + pair<gid_map_type::iterator,gid_map_type::iterator> range = + grmap.equal_range( *grp ); + + while ( range.first != range.second ) { + if ( range.first->second == oid ) { + grmap.erase( range.first++ ); + } else { + vt_map_type::iterator j = vtmap.find( range.first->second ); + if ( j != vtmap.end() ) { + stem::Event_base<VSsync_rq> ev( VS_OUT_MEMBER ); + ev.dest( j->second.stem_addr() ); + ev.src( i != vtmap.end() ? i->second.stem_addr() : self_id() ); + ev.value().grp = *grp; +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracegroup) ) { + *_trs << "VS_OUT_MEMBER " << ev.src() << " -> " << ev.dest() << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + + Forward( ev ); + } + ++range.first; + } + } + i->second.rm_group( *grp ); + } + vtmap.erase( i ); + } +} + void VTDispatcher::get_gvtime( group_type grp, stem::addr_type addr, gvtime_type& gvt ) { // See comment on top of VTSend above @@ -979,7 +1025,7 @@ { new( Init_buf ) Init(); - _vtdsp->Subscribe( self_id(), oid_type( self_id() ), /* grp */ 0 ); + // _vtdsp->Subscribe( self_id(), oid_type( self_id() ), /* grp */ 0 ); } VTHandler::VTHandler( const char *info ) : @@ -987,7 +1033,7 @@ { new( Init_buf ) Init(); - _vtdsp->Subscribe( self_id(), oid_type( self_id() ), /* grp */ 0 ); + // _vtdsp->Subscribe( self_id(), oid_type( self_id() ), /* grp */ 0 ); } VTHandler::VTHandler( stem::addr_type id, const char *info ) : @@ -995,12 +1041,12 @@ { new( Init_buf ) Init(); - _vtdsp->Subscribe( id, oid_type( id ), /* grp */ 0 ); + // _vtdsp->Subscribe( id, oid_type( id ), /* grp */ 0 ); } VTHandler::~VTHandler() { - _vtdsp->Unsubscribe( oid_type( self_id() ), /* grp */ 0 ); + _vtdsp->Unsubscribe( oid_type( self_id() ) ); ((Init *)Init_buf)->~Init(); } Modified: trunk/complement/explore/test/virtual_time/vtime.h =================================================================== --- trunk/complement/explore/test/virtual_time/vtime.h 2007-08-13 07:02:36 UTC (rev 1678) +++ trunk/complement/explore/test/virtual_time/vtime.h 2007-08-13 15:52:01 UTC (rev 1679) @@ -220,6 +220,10 @@ bool rm_group( group_type ); void rm_member( const oid_type& ); + template <typename BackInsertIterator> + void groups_list( BackInsertIterator bi ) const + { std::copy( groups.begin(), groups.end(), bi ); } + stem::addr_type stem_addr() const { return addr; } @@ -307,6 +311,7 @@ void VTSend( const stem::Event& e, group_type ); void Subscribe( stem::addr_type, oid_type, group_type ); void Unsubscribe( oid_type, group_type ); + void Unsubscribe( oid_type ); void get_gvtime( group_type, stem::addr_type, gvtime_type& ); void set_gvtime( group_type, stem::addr_type, const gvtime_type& ); @@ -357,6 +362,8 @@ virtual ~VTHandler(); void VTSend( const stem::Event& e ); + void JoinGroup( group_type grp ) + { _vtdsp->Subscribe( self_id(), oid_type( self_id() ), grp ); } virtual void VSNewMember( const stem::Event_base<VSsync_rq>& e ); virtual void VSOutMember( const stem::Event_base<VSsync_rq>& e ); virtual void VSsync_time( const stem::Event_base<VSsync>& ); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <com...@us...> - 2007-08-16 18:44:30
|
Revision: 1680 http://complement.svn.sourceforge.net/complement/?rev=1680&view=rev Author: complement Date: 2007-08-16 11:44:28 -0700 (Thu, 16 Aug 2007) Log Message: ----------- road to interprocess virtual synchrony Modified Paths: -------------- trunk/complement/explore/test/virtual_time/test/Makefile.inc trunk/complement/explore/test/virtual_time/test/unit_test.cc trunk/complement/explore/test/virtual_time/test/vt_operations.h trunk/complement/explore/test/virtual_time/vtime.cc trunk/complement/explore/test/virtual_time/vtime.h Added Paths: ----------- trunk/complement/explore/test/virtual_time/test/vt_remote.cc Modified: trunk/complement/explore/test/virtual_time/test/Makefile.inc =================================================================== --- trunk/complement/explore/test/virtual_time/test/Makefile.inc 2007-08-13 15:52:01 UTC (rev 1679) +++ trunk/complement/explore/test/virtual_time/test/Makefile.inc 2007-08-16 18:44:28 UTC (rev 1680) @@ -8,4 +8,5 @@ VTmess_core.cc \ vt_object.cc \ vt_dispatch.cc \ - vt_handler.cc + vt_handler.cc \ + vt_remote Modified: trunk/complement/explore/test/virtual_time/test/unit_test.cc =================================================================== --- trunk/complement/explore/test/virtual_time/test/unit_test.cc 2007-08-13 15:52:01 UTC (rev 1679) +++ trunk/complement/explore/test/virtual_time/test/unit_test.cc 2007-08-16 18:44:28 UTC (rev 1680) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <07/08/11 01:19:11 ptr> +// -*- C++ -*- Time-stamp: <07/08/16 09:04:40 ptr> #include "vt_operations.h" @@ -20,15 +20,16 @@ t.add( &vtime_operations::VTMess_core, vt_oper, "VTmess core transfer", tc[2] = t.add( &vtime_operations::gvt_add, vt_oper, "Group VT add", tc[1] ) ); - t.add( &vtime_operations::VTEntryIntoGroup3, vt_oper, "VTEntryIntoGroup3", - t.add( &vtime_operations::VTEntryIntoGroup2, vt_oper, "VTEntryIntoGroup2", - t.add( &vtime_operations::VTEntryIntoGroup, vt_oper, "VTEntryIntoGroup", - t.add( &vtime_operations::VTSubscription, vt_oper, "VTSubscription", - t.add( &vtime_operations::VTDispatch2, vt_oper, "VTHandler2", - t.add( &vtime_operations::VTDispatch2, vt_oper, "VTHandler1", - t.add( &vtime_operations::VTDispatch2, vt_oper, "VTDispatch2", - t.add( &vtime_operations::VTDispatch1, vt_oper, "VTDispatch1", - t.add( &vtime_operations::vt_object, vt_oper, "VT order", tc[2] ) ) ) ) ) ) ) ) ); + t.add( &vtime_operations::remote, vt_oper, "remote", + t.add( &vtime_operations::VTEntryIntoGroup3, vt_oper, "VTEntryIntoGroup3", + t.add( &vtime_operations::VTEntryIntoGroup2, vt_oper, "VTEntryIntoGroup2", + t.add( &vtime_operations::VTEntryIntoGroup, vt_oper, "VTEntryIntoGroup", + t.add( &vtime_operations::VTSubscription, vt_oper, "VTSubscription", + t.add( &vtime_operations::VTDispatch2, vt_oper, "VTHandler2", + t.add( &vtime_operations::VTDispatch2, vt_oper, "VTHandler1", + t.add( &vtime_operations::VTDispatch2, vt_oper, "VTDispatch2", + t.add( &vtime_operations::VTDispatch1, vt_oper, "VTDispatch1", + t.add( &vtime_operations::vt_object, vt_oper, "VT order", tc[2] )))))))))); return t.girdle(); } Modified: trunk/complement/explore/test/virtual_time/test/vt_operations.h =================================================================== --- trunk/complement/explore/test/virtual_time/test/vt_operations.h 2007-08-13 15:52:01 UTC (rev 1679) +++ trunk/complement/explore/test/virtual_time/test/vt_operations.h 2007-08-16 18:44:28 UTC (rev 1680) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <07/08/11 01:18:35 ptr> +// -*- C++ -*- Time-stamp: <07/08/16 09:02:08 ptr> #ifndef __vt_operations_h #define __vt_operations_h @@ -28,6 +28,8 @@ int EXAM_DECL(VTEntryIntoGroup); int EXAM_DECL(VTEntryIntoGroup2); int EXAM_DECL(VTEntryIntoGroup3); + + int EXAM_DECL(remote); }; #endif // __vt_operations_h Added: trunk/complement/explore/test/virtual_time/test/vt_remote.cc =================================================================== --- trunk/complement/explore/test/virtual_time/test/vt_remote.cc (rev 0) +++ trunk/complement/explore/test/virtual_time/test/vt_remote.cc 2007-08-16 18:44:28 UTC (rev 1680) @@ -0,0 +1,194 @@ +// -*- C++ -*- Time-stamp: <07/08/16 10:45:48 ptr> + +#include "vt_operations.h" + +// #include <boost/lexical_cast.hpp> + +#include <iostream> +#include <vtime.h> + +#include <stem/EvManager.h> +#include <stem/NetTransport.h> +#include <sockios/sockmgr.h> +#include <sys/wait.h> + +#include <mt/xmt.h> +#include <mt/shm.h> + +using namespace std; +using namespace stem; +using namespace xmt; +using namespace vt; + +class YaRemote : + public vt::VTHandler +{ + public: + YaRemote(); + YaRemote( stem::addr_type id ); + YaRemote( stem::addr_type id, const char *info ); + ~YaRemote(); + + void handler( const stem::Event& ); + void VSNewMember( const stem::Event_base<VSsync_rq>& ); + void VSOutMember( const stem::Event_base<VSsync_rq>& ); + + void greeting(); + + void wait(); + std::string msg; + int count; + int ocount; + + void wait_greeting() + { + gr.try_wait(); + gr.set( false ); + } + + private: + xmt::condition cnd; + xmt::condition gr; + + DECLARE_RESPONSE_TABLE( YaRemote, vt::VTHandler ); +}; + +#define VS_DUMMY_MESS 0x1203 +#define VS_DUMMY_GREETING 0x1204 + +YaRemote::YaRemote() : + VTHandler(), + count(0), + ocount(0) +{ + cnd.set( false ); + gr.set( false ); + + JoinGroup( 0 ); +} + +YaRemote::YaRemote( stem::addr_type id ) : + VTHandler( id ), + count(0), + ocount(0) +{ + cnd.set( false ); + gr.set( false ); + + JoinGroup( 0 ); +} + +YaRemote::YaRemote( stem::addr_type id, const char *info ) : + VTHandler( id, info ), + count(0), + ocount(0) +{ + cnd.set( false ); + gr.set( false ); + + JoinGroup( 0 ); +} + +YaRemote::~YaRemote() +{ + // cnd.wait(); +} + +void YaRemote::handler( const stem::Event& ev ) +{ + msg = ev.value(); + + cnd.set( true ); +} + +void YaRemote::VSNewMember( const stem::Event_base<VSsync_rq>& ev ) +{ + // cerr << "Hello " << ev.src() << endl; + ++count; + + // VTNewMember_data( ev, "" ); + VTHandler::VSNewMember( ev ); + + stem::EventVoid gr_ev( VS_DUMMY_GREETING ); + gr_ev.dest( ev.src() ); + Send( gr_ev ); +} + +void YaRemote::VSOutMember( const stem::Event_base<VSsync_rq>& ev ) +{ + // cerr << "Hello" << endl; + ++ocount; +} + +void YaRemote::wait() +{ + cnd.try_wait(); + + cnd.set( false ); +} + +void YaRemote::greeting() +{ + gr.set( true ); +} + +DEFINE_RESPONSE_TABLE( YaRemote ) + EV_EDS( ST_NULL, VS_DUMMY_MESS, handler ) + EV_VOID( ST_NULL, VS_DUMMY_GREETING, greeting ) +END_RESPONSE_TABLE + +int EXAM_IMPL(vtime_operations::remote) +{ + const char fname[] = "/tmp/yanus_test.shm"; + xmt::shm_alloc<0> seg; + xmt::allocator_shm<xmt::__condition<true>,0> shm_cnd; + xmt::allocator_shm<xmt::__barrier<true>,0> shm_b; + + try { + seg.allocate( fname, 4*4096, xmt::shm_base::create | xmt::shm_base::exclusive, 0600 ); + xmt::__barrier<true>& b = *new ( shm_b.allocate( 1 ) ) xmt::__barrier<true>(); + + try { + xmt::fork(); + + b.wait(); + + NetTransportMgr mgr; + + addr_type zero = mgr.open( "localhost", 6980 ); + + EXAM_ASYNC_CHECK( mgr.good() ); + + YaRemote obj2; + + exit(0); + } + catch ( xmt::fork_in_parent& child ) { + sockmgr_stream_MP<NetTransport> srv( 6980 ); + + EXAM_REQUIRE( srv.good() ); + + b.wait(); + + YaRemote obj1; + + int stat; + EXAM_CHECK( waitpid( child.pid(), &stat, 0 ) == child.pid() ); + + srv.close(); + srv.wait(); + } + + (&b)->~__barrier<true>(); + shm_b.deallocate( &b, 1 ); + } + catch ( const xmt::shm_bad_alloc& err ) { + EXAM_ERROR( err.what() ); + } + + seg.deallocate(); + unlink( fname ); + + return EXAM_RESULT; +} + Modified: trunk/complement/explore/test/virtual_time/vtime.cc =================================================================== --- trunk/complement/explore/test/virtual_time/vtime.cc 2007-08-13 15:52:01 UTC (rev 1679) +++ trunk/complement/explore/test/virtual_time/vtime.cc 2007-08-16 18:44:28 UTC (rev 1680) @@ -4,6 +4,7 @@ #include <iostream> #include <stdint.h> +#include <stem/EvManager.h> namespace vt { @@ -650,7 +651,7 @@ catch ( ... ) { } #endif // __FIT_VS_TRACE - vt.dpool.push_back( make_pair( 0, new Event_base<VTmess>(m) ) ); // 0 should be timestamp + vt.dpool.push_back( make_pair( xmt::timespec(xmt::timespec::now), new Event_base<VTmess>(m) ) ); // 0 should be timestamp } } @@ -736,9 +737,24 @@ // check local or remote? i->second.addr // if remote, forward it to foreign VTDispatcher? - check_and_send( k->second, m ); - - vt.base_advance(g->second); // store last send VT to obj + try { + /* const transport tr = */ manager()->transport( k->second.stem_addr() ); + gaddr_type ga = manager()->reflect( k->second.stem_addr() ); + if ( ga != gaddr_type() ) { + ga.addr = 2; // vtd + addr_type a = manager()->reflect( ga ); + if ( a == badaddr ) { + a = manager()->SubscribeRemote( ga, "vtd" ); + } + m.dest( a ); + Forward( m ); + vt.base_advance(g->second); // store last send VT to obj + } + } + catch ( const range_error& ) { + check_and_send( k->second, m ); + vt.base_advance(g->second); // store last send VT to obj + } } catch ( const out_of_range& err ) { try { Modified: trunk/complement/explore/test/virtual_time/vtime.h =================================================================== --- trunk/complement/explore/test/virtual_time/vtime.h 2007-08-13 15:52:01 UTC (rev 1679) +++ trunk/complement/explore/test/virtual_time/vtime.h 2007-08-16 18:44:28 UTC (rev 1680) @@ -17,6 +17,8 @@ #include <stem/Event.h> #include <stem/EventHandler.h> +#include <mt/time.h> + namespace vt { // typedef stem::addr_type oid_type; @@ -258,7 +260,7 @@ public: // delay pool should be here - typedef std::pair<int,stem::Event_base<VTmess>*> delay_item_t; + typedef std::pair<xmt::timespec,stem::Event_base<VTmess>*> delay_item_t; typedef std::list<delay_item_t> dpool_t; dpool_t dpool; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |