[complement-svn] SF.net SVN: complement: [1678] trunk/complement/explore/test/virtual_time
Status: Pre-Alpha
Brought to you by:
complement
From: <com...@us...> - 2007-08-13 07:02:39
|
Revision: 1678 http://complement.svn.sourceforge.net/complement/?rev=1678&view=rev Author: complement Date: 2007-08-13 00:02:36 -0700 (Mon, 13 Aug 2007) Log Message: ----------- entrance into group looks fine Modified Paths: -------------- trunk/complement/explore/test/virtual_time/test/Makefile trunk/complement/explore/test/virtual_time/test/unit_test.cc trunk/complement/explore/test/virtual_time/test/vt_handler.cc trunk/complement/explore/test/virtual_time/test/vt_operations.cc trunk/complement/explore/test/virtual_time/test/vt_operations.h trunk/complement/explore/test/virtual_time/vtime.cc trunk/complement/explore/test/virtual_time/vtime.h Modified: trunk/complement/explore/test/virtual_time/test/Makefile =================================================================== --- trunk/complement/explore/test/virtual_time/test/Makefile 2007-08-08 10:09:58 UTC (rev 1677) +++ trunk/complement/explore/test/virtual_time/test/Makefile 2007-08-13 07:02:36 UTC (rev 1678) @@ -1,4 +1,4 @@ -# -*- Makefile -*- Time-stamp: <07/02/21 15:30:59 ptr> +# -*- Makefile -*- Time-stamp: <07/08/08 22:18:48 ptr> SRCROOT := ../../.. COMPILER_NAME := gcc @@ -16,8 +16,15 @@ release-shared: PROJECT_LIBS = -lxmt -lsockios -lstem -lexam dbg-shared: PROJECT_LIBS = -lxmtg -lsockiosg -lstemg -lexamg +ifndef WITHOUT_STLPORT stldbg-shared: PROJECT_LIBS = -lxmtstlg -lsockiosstlg -lstemstlg -lexamstlg +endif +dbg-shared: DEFS += -D__FIT_VS_TRACE +ifndef WITHOUT_STLPORT +stldbg-shared: DEFS += -D__FIT_VS_TRACE +endif + LDLIBS = ${PROJECT_LIBS} check: all-shared Modified: trunk/complement/explore/test/virtual_time/test/unit_test.cc =================================================================== --- trunk/complement/explore/test/virtual_time/test/unit_test.cc 2007-08-08 10:09:58 UTC (rev 1677) +++ trunk/complement/explore/test/virtual_time/test/unit_test.cc 2007-08-13 07:02:36 UTC (rev 1678) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <07/07/26 09:41:24 ptr> +// -*- C++ -*- Time-stamp: <07/08/11 01:19:11 ptr> #include "vt_operations.h" @@ -20,12 +20,15 @@ t.add( &vtime_operations::VTMess_core, vt_oper, "VTmess core transfer", tc[2] = t.add( &vtime_operations::gvt_add, vt_oper, "Group VT add", tc[1] ) ); - t.add( &vtime_operations::VTSubscription, vt_oper, "VTSubscription", - t.add( &vtime_operations::VTDispatch2, vt_oper, "VTHandler2", - t.add( &vtime_operations::VTDispatch2, vt_oper, "VTHandler1", - t.add( &vtime_operations::VTDispatch2, vt_oper, "VTDispatch2", - t.add( &vtime_operations::VTDispatch1, vt_oper, "VTDispatch1", - t.add( &vtime_operations::vt_object, vt_oper, "VT order", tc[2] ) ) ) ) ) ); + t.add( &vtime_operations::VTEntryIntoGroup3, vt_oper, "VTEntryIntoGroup3", + t.add( &vtime_operations::VTEntryIntoGroup2, vt_oper, "VTEntryIntoGroup2", + t.add( &vtime_operations::VTEntryIntoGroup, vt_oper, "VTEntryIntoGroup", + t.add( &vtime_operations::VTSubscription, vt_oper, "VTSubscription", + t.add( &vtime_operations::VTDispatch2, vt_oper, "VTHandler2", + t.add( &vtime_operations::VTDispatch2, vt_oper, "VTHandler1", + t.add( &vtime_operations::VTDispatch2, vt_oper, "VTDispatch2", + t.add( &vtime_operations::VTDispatch1, vt_oper, "VTDispatch1", + t.add( &vtime_operations::vt_object, vt_oper, "VT order", tc[2] ) ) ) ) ) ) ) ) ); return t.girdle(); } Modified: trunk/complement/explore/test/virtual_time/test/vt_handler.cc =================================================================== --- trunk/complement/explore/test/virtual_time/test/vt_handler.cc 2007-08-08 10:09:58 UTC (rev 1677) +++ trunk/complement/explore/test/virtual_time/test/vt_handler.cc 2007-08-13 07:02:36 UTC (rev 1678) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <07/07/26 09:53:24 ptr> +// -*- C++ -*- Time-stamp: <07/08/11 23:21:59 ptr> #include "vt_operations.h" @@ -7,6 +7,8 @@ #include <iostream> #include <vtime.h> +#include <stem/EvManager.h> + using namespace vt; using namespace std; @@ -20,21 +22,31 @@ ~VTDummy(); void handler( const stem::Event& ); - void VTNewMember( const stem::Event& ); - void VTOutMember( const stem::Event& ); + void VSNewMember( const stem::Event_base<VSsync_rq>& ); + void VSOutMember( const stem::Event_base<VSsync_rq>& ); + void greeting(); + void wait(); std::string msg; int count; int ocount; + void wait_greeting() + { + gr.try_wait(); + gr.set( false ); + } + private: xmt::condition cnd; + xmt::condition gr; DECLARE_RESPONSE_TABLE( VTDummy, vt::VTHandler ); }; -#define VT_MESS3 0x1203 +#define VS_DUMMY_MESS 0x1203 +#define VS_DUMMY_GREETING 0x1204 VTDummy::VTDummy() : VTHandler(), @@ -42,6 +54,7 @@ ocount(0) { cnd.set( false ); + gr.set( false ); } VTDummy::VTDummy( stem::addr_type id ) : @@ -50,6 +63,7 @@ ocount(0) { cnd.set( false ); + gr.set( false ); } VTDummy::VTDummy( stem::addr_type id, const char *info ) : @@ -58,6 +72,7 @@ ocount(0) { cnd.set( false ); + gr.set( false ); } VTDummy::~VTDummy() @@ -72,13 +87,20 @@ cnd.set( true ); } -void VTDummy::VTNewMember( const stem::Event& ev ) +void VTDummy::VSNewMember( const stem::Event_base<VSsync_rq>& ev ) { - // cerr << "Hello" << endl; + // cerr << "Hello " << ev.src() << endl; ++count; + + // VTNewMember_data( ev, "" ); + VTHandler::VSNewMember( ev ); + + stem::EventVoid gr_ev( VS_DUMMY_GREETING ); + gr_ev.dest( ev.src() ); + Send( gr_ev ); } -void VTDummy::VTOutMember( const stem::Event& ev ) +void VTDummy::VSOutMember( const stem::Event_base<VSsync_rq>& ev ) { // cerr << "Hello" << endl; ++ocount; @@ -91,8 +113,14 @@ cnd.set( false ); } +void VTDummy::greeting() +{ + gr.set( true ); +} + DEFINE_RESPONSE_TABLE( VTDummy ) - EV_EDS( ST_NULL, VT_MESS3, handler ) + EV_EDS( ST_NULL, VS_DUMMY_MESS, handler ) + EV_VOID( ST_NULL, VS_DUMMY_GREETING, greeting ) END_RESPONSE_TABLE int EXAM_IMPL(vtime_operations::VTHandler1) @@ -100,7 +128,7 @@ VTDummy dummy1; VTDummy dummy2; - stem::Event ev( VT_MESS3 ); + stem::Event ev( VS_DUMMY_MESS ); ev.dest( 0 ); // group ev.value() = "hello"; @@ -120,7 +148,7 @@ VTDummy dummy2; VTDummy dummy3; - stem::Event ev( VT_MESS3 ); + stem::Event ev( VS_DUMMY_MESS ); ev.dest( 0 ); // group ev.value() = "hello"; @@ -152,7 +180,7 @@ VTDummy dummy1; VTDummy dummy2; - stem::Event ev( VT_MESS3 ); + stem::Event ev( VS_DUMMY_MESS ); ev.dest( 0 ); // group ev.value() = "hello"; @@ -171,7 +199,7 @@ // dummy3.wait(); // EXAM_CHECK( dummy3.msg == "hi" ); - EXAM_CHECK( dummy3.msg == "" ); // dummy3 don't see, due to VTS + // EXAM_CHECK( dummy3.msg == "" ); // dummy3 don't see, due to VTS EXAM_CHECK( dummy2.msg == "hi" ); EXAM_CHECK( dummy1.msg == "" ); } @@ -189,3 +217,135 @@ return EXAM_RESULT; } +int EXAM_IMPL(vtime_operations::VTEntryIntoGroup) +{ + VTDummy dummy1; + + stem::Event ev( VS_DUMMY_MESS ); + ev.dest( 0 ); // group + ev.value() = "hello"; + + { + // dummy1.manager()->settrf( /* stem::EvManager::tracenet | */ stem::EvManager::tracedispatch ); + // dummy1.manager()->settrs( &std::cerr ); + + VTDummy dummy3; + + dummy3.wait_greeting(); + + ev.value() = "hi"; + dummy1.VTSend( ev ); + + dummy3.wait(); + + EXAM_CHECK( dummy3.msg == "hi" ); + EXAM_CHECK( dummy1.msg == "" ); + } + + return EXAM_RESULT; +} + +int EXAM_IMPL(vtime_operations::VTEntryIntoGroup2) +{ + VTDummy dummy1; + VTDummy dummy2; + + stem::Event ev( VS_DUMMY_MESS ); + ev.dest( 0 ); // group + ev.value() = "hello"; + + dummy1.VTSend( ev ); + + dummy2.wait(); + EXAM_CHECK( dummy2.msg == "hello" ); + + { + // cerr << (void *)&dummy1 << " dummy1\n" + // << (void *)&dummy2 << " dummy2\n"; + // dummy1.manager()->settrf( /* stem::EvManager::tracenet | */ stem::EvManager::tracedispatch | stem::EvManager::tracefault ); + // dummy1.manager()->settrs( &std::cerr ); + + // dummy1.vtdispatcher()->settrf( VTDispatcher::tracedispatch | VTDispatcher::tracefault | VTDispatcher::tracedelayed | VTDispatcher::tracegroup ); + // dummy1.vtdispatcher()->settrs( &std::cerr ); + + VTDummy dummy3; + + dummy3.wait_greeting(); + + ev.value() = "hi"; + ev.dest( 0 ); // group + dummy1.VTSend( ev ); + + dummy2.wait(); + dummy3.wait(); + + EXAM_CHECK( dummy2.msg == "hi" ); + EXAM_CHECK( dummy3.msg == "hi" ); + EXAM_CHECK( dummy1.msg == "" ); + } + + return EXAM_RESULT; +} + +int EXAM_IMPL(vtime_operations::VTEntryIntoGroup3) +{ + VTDummy dummy1; + + stem::Event ev( VS_DUMMY_MESS ); + + { + VTDummy dummy2; + + dummy2.wait_greeting(); + + ev.value() = "hello"; + ev.dest( 0 ); // group + dummy1.VTSend( ev ); + + dummy2.wait(); + EXAM_CHECK( dummy2.msg == "hello" ); + EXAM_CHECK( dummy1.msg == "" ); + } + + { + VTDummy dummy3; + + dummy3.wait_greeting(); + + ev.value() = "hi"; + ev.dest( 0 ); // group + dummy1.VTSend( ev ); + + dummy3.wait(); + + EXAM_CHECK( dummy3.msg == "hi" ); + EXAM_CHECK( dummy1.msg == "" ); + } + + { + VTDummy dummy2; + VTDummy dummy3; + + dummy2.wait_greeting(); + dummy3.wait_greeting(); + + ev.value() = "more"; + ev.dest( 0 ); // group + dummy1.VTSend( ev ); + + dummy2.wait(); + dummy3.wait(); + + EXAM_CHECK( dummy2.msg == "more" ); + EXAM_CHECK( dummy3.msg == "more" ); + EXAM_CHECK( dummy1.msg == "" ); + + dummy2.VTSend( ev ); + + dummy1.wait(); + } + + EXAM_CHECK( dummy1.msg == "more" ); + + return EXAM_RESULT; +} Modified: trunk/complement/explore/test/virtual_time/test/vt_operations.cc =================================================================== --- trunk/complement/explore/test/virtual_time/test/vt_operations.cc 2007-08-08 10:09:58 UTC (rev 1677) +++ trunk/complement/explore/test/virtual_time/test/vt_operations.cc 2007-08-13 07:02:36 UTC (rev 1678) @@ -52,6 +52,8 @@ EXAM_CHECK( !(vt1 <= vt2) ); EXAM_CHECK( !(vt2 <= vt1) ); + + return EXAM_RESULT; } int EXAM_IMPL(vtime_operations::vt_add) Modified: trunk/complement/explore/test/virtual_time/test/vt_operations.h =================================================================== --- trunk/complement/explore/test/virtual_time/test/vt_operations.h 2007-08-08 10:09:58 UTC (rev 1677) +++ trunk/complement/explore/test/virtual_time/test/vt_operations.h 2007-08-13 07:02:36 UTC (rev 1678) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <07/07/26 09:40:39 ptr> +// -*- C++ -*- Time-stamp: <07/08/11 01:18:35 ptr> #ifndef __vt_operations_h #define __vt_operations_h @@ -25,6 +25,9 @@ int EXAM_DECL(VTHandler2); int EXAM_DECL(VTSubscription); + int EXAM_DECL(VTEntryIntoGroup); + int EXAM_DECL(VTEntryIntoGroup2); + int EXAM_DECL(VTEntryIntoGroup3); }; #endif // __vt_operations_h Modified: trunk/complement/explore/test/virtual_time/vtime.cc =================================================================== --- trunk/complement/explore/test/virtual_time/vtime.cc 2007-08-08 10:09:58 UTC (rev 1677) +++ trunk/complement/explore/test/virtual_time/vtime.cc 2007-08-13 07:02:36 UTC (rev 1678) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <07/07/27 09:49:24 ptr> +// -*- C++ -*- Time-stamp: <07/08/11 01:10:59 ptr> #include "vtime.h" @@ -103,40 +103,80 @@ } } +void VSsync_rq::pack( std::ostream& s ) const +{ + __pack( s, grp ); + __pack( s, mess ); +} + +void VSsync_rq::net_pack( std::ostream& s ) const +{ + __net_pack( s, grp ); + __net_pack( s, mess ); +} + +void VSsync_rq::unpack( std::istream& s ) +{ + __unpack( s, grp ); + __unpack( s, mess ); +} + +void VSsync_rq::net_unpack( std::istream& s ) +{ + __net_unpack( s, grp ); + __net_unpack( s, mess ); +} + +void VSsync::pack( std::ostream& s ) const +{ + gvt.pack( s ); + VSsync_rq::pack( s ); +} + +void VSsync::net_pack( std::ostream& s ) const +{ + gvt.net_pack( s ); + VSsync_rq::net_pack( s ); +} + +void VSsync::unpack( std::istream& s ) +{ + gvt.unpack( s ); + VSsync_rq::unpack( s ); +} + +void VSsync::net_unpack( std::istream& s ) +{ + gvt.net_unpack( s ); + VSsync_rq::net_unpack( s ); +} + void VTmess::pack( std::ostream& s ) const { __pack( s, code ); src.pack( s ); // __pack( s, src ); - gvt.pack( s ); - __pack( s, grp ); - __pack( s, mess ); + VSsync::pack( s ); } void VTmess::net_pack( std::ostream& s ) const { __net_pack( s, code ); src.net_pack( s ); // __net_pack( s, src ); - gvt.net_pack( s ); - __net_pack( s, grp ); - __net_pack( s, mess ); + VSsync::net_pack( s ); } void VTmess::unpack( std::istream& s ) { __unpack( s, code ); src.unpack( s ); // __unpack( s, src ); - gvt.unpack( s ); - __unpack( s, grp ); - __unpack( s, mess ); + VSsync::unpack( s ); } void VTmess::net_unpack( std::istream& s ) { __net_unpack( s, code ); src.net_unpack( s ); // __net_unpack( s, src ); - gvt.net_unpack( s ); - __net_unpack( s, grp ); - __net_unpack( s, mess ); + VSsync::net_unpack( s ); } bool operator <=( const vtime_type& l, const vtime_type& r ) @@ -314,7 +354,6 @@ lvt[m.src] += m.gvt.gvt; lvt[m.src][m.grp][m.src] = vt.gvt[m.grp][m.src] + 1; sup( vt.gvt[m.grp], lvt[m.src][m.grp] ); - // cout << vt.gvt << endl; return true; } @@ -327,7 +366,6 @@ lvt[m.src] += m.gvt.gvt; lvt[m.src][m.grp][m.src] = vt.gvt[m.grp][m.src] + 1; sup( vt.gvt[m.grp], lvt[m.src][m.grp] ); - // cout << vt.gvt << endl; return true; } @@ -343,10 +381,6 @@ gvtime gvt( m.gvt ); if ( (vt.gvt[m.grp][m.src] + 1) != gvt[m.grp][m.src] ) { - // cerr << "1" << endl; - // cerr << vt.gvt[m.grp][m.src] << "\n" - // << gvt[m.grp][m.src] - // << endl; if ( (vt.gvt[m.grp][m.src] + 1) > gvt[m.grp][m.src] ) { throw out_of_range( "duplicate or wrong VT message" ); } @@ -357,16 +391,12 @@ xvt[m.src] = 0; // force exclude message originator, it checked above if ( !(xvt <= vt[m.grp]) ) { - // cerr << "2" << endl; - // cerr << xvt << "\n\n" << vt[m.grp] << endl; return false; } // check side casuality (via groups other then message's group) for ( groups_container_type::const_iterator l = groups.begin(); l != groups.end(); ++l ) { if ( (*l != m.grp) && !((lvt[m.src][*l] + gvt[*l]) <= vt[*l]) ) { - // cerr << "3" << endl; - // cerr << "group " << *l << xvt << "\n\n" << vt[*l] << endl; return false; } } @@ -374,6 +404,38 @@ return true; } +ostream& vtime_obj_rec::trace_deliver( const VTmess& m, ostream& o ) +{ + if ( groups.find( m.grp ) == groups.end() ) { + return o << "VT object not member of group"; + } + + gvtime gvt( m.gvt ); + + if ( (vt.gvt[m.grp][m.src] + 1) != gvt[m.grp][m.src] ) { + if ( (vt.gvt[m.grp][m.src] + 1) > gvt[m.grp][m.src] ) { + return o << "duplicate or wrong VT message, " << vt.gvt << " vs " << gvt; + } + return o << "counter violation, " << vt.gvt << " vs " << gvt; + } + + vtime xvt = lvt[m.src][m.grp] + gvt[m.grp]; + xvt[m.src] = 0; // force exclude message originator, it checked above + + if ( !(xvt <= vt[m.grp]) ) { + return o << "casuality violation, " << xvt << " vs " << vt[m.grp]; + } + + // check side casuality (via groups other then message's group) + for ( groups_container_type::const_iterator l = groups.begin(); l != groups.end(); ++l ) { + if ( (*l != m.grp) && !((lvt[m.src][*l] + gvt[*l]) <= vt[*l]) ) { + return o << "side casuality violation, " << (lvt[m.src][*l] + gvt[*l]) << " vs " << vt[*l]; + } + } + + return o << "should be delivered"; +} + bool vtime_obj_rec::order_correct_delayed( const VTmess& m ) { gvtime gvt( m.gvt ); @@ -399,7 +461,7 @@ return true; } -void vtime_obj_rec::delta( gvtime& vtstamp, oid_type from, oid_type to, group_type grp ) +void vtime_obj_rec::delta( gvtime& vtstamp, const oid_type& from, const oid_type& to, group_type grp ) { vtstamp.gvt = vt.gvt - svt[to]; // delta vtstamp[grp][from] = vt.gvt[grp][from]; // my counter, as is, not delta @@ -449,7 +511,7 @@ return groups.empty() ? true : false; } -void vtime_obj_rec::rm_member( oid_type oid ) +void vtime_obj_rec::rm_member( const oid_type& oid ) { delta_vtime_type::iterator i = lvt.find( oid ); @@ -464,8 +526,59 @@ } } +void vtime_obj_rec::sync( group_type g, const oid_type& oid, const gvtime_type& gvt ) +{ + lvt[oid] = gvt; + gvtime_type::const_iterator i = gvt.find( g ); + if ( i != gvt.end() ) { + sup( vt.gvt[g], i->second.vt ); + // vtime_type::const_iterator j = i->second.vt.find( oid ); + // if ( j != i->second.vt.end() ) { + // vt.gvt[g][oid] = j->second; + // cerr << "**** " << gvt << endl; + // } + } +} + } // namespace detail +void VTDispatcher::settrf( unsigned f ) +{ + scoped_lock _x1( _lock_tr ); + _trflags |= f; +} + +void VTDispatcher::unsettrf( unsigned f ) +{ + scoped_lock _x1( _lock_tr ); + _trflags &= (0xffffffff & ~f); +} + +void VTDispatcher::resettrf( unsigned f ) +{ + scoped_lock _x1( _lock_tr ); + _trflags = f; +} + +void VTDispatcher::cleantrf() +{ + scoped_lock _x1( _lock_tr ); + _trflags = 0; +} + +unsigned VTDispatcher::trflags() const +{ + scoped_lock _x1( _lock_tr ); + + return _trflags; +} + +void VTDispatcher::settrs( std::ostream *s ) +{ + scoped_lock _x1( _lock_tr ); + _trs = s; +} + void VTDispatcher::VTDispatch( const stem::Event_base<VTmess>& m ) { pair<gid_map_type::const_iterator,gid_map_type::const_iterator> range = @@ -482,48 +595,110 @@ // looks, like local source shouldn't be here? check_and_send( i->second, m ); } - catch ( const out_of_range& ) { + catch ( const out_of_range& err ) { + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracefault) ) { + *_trs << err.what() << " " + << __FILE__ << ":" << __LINE__ << endl; + } + } + catch ( ... ) { + } } - catch ( const domain_error& ) { + catch ( const domain_error& err ) { + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracefault) ) { + *_trs << err.what() << " " + << __FILE__ << ":" << __LINE__ << endl; + } + } + catch ( ... ) { + } } } } void VTDispatcher::check_and_send( detail::vtime_obj_rec& vt, const stem::Event_base<VTmess>& m ) { - typedef detail::vtime_obj_rec::dpool_t dpool_t; - - // detail::vtime_obj_rec& vt = i->second; - if ( vt.deliver( m.value() ) ) { stem::Event ev( m.value().code ); ev.dest(vt.stem_addr()); ev.src(m.src()); ev.value() = m.value().mess; +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracedispatch) ) { + *_trs << "Deliver " << m.value() << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE Forward( ev ); - bool more; - do { - more = false; - for ( dpool_t::iterator j = vt.dpool.begin(); j != vt.dpool.end(); ) { - if ( vt.deliver_delayed( j->second->value() ) ) { - stem::Event evd( j->second->value().code ); - evd.dest(vt.stem_addr()); - ev.src(m.src()); - evd.value() = j->second->value().mess; - Forward( evd ); - delete j->second; - vt.dpool.erase( j++ ); - more = true; - } else { - ++j; - } + check_and_send_delayed( vt ); + } else { +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracedelayed) ) { + *_trs << "Delayed " << m.value() << endl; } - } while ( more ); - } else { + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE vt.dpool.push_back( make_pair( 0, new Event_base<VTmess>(m) ) ); // 0 should be timestamp } } +void VTDispatcher::check_and_send_delayed( detail::vtime_obj_rec& vt ) +{ + typedef detail::vtime_obj_rec::dpool_t dpool_t; + bool more; + do { + more = false; + for ( dpool_t::iterator j = vt.dpool.begin(); j != vt.dpool.end(); ) { + if ( vt.deliver_delayed( j->second->value() ) ) { + stem::Event evd( j->second->value().code ); + evd.dest(vt.stem_addr()); + evd.src(j->second->src()); + evd.value() = j->second->value().mess; +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracedispatch) ) { + *_trs << "Deliver delayed " << j->second->value() << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + Forward( evd ); + delete j->second; + vt.dpool.erase( j++ ); + more = true; + } else { +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracedispatch) ) { + *_trs << "Remain delayed " << j->second->value() + << "\nReason: "; + vt.trace_deliver( j->second->value(), *_trs ) << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + ++j; + } + } + } while ( more ); +} + void VTDispatcher::VTSend( const stem::Event& e, group_type grp ) { // This method not called from Dispatch, but work on the same level and in the same @@ -538,7 +713,7 @@ if ( i != vtmap.end() && i->second.stem_addr() == e.src() ) { // for self detail::vtime_obj_rec& vt = i->second; const oid_type& from = o->second; - stem::Event_base<VTmess> m( MESS ); + stem::Event_base<VTmess> m( VS_MESS ); m.value().src = from; // oid m.value().code = e.code(); m.value().mess = e.value(); @@ -565,9 +740,27 @@ vt.base_advance(g->second); // store last send VT to obj } - catch ( const out_of_range& ) { + catch ( const out_of_range& err ) { + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracefault) ) { + *_trs << err.what() << " " + << __FILE__ << ":" << __LINE__ << endl; + } + } + catch ( ... ) { + } } - catch ( const domain_error& ) { + catch ( const domain_error& err ) { + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracefault) ) { + *_trs << err.what() << " " + << __FILE__ << ":" << __LINE__ << endl; + } + } + catch ( ... ) { + } } } @@ -589,9 +782,20 @@ for ( ; range.first != range.second; ++range.first ) { vt_map_type::iterator i = vtmap.find( range.first->second ); if ( i != vtmap.end() ) { - stem::Event ev( VTS_NEW_MEMBER ); + stem::Event_base<VSsync_rq> ev( VS_NEW_MEMBER ); ev.dest( i->second.stem_addr() ); ev.src( addr ); + ev.value().grp = grp; +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracegroup) ) { + *_trs << "VS_NEW_MEMBER " << ev.src() << " -> " << ev.dest() << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE Forward( ev ); } } @@ -615,9 +819,21 @@ } else { vt_map_type::iterator j = vtmap.find( range.first->second ); if ( j != vtmap.end() ) { - stem::Event ev( VTS_OUT_MEMBER ); + stem::Event_base<VSsync_rq> ev( VS_OUT_MEMBER ); ev.dest( j->second.stem_addr() ); ev.src( i != vtmap.end() ? i->second.stem_addr() : self_id() ); + ev.value().grp = grp; +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracegroup) ) { + *_trs << "VS_OUT_MEMBER " << ev.src() << " -> " << ev.dest() << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + Forward( ev ); } ++range.first; @@ -631,8 +847,74 @@ } } +void VTDispatcher::get_gvtime( group_type grp, stem::addr_type addr, gvtime_type& gvt ) +{ + // See comment on top of VTSend above + xmt::recursive_scoped_lock lk( this->_theHistory_lock ); + + pair<gid_map_type::iterator,gid_map_type::iterator> range = + grmap.equal_range( grp ); + for ( ; range.first != range.second; ++range.first ) { + vt_map_type::iterator i = vtmap.find( range.first->second ); + if ( i != vtmap.end() && i->second.stem_addr() == addr ) { + i->second.get_gvt( gvt ); + return; + } + } + + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracefault) ) { + *_trs << "VT object not member of group" << " " << __FILE__ << ":" << __LINE__ << endl; + } + } + catch ( ... ) { + } + + throw domain_error( "VT object not member of group" ); // Error: not group member +} + +void VTDispatcher::set_gvtime( group_type grp, stem::addr_type addr, const gvtime_type& gvt ) +{ + // See comment on top of VTSend above + xmt::recursive_scoped_lock lk( this->_theHistory_lock ); + + pair<gid_map_type::iterator,gid_map_type::iterator> range = + grmap.equal_range( grp ); + for ( ; range.first != range.second; ++range.first ) { + vt_map_type::iterator i = vtmap.find( range.first->second ); + if ( i != vtmap.end() && i->second.stem_addr() == addr ) { +#ifdef __FIT_VS_TRACE + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracegroup) ) { + *_trs << "Set gvt G" << grp << " " << i->first + << " (" << addr << ") " << gvt << endl; + } + } + catch ( ... ) { + } +#endif // __FIT_VS_TRACE + i->second.sync( grp, i->first, gvt ); + check_and_send_delayed( i->second ); + return; + } + } + + try { + scoped_lock lk(_lock_tr); + if ( _trs != 0 && _trs->good() && (_trflags & tracefault) ) { + *_trs << "VT object not member of group" << " " << __FILE__ << ":" << __LINE__ << endl; + } + } + catch ( ... ) { + } + + throw domain_error( "VT object not member of group" ); // Error: not group member +} + DEFINE_RESPONSE_TABLE( VTDispatcher ) - EV_Event_base_T_( ST_NULL, MESS, VTDispatch, VTmess ) + EV_Event_base_T_( ST_NULL, VS_MESS, VTDispatch, VTmess ) END_RESPONSE_TABLE char *Init_buf[128]; @@ -723,17 +1005,45 @@ ((Init *)Init_buf)->~Init(); } -void VTHandler::VTNewMember( const stem::Event& ) +void VTHandler::VSNewMember( const stem::Event_base<VSsync_rq>& ev ) { + stem::Event_base<VSsync> out_ev( VS_SYNC_TIME ); + out_ev.dest( ev.src() ); + out_ev.value().grp = ev.value().grp; + get_gvtime( ev.value().grp, out_ev.value().gvt.gvt ); + + Send( out_ev ); } -void VTHandler::VTOutMember( const stem::Event& ) +void VTHandler::VSNewMember_data( const stem::Event_base<VSsync_rq>& ev, const string& data ) { + stem::Event_base<VSsync> out_ev( VS_SYNC_TIME ); + out_ev.dest( ev.src() ); + out_ev.value().grp = ev.value().grp; + get_gvtime( ev.value().grp, out_ev.value().gvt.gvt ); + out_ev.value().mess = data; + + Send( out_ev ); } +void VTHandler::VSOutMember( const stem::Event_base<VSsync_rq>& ) +{ +} + +void VTHandler::VSsync_time( const stem::Event_base<VSsync>& ev ) +{ + try { + // sync data from ev.value().mess + _vtdsp->set_gvtime( ev.value().grp, self_id(), ev.value().gvt.gvt ); + } + catch ( const domain_error& ) { + } +} + DEFINE_RESPONSE_TABLE( VTHandler ) - EV_EDS( ST_NULL, VTS_NEW_MEMBER, VTNewMember ) - EV_EDS( ST_NULL, VTS_OUT_MEMBER, VTOutMember ) + EV_Event_base_T_( ST_NULL, VS_NEW_MEMBER, VSNewMember, VSsync_rq ) + EV_Event_base_T_( ST_NULL, VS_OUT_MEMBER, VSOutMember, VSsync_rq ) + EV_Event_base_T_( ST_NULL, VS_SYNC_TIME, VSsync_time, VSsync ) END_RESPONSE_TABLE } // namespace vt @@ -742,16 +1052,18 @@ ostream& operator <<( ostream& o, const vt::vtime_type::value_type& v ) { - return o << "(" << v.first << "," << v.second << ")\n"; + return o << "(" << v.first << "," << v.second << ")"; } ostream& operator <<( ostream& o, const vt::vtime_type& v ) { - o << "[\n"; for ( vt::vtime_type::const_iterator i = v.begin(); i != v.end(); ++i ) { - o << " " << *i; + if ( i != v.begin() ) { + o << ", "; + } + o << *i; } - return o << " ]\n"; + return o; } ostream& operator <<( ostream& o, const vt::vtime& v ) @@ -759,17 +1071,38 @@ ostream& operator <<( ostream& o, const vt::gvtime_type::value_type& v ) { - o << "group " << v.first << ": " << v.second.vt; + o << v.first << ": " << v.second.vt; } ostream& operator <<( ostream& o, const vt::gvtime_type& v ) { o << "{\n"; for ( vt::gvtime_type::const_iterator i = v.begin(); i != v.end(); ++i ) { - o << " " << *i; + o << "\t" << *i << "\n"; } return o << "}\n"; } +ostream& operator <<( ostream& o, const vt::gvtime& v ) +{ + return o << v.gvt; +} + +ostream& operator <<( ostream& o, const vt::VSsync& m ) +{ + // ios_base::fmtflags f = o.flags( ios_base::hex ); + o << "G" << m.grp << " " << m.gvt; + + return o; +} + +ostream& operator <<( ostream& o, const vt::VTmess& m ) +{ + ios_base::fmtflags f = o.flags( ios_base::hex | ios_base::showbase ); + o << "C" << m.code << dec << " " << m.src << " " << static_cast<const vt::VSsync&>(m); + o.flags( f ); + return o; +} + } // namespace std Modified: trunk/complement/explore/test/virtual_time/vtime.h =================================================================== --- trunk/complement/explore/test/virtual_time/vtime.h 2007-08-08 10:09:58 UTC (rev 1677) +++ trunk/complement/explore/test/virtual_time/vtime.h 2007-08-13 07:02:36 UTC (rev 1678) @@ -1,4 +1,4 @@ -// -*- C++ -*- Time-stamp: <07/07/27 09:53:24 ptr> +// -*- C++ -*- Time-stamp: <07/08/11 01:05:47 ptr> #ifndef __vtime_h #define __vtime_h @@ -147,7 +147,7 @@ mutable gvtime_type gvt; }; -struct VTmess : +struct VSsync_rq : public stem::__pack_base { void pack( std::ostream& s ) const; @@ -155,28 +155,59 @@ void unpack( std::istream& s ); void net_unpack( std::istream& s ); - VTmess() : - code(0), - src(), - gvt(), + VSsync_rq() : grp(0), mess() { } - VTmess( const VTmess& _gvt ) : - code( _gvt.code ), - src( _gvt.src ), - gvt( _gvt.gvt ), + VSsync_rq( const VSsync_rq& _gvt ) : grp( _gvt.grp ), mess( _gvt.mess ) { } - stem::code_type code; - oid_type src; - gvtime gvt; group_type grp; std::string mess; }; +struct VSsync : + public VSsync_rq +{ + void pack( std::ostream& s ) const; + void net_pack( std::ostream& s ) const; + void unpack( std::istream& s ); + void net_unpack( std::istream& s ); + + VSsync() + { } + VSsync( const VSsync& _gvt ) : + VSsync_rq( _gvt ), + gvt( _gvt.gvt ) + { } + + gvtime gvt; +}; + +struct VTmess : + public VSsync +{ + void pack( std::ostream& s ) const; + void net_pack( std::ostream& s ) const; + void unpack( std::istream& s ); + void net_unpack( std::istream& s ); + + VTmess() : + code(0), + src() + { } + VTmess( const VTmess& _gvt ) : + VSsync( _gvt ), + code( _gvt.code ), + src( _gvt.src ) + { } + + stem::code_type code; + oid_type src; +}; + namespace detail { class vtime_obj_rec @@ -187,18 +218,22 @@ void add( stem::addr_type a, group_type g ) { addr = a; groups.insert(g); } bool rm_group( group_type ); - void rm_member( oid_type ); + void rm_member( const oid_type& ); stem::addr_type stem_addr() const { return addr; } bool deliver( const VTmess& ev ); bool deliver_delayed( const VTmess& ev ); - void next( oid_type from, group_type grp ) + std::ostream& trace_deliver( const VTmess& m, std::ostream& o ); + void next( const oid_type& from, group_type grp ) { ++vt.gvt[grp][from]; /* increment my VT counter */ } - void delta( gvtime& vtstamp, oid_type from, oid_type to, group_type grp ); - void base_advance( oid_type to ) + void delta( gvtime& vtstamp, const oid_type& from, const oid_type& to, group_type grp ); + void base_advance( const oid_type& to ) { svt[to] = vt.gvt; /* store last sent VT to obj */ } + void get_gvt( gvtime_type& gvt ) const + { gvt = vt.gvt; } + void sync( group_type, const oid_type&, const gvtime_type& ); #ifdef __FIT_EXAM const gvtime_type::data_type& operator[]( const gvtime_type::key_type k ) const @@ -235,19 +270,36 @@ public stem::EventHandler { public: - VTDispatcher() + enum traceflags { + notrace = 0, + tracenet = 1, + tracedispatch = 2, + tracefault = 4, + tracedelayed = 8, + tracegroup = 0x10 + }; + + VTDispatcher() : + _trflags( notrace ), + _trs( 0 ) { } VTDispatcher( const char *info ) : - stem::EventHandler( info ) + stem::EventHandler( info ), + _trflags( notrace ), + _trs( 0 ) { } VTDispatcher( stem::addr_type id ) : - stem::EventHandler( id ) + stem::EventHandler( id ), + _trflags( notrace ), + _trs( 0 ) { } VTDispatcher( stem::addr_type id, const char *info ) : - stem::EventHandler( id, info ) + stem::EventHandler( id, info ), + _trflags( notrace ), + _trs( 0 ) { } void VTDispatch( const stem::Event_base<VTmess>& ); @@ -255,16 +307,30 @@ void VTSend( const stem::Event& e, group_type ); void Subscribe( stem::addr_type, oid_type, group_type ); void Unsubscribe( oid_type, group_type ); + void get_gvtime( group_type, stem::addr_type, gvtime_type& ); + void set_gvtime( group_type, stem::addr_type, const gvtime_type& ); + void settrf( unsigned f ); + void unsettrf( unsigned f ); + void resettrf( unsigned f ); + void cleantrf(); + unsigned trflags() const; + void settrs( std::ostream * ); + private: typedef std::hash_map<oid_type, detail::vtime_obj_rec> vt_map_type; typedef std::hash_multimap<group_type, oid_type> gid_map_type; void check_and_send( detail::vtime_obj_rec&, const stem::Event_base<VTmess>& ); + void check_and_send_delayed( detail::vtime_obj_rec& ); vt_map_type vtmap; gid_map_type grmap; + xmt::mutex _lock_tr; + unsigned _trflags; + std::ostream *_trs; + DECLARE_RESPONSE_TABLE( VTDispatcher, stem::EventHandler ); }; @@ -291,23 +357,34 @@ virtual ~VTHandler(); void VTSend( const stem::Event& e ); - virtual void VTNewMember( const stem::Event& e ); - virtual void VTOutMember( const stem::Event& e ); + virtual void VSNewMember( const stem::Event_base<VSsync_rq>& e ); + virtual void VSOutMember( const stem::Event_base<VSsync_rq>& e ); + virtual void VSsync_time( const stem::Event_base<VSsync>& ); + template <class D> void VTSend( const stem::Event_base<D>& e ) { VTHandler::VTSend( stem::Event_convert<D>()( e ) ); } + static VTDispatcher *vtdispatcher() + { return _vtdsp; } + protected: + void VSNewMember_data( const stem::Event_base<VSsync_rq>&, const std::string& data ); + + void get_gvtime( group_type g, gvtime_type& gvt ) + { _vtdsp->get_gvtime( g, self_id(), gvt ); } + private: static class VTDispatcher *_vtdsp; DECLARE_RESPONSE_TABLE( VTHandler, stem::EventHandler ); }; -#define MESS 0x300 -#define VTS_NEW_MEMBER 0x301 -#define VTS_OUT_MEMBER 0x302 +#define VS_MESS 0x300 +#define VS_NEW_MEMBER 0x301 +#define VS_OUT_MEMBER 0x302 +#define VS_SYNC_TIME 0x303 } // namespace vt @@ -318,6 +395,9 @@ ostream& operator <<( ostream&, const vt::vtime& ); ostream& operator <<( ostream&, const vt::gvtime_type::value_type& ); ostream& operator <<( ostream&, const vt::gvtime_type& ); +ostream& operator <<( ostream&, const vt::gvtime& ); +ostream& operator <<( ostream& o, const vt::VSsync& ); +ostream& operator <<( ostream& o, const vt::VTmess& ); } // namespace std This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |