Thread: [Cppunit-cvs] cppunit2/src/opentest sharedmemorytransport.cpp,1.1,1.2 SConscript,1.4,1.5
Brought to you by:
blep
From: Baptiste L. <bl...@us...> - 2005-06-30 21:48:22
|
Update of /cvsroot/cppunit/cppunit2/src/opentest In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv14156/src/opentest Modified Files: sharedmemorytransport.cpp SConscript Log Message: * rough implementation of the overall logic, need heavy testing Index: SConscript =================================================================== RCS file: /cvsroot/cppunit/cppunit2/src/opentest/SConscript,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** SConscript 24 Jun 2005 20:13:07 -0000 1.4 --- SConscript 30 Jun 2005 21:48:09 -0000 1.5 *************** *** 5,8 **** --- 5,9 ---- remoteinterfaces.cpp serializer.cpp + sharedmemorytransport.cpp texttestdriver.cpp """ ), Index: sharedmemorytransport.cpp =================================================================== RCS file: /cvsroot/cppunit/cppunit2/src/opentest/sharedmemorytransport.cpp,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** sharedmemorytransport.cpp 26 Jun 2005 21:24:30 -0000 1.1 --- sharedmemorytransport.cpp 30 Jun 2005 21:48:09 -0000 1.2 *************** *** 1,13 **** #include <opentest/sharedmemorytransport.h> ! #include <opentest/remoteinterfaces.h> ! #include <cpptl/thread.h> ! #include <tchar.h> ! #define WIN32_LEAN_AND_MEAN ! #define NOGDI ! #define NOUSER ! #define NOKERNEL ! #define NOSOUND ! #include <windows.h> /* Importance notes: --- 1,15 ---- #include <opentest/sharedmemorytransport.h> ! #ifndef OPENTEST_NO_SHAREDMEMORYTRANSPORT ! # include <opentest/remoteinterfaces.h> ! # include <opentest/serializer.h> ! # include <cpptl/thread.h> ! # include <tchar.h> ! # define WIN32_LEAN_AND_MEAN ! # define NOGDI ! # define NOUSER ! # define NOKERNEL ! # define NOSOUND ! # include <windows.h> /* Importance notes: *************** *** 65,68 **** --- 67,71 ---- ::CloseHandle( handle_ ); handle_ = handle; + return *this; } *************** *** 73,76 **** --- 76,103 ---- // ////////////////////////////////////////////////////////////////// // ////////////////////////////////////////////////////////////////// + // class ScopedMutexLock + // ////////////////////////////////////////////////////////////////// + // ////////////////////////////////////////////////////////////////// + class ScopedMutexLock + { + public: + ScopedMutexLock( HANDLE mutex ) + : mutex_( mutex ) + { + ::WaitForSingleObject( mutex, INFINITE ); + } + + ~ScopedMutexLock() + { + ::ReleaseMutex( mutex_ ); + } + + private: + HANDLE mutex_; + }; + + + // ////////////////////////////////////////////////////////////////// + // ////////////////////////////////////////////////////////////////// // class BaseEvent // ////////////////////////////////////////////////////////////////// *************** *** 157,160 **** --- 184,188 ---- public: typedef DWORD Pos; + typedef unsigned char *Byte; SharedMemoryTransportImpl( const SharedMemoryConfig &config ); *************** *** 163,168 **** --- 191,200 ---- virtual ~SharedMemoryTransportImpl(); + // Called from 'main' thread void send( const RemoteMessagePtr &message ); + // Called from 'main' thread + void dispatchReceivedMessages( RemoteMessageServer &server ); + #pragma pack(push, 1) // The structure CircularBuffer & SharedData are *************** *** 174,180 **** struct CircularBuffer { Pos size_; Pos readPos_; // between 0 and size_ ! Pos writePos_; // between 0 and 2*size_ }; --- 206,244 ---- struct CircularBuffer { + Pos availableForReadingSequentially() const + { + return writePos_ <= size_ ? writePos_ - readPos_ + : size_ - readPos_; + } + + Pos availableForWritingSequentially() const + { + return writePos_ <= size_ ? size_ - writePos_ + : readPos_ - (writePos_ - size_); + } + + Pos normalizeWritePos() const + { + return writePos_ >= size_ ? writePos_ - size_ : writePos_; + } + + void read( Pos length ) + { + readPos_ += length; + if ( readPos_ == size_ ) + { + CPPTL_ASSERT_MESSAGE( writePos_ >= size_, "Invalid write position" ); + writePos_ -= size_; + } + } + + void wrote( Pos length ) + { + writePos_ += length; + } + Pos size_; Pos readPos_; // between 0 and size_ ! Pos writePos_; // between 0 and 2*size_. if > size_ then reading from read-> size + 0 -> write%size_ }; *************** *** 185,197 **** DWORD magic_; HANDLE mutex_; ! CircularBuffer writeBuffer_; ! CircularBuffer readBuffer_; }; #pragma pack(pop) private: void initializeCircularBuffer( CircularBuffer &buffer ); HANDLE autoManage( HANDLE handle ); ! void setUpReadWriteSystemEvent(); void checkManualEvents(); void log( const char *format, ... ) {} --- 249,294 ---- DWORD magic_; HANDLE mutex_; ! CircularBuffer circularBuffers_[2]; }; #pragma pack(pop) private: + enum BufferState + { + bsSize = 0, + bsMessage + }; + + struct BufferInfo + { + void read( const char *hint, Pos readLength ) + { + // log( "read: %s, read %d bytes at offset %d", hint, readLength, circular_->readPos_ ); + circular_->read( readLength ); + processedLength_ += readLength; + } + + void write( const char *hint, Pos writeLength ) + { + // log( "write: %s, wrote %d bytes at offset %d", hint, writeLength, circular_->normalizedWritePos() ); + circular_->wrote( writeLength ); + processedLength_ += writeLength; + } + + NamedEvent event_; + Stream stream_; + BufferState state_; + CircularBuffer *circular_; + Byte *data_; + DWORD messageLength_; + Pos processedLength_; + }; + + void createSharedMemoryRegion(); + void openSharedMemoryRegion(); + void setSharedMemoryRegion(); void initializeCircularBuffer( CircularBuffer &buffer ); HANDLE autoManage( HANDLE handle ); ! void setUpReadWriteBuffers(); void checkManualEvents(); void log( const char *format, ... ) {} *************** *** 201,223 **** void threadMain(); void prepareWaitObjects(); void readAndSendPendingData(); private: ! enum { ! readOperation = 0, ! writeOperation = 1 ! } ! ! typedef std::deque<RemoteMessagePtr> MessagesToSend; ! MessagesToSend messagesToSend_; ! CppTL::Mutex messagesToSendLock_ std::vector<AutoHandle> handles_; std::vector<HANDLE> waitObjects_; SharedMemoryConfig config_; ! NamedEvent readWriteEvents_[2]; ! Stream readWriteStreams_[2]; AnonymousManualEvent stopEvent_; AnonymousManualEvent sendMessageEvent_; AutoHandle hSharedMemory_; SharedData *shared_; unsigned int id_; --- 298,323 ---- void threadMain(); void prepareWaitObjects(); + void prepareMessageToSend(); void readAndSendPendingData(); + void pushReceivedMessages(); + void readPendingData(); + void writePendingData(); + BufferInfo &getReadBuffer(); + BufferInfo &getWriteBuffer(); private: ! typedef std::deque<RemoteMessagePtr> RemoteMessages; ! RemoteMessages messagesToSend_; ! RemoteMessages messagesToDispatch_; ! CppTL::Mutex messagesToSendLock_; ! CppTL::Mutex messagesToDispatchLock_; std::vector<AutoHandle> handles_; std::vector<HANDLE> waitObjects_; SharedMemoryConfig config_; ! BufferInfo buffers_[2]; AnonymousManualEvent stopEvent_; AnonymousManualEvent sendMessageEvent_; AutoHandle hSharedMemory_; + AutoHandle thread_; SharedData *shared_; unsigned int id_; *************** *** 236,245 **** } unsigned int pid = ::GetCurrentProcessId(); ! nameLength_ = _tprintf( nameBuffer_, _T("cpput_%08x_%08x"), pid, id ); CPPTL_ASSERT_MESSAGE( nameLength_ < sizeof(nameBuffer_)-1, "buffer overflow" ); createSharedMemoryRegion(); ! setUpReadWriteSystemEvent(); checkManualEvents(); } --- 336,345 ---- } unsigned int pid = ::GetCurrentProcessId(); ! nameLength_ = _tprintf( nameBuffer_, _T("cpput_%08x_%08x"), pid, id_ ); CPPTL_ASSERT_MESSAGE( nameLength_ < sizeof(nameBuffer_)-1, "buffer overflow" ); createSharedMemoryRegion(); ! setUpReadWriteBuffers(); checkManualEvents(); } *************** *** 259,262 **** --- 359,363 ---- #endif openSharedMemoryRegion(); + setUpReadWriteBuffers(); checkManualEvents(); } *************** *** 273,277 **** setChildProcessCanInheritSecurity( saAttr ); ! DWORD fileMappingSize = sizeof(SharedData) + 2*config.bufferSize_; hSharedMemory_ = ::CreateFileMapping( INVALID_HANDLE_VALUE, &saAttr, // inherits handle --- 374,378 ---- setChildProcessCanInheritSecurity( saAttr ); ! DWORD fileMappingSize = sizeof(SharedData) + 2*config_.bufferSize_; hSharedMemory_ = ::CreateFileMapping( INVALID_HANDLE_VALUE, &saAttr, // inherits handle *************** *** 289,294 **** shared_->mutex_ = autoManage( ::CreateMutex( &saAttr, FALSE, 0 ) ); shared_->magic_ = magicKey; ! initializeCircularBuffer( shared_->readBuffer_ ); ! initializeCircularBuffer( shared_->writeBuffer_ ); if ( !shared_->mutex_ ) --- 390,395 ---- shared_->mutex_ = autoManage( ::CreateMutex( &saAttr, FALSE, 0 ) ); shared_->magic_ = magicKey; ! initializeCircularBuffer( shared_->circularBuffers_[0] ); ! initializeCircularBuffer( shared_->circularBuffers_[1] ); if ( !shared_->mutex_ ) *************** *** 311,315 **** void ! SharedMemoryTransportImpl::setSharedMemoryRegion( HANDLE hMapFile ) { LPVOID lpMapAddress = ::MapViewOfFile( --- 412,416 ---- void ! SharedMemoryTransportImpl::setSharedMemoryRegion() { LPVOID lpMapAddress = ::MapViewOfFile( *************** *** 327,333 **** SharedMemoryTransportImpl::initializeCircularBuffer( CircularBuffer &buffer ) { ! buffer_.size_ = size; ! buffer_.readPos_ = 0; ! buffer_.writePos_ = 0; } --- 428,434 ---- SharedMemoryTransportImpl::initializeCircularBuffer( CircularBuffer &buffer ) { ! buffer.size_ = config_.bufferSize_; ! buffer.readPos_ = 0; ! buffer.writePos_ = 0; } *************** *** 336,340 **** SharedMemoryTransportImpl::autoManage( HANDLE handle ) { ! handlers_.push_back( handle ); return handle; } --- 437,441 ---- SharedMemoryTransportImpl::autoManage( HANDLE handle ) { ! handles_.push_back( handle ); return handle; } *************** *** 342,346 **** void ! SharedMemoryTransportImpl::setUpReadWriteSystemEvent() { for ( int index = 0; index < 2; ++index ) --- 443,447 ---- void ! SharedMemoryTransportImpl::setUpReadWriteBuffers() { for ( int index = 0; index < 2; ++index ) *************** *** 348,355 **** static const TCHAR suffixes[] = _T( "rw" ); // read/write event name suffix. nameBuffer_[nameLength_] = suffixes[index]; nameBuffer_[nameLength_+1] = 0; ! if ( readWriteEvents_[index].create( nameBuffer_ ) == 0 ) throw SharedMemoryError( "Failed to create named event." ); } nameBuffer_[nameLength_] = 0; --- 449,465 ---- static const TCHAR suffixes[] = _T( "rw" ); // read/write event name suffix. + int orientedIndex = isConnectionCreator_ ? index : 1 - index; + BufferInfo &buffer = buffers_[ orientedIndex ]; + nameBuffer_[nameLength_] = suffixes[index]; nameBuffer_[nameLength_+1] = 0; ! if ( buffer.event_.create( nameBuffer_ ) == 0 ) throw SharedMemoryError( "Failed to create named event." ); + buffer.state_ = bsSize; + buffer.circular_ = &shared_->circularBuffers_[ orientedIndex ]; + buffer.data_ = reinterpret_cast<Byte *>( shared_ ) + sizeof(SharedData); + if ( orientedIndex > 0 ) + buffer.data_ += shared_->circularBuffers_[ 0 ].size_; + buffer.processedLength_ = 0; } nameBuffer_[nameLength_] = 0; *************** *** 374,377 **** --- 484,504 ---- + void + SharedMemoryTransportImpl::dispatchReceivedMessages( RemoteMessageServer &server ) + { + RemoteMessages messages; + { + CppTL::Mutex::ScopedLockGuard guard( messagesToDispatchLock_ ); + messages.swap( messagesToDispatch_ ); + } + + while ( !messages.empty() ) + { + server.dispatchMessage( messages.front() ); + messages.pop_front(); + } + } + + DWORD WINAPI SharedMemoryTransportImpl::threadBootstrap( void *p ) *************** *** 389,393 **** DWORD threadId; thread_ = ::CreateThread( 0, 0, ! &SharedMemoryCommunicationManager::threadBootstrap, this, 0, &threadId ); if ( thread_ == 0 ) --- 516,520 ---- DWORD threadId; thread_ = ::CreateThread( 0, 0, ! &SharedMemoryTransportImpl::threadBootstrap, this, 0, &threadId ); if ( thread_ == 0 ) *************** *** 433,438 **** else if ( event + WAIT_OBJECT_0 == WAIT_FAILED ) { ! System::Win32SystemError error( "wait failed" ); ! log( "%s", error.what() ); } else --- 560,564 ---- else if ( event + WAIT_OBJECT_0 == WAIT_FAILED ) { ! log( "event wait failed" ); } else *************** *** 440,444 **** break; // timeout or abandonned event => child process died. } ! if ( waitObjects_[event] == stopEvent_ ) { log( "Stop event signaled !" ); --- 566,570 ---- break; // timeout or abandonned event => child process died. } ! if ( waitObjects_[event] == stopEvent_.get() ) { log( "Stop event signaled !" ); *************** *** 448,452 **** prepareMessageToSend(); readAndSendPendingData(); ! } log( "Thread stopped." ); --- 574,578 ---- prepareMessageToSend(); readAndSendPendingData(); ! pushReceivedMessages(); } log( "Thread stopped." ); *************** *** 469,474 **** waitObjects_.push_back( stopEvent_.get() ); waitObjects_.push_back( sendMessageEvent_.get() ); ! waitObjects_.push_back( readWriteEvents_[0].get() ); ! waitObjects_.push_back( readWriteEvents_[1].get() ); } --- 595,600 ---- waitObjects_.push_back( stopEvent_.get() ); waitObjects_.push_back( sendMessageEvent_.get() ); ! waitObjects_.push_back( buffers_[0].event_.get() ); ! waitObjects_.push_back( buffers_[1].event_.get() ); } *************** *** 480,495 **** return; ! MessagesToSend messages; { CppTL::Mutex::ScopedLockGuard guard( messagesToSendLock_ ); ! messagesToSend.swap( messages ); } while ( !messages.empty() ) { ! getWriteStream().packets().beginMessage(); ! getWriteStream() << messages.front(); ! getWriteStream().packets().endMessage(); ! messages.pop(); } } --- 606,622 ---- return; ! RemoteMessages messages; { CppTL::Mutex::ScopedLockGuard guard( messagesToSendLock_ ); ! messagesToSend_.swap( messages ); } + BufferInfo &buffer = getWriteBuffer(); while ( !messages.empty() ) { ! buffer.stream_.packets().beginMessage(); ! buffer.stream_ << messages.front(); ! buffer.stream_.packets().endMessage(); ! messages.pop_front(); } } *************** *** 499,506 **** SharedMemoryTransportImpl::readAndSendPendingData() { ! // if ( readEvent().isSignaled() || writeEvent().isSignaled() } // ////////////////////////////////////////////////////////////////// // ////////////////////////////////////////////////////////////////// --- 626,788 ---- SharedMemoryTransportImpl::readAndSendPendingData() { ! if ( buffers_[0].event_.isSignaled() ! || buffers_[1].event_.isSignaled() ! || getWriteBuffer().stream_.packets().hasPendingMessage() ) ! { ! ScopedMutexLock guard( shared_->mutex_ ); ! readPendingData(); ! writePendingData(); ! } ! } ! ! ! void ! SharedMemoryTransportImpl::readPendingData() ! { ! BufferInfo &buffer = getReadBuffer(); ! buffer.event_.reset(); ! Pos oldReadPos = buffer.circular_->readPos_; ! while ( true ) ! { ! Pos available = buffer.circular_->availableForReadingSequentially(); ! if ( available == 0 ) ! break; ! switch ( buffer.state_ ) ! { ! case bsSize: ! { ! Pos toRead = CPPTL_MIN( available, ! sizeof(buffer.messageLength_) - buffer.processedLength_ ); ! memcpy( reinterpret_cast<Byte *>(&buffer.messageLength_) + buffer.processedLength_, ! buffer.data_ + buffer.processedLength_, ! toRead ); ! buffer.read( "Message size", toRead ); ! if ( buffer.processedLength_ == sizeof(buffer.messageLength_) ) ! { ! buffer.state_ = bsMessage; ! buffer.processedLength_ = 0; ! log( "read: message size is %d", buffer.messageLength_ ); ! buffer.stream_.packets().beginMessage(); ! } ! } ! break; ! case bsMessage: ! { ! Pos toRead = CPPTL_MIN( available, buffer.messageLength_ ); ! buffer.stream_.packets().received( buffer.data_ + buffer.circular_->readPos_, ! toRead ); ! buffer.read( "Message content", toRead ); ! if ( buffer.processedLength_ == buffer.messageLength_ ) ! { ! log( "read: message completly received" ); ! buffer.stream_.packets().endMessage(); ! buffer.state_ = bsSize; ! buffer.processedLength_ = 0; ! } ! } ! break; ! default: ! CPPTL_ASSERT_MESSAGE( false, "Invalid buffer state." ); ! break; ! } ! } ! ! if ( oldReadPos != buffer.circular_->readPos_ ) ! buffer.event_.signal(); ! } ! ! void ! SharedMemoryTransportImpl::writePendingData() ! { ! BufferInfo &buffer = getWriteBuffer(); ! buffer.event_.reset(); ! Pos oldWritePos = buffer.circular_->writePos_; ! while ( buffer.stream_.packets().hasPendingMessage() ) ! { ! Pos available = buffer.circular_->availableForWritingSequentially(); ! if ( available == 0 ) ! break; ! switch ( buffer.state_ ) ! { ! case bsSize: ! { ! if ( buffer.messageLength_ == 0 ) ! buffer.messageLength_ = buffer.stream_.packets().getFirstMessageLength(); ! ! Pos toWrite = CPPTL_MIN( available, ! sizeof(buffer.messageLength_) - buffer.processedLength_ ); ! memcpy( buffer.data_ + buffer.processedLength_, ! reinterpret_cast<Byte *>(&buffer.messageLength_) + buffer.processedLength_, ! toWrite ); ! buffer.write( "Message size", toWrite ); ! if ( buffer.processedLength_ == sizeof(buffer.messageLength_) ) ! { ! buffer.state_ = bsMessage; ! buffer.processedLength_ = 0; ! log( "write: message size is %d", buffer.messageLength_ ); ! } ! } ! break; ! case bsMessage: ! { ! Pos toWrite = CPPTL_MIN( available, buffer.messageLength_ ); ! Byte *target = buffer.data_ + buffer.circular_->normalizeWritePos(); ! Pos written = buffer.stream_.packets().send( target, toWrite ); ! buffer.write( "Message content", written ); ! if ( buffer.processedLength_ == buffer.messageLength_ ) ! { ! log( "write: message completly sent" ); ! buffer.state_ = bsSize; ! buffer.processedLength_ = 0; ! buffer.stream_.packets().discardFirstMessage(); ! } ! } ! break; ! default: ! CPPTL_ASSERT_MESSAGE( false, "Invalid buffer state." ); ! break; ! } ! } ! if ( oldWritePos != buffer.circular_->writePos_ ) ! buffer.event_.signal(); ! } ! ! ! void ! SharedMemoryTransportImpl::pushReceivedMessages() ! { ! RemoteMessages messages; ! BufferInfo &buffer = getReadBuffer(); ! while ( buffer.stream_.packets().hasPendingMessage() ) ! { ! RemoteMessagePtr message; ! buffer.stream_ >> message; ! if ( message ) ! messages.push_back( message ); ! buffer.stream_.packets().discardFirstMessage(); ! } ! ! CppTL::Mutex::ScopedLockGuard guard( messagesToDispatchLock_ ); ! messagesToDispatch_.insert( messagesToDispatch_.end(), ! messages.begin(), ! messages.end() ); } + SharedMemoryTransportImpl::BufferInfo & + SharedMemoryTransportImpl::getReadBuffer() + { + return buffers_[0]; + } + + + SharedMemoryTransportImpl::BufferInfo & + SharedMemoryTransportImpl::getWriteBuffer() + { + return buffers_[1]; + } + + + // ////////////////////////////////////////////////////////////////// // ////////////////////////////////////////////////////////////////// *************** *** 533,535 **** --- 815,826 ---- + void + SharedMemoryTransport::dispatchReceivedMessages( RemoteMessageServer &server ) + { + impl_->dispatchReceivedMessages( server ); + } + + } // namespace OpenTest + + #endif // OPENTEST_NO_SHAREDMEMORYTRANSPORT |