[brlcad-commits] SF.net SVN: brlcad:[44548] geomcore/trunk
Open Source Solid Modeling CAD
Brought to you by:
brlcad
From: <dav...@us...> - 2011-05-03 19:55:29
|
Revision: 44548 http://brlcad.svn.sourceforge.net/brlcad/?rev=44548&view=rev Author: davidloman Date: 2011-05-03 19:55:20 +0000 (Tue, 03 May 2011) Log Message: ----------- Experimental build. There was something really screwy with the way pkg was handling chunks of data larger than 24k, so I unwired the send and recv portions of pkg and "rolled my own". Things are much more stable and much faster. There is still a bit of work to do, but geomcore compiles and runs at this point. Modified Paths: -------------- geomcore/trunk/include/AbstractJob.h geomcore/trunk/include/GeometryManifestMsg.h geomcore/trunk/include/MakeAndRouteMsgJob.h geomcore/trunk/include/NetMsg.h geomcore/trunk/include/Portal.h geomcore/trunk/include/PortalManager.h geomcore/trunk/src/GS/CMakeLists.txt geomcore/trunk/src/GS/DataManager.cxx geomcore/trunk/src/GS/GSClient.cxx geomcore/trunk/src/libJob/AbstractJob.cxx geomcore/trunk/src/libNet/CMakeLists.txt geomcore/trunk/src/libNet/MakeAndRouteMsgJob.cxx geomcore/trunk/src/libNet/NetMsgFactory.cxx geomcore/trunk/src/libNet/Portal.cxx geomcore/trunk/src/libNet/PortalManager.cxx geomcore/trunk/src/libNet/netMsg/GeometryManifestMsg.cxx geomcore/trunk/src/libNet/netMsg/NetMsg.cxx geomcore/trunk/src/utility/ByteBuffer.cxx geomcore/trunk/tests/func/libJob/PrintToStdOutJob.cxx geomcore/trunk/tests/unit/CMakeLists.txt Removed Paths: ------------- geomcore/trunk/include/RouteMsgJob.h geomcore/trunk/src/libNet/RouteMsgJob.cxx Modified: geomcore/trunk/include/AbstractJob.h =================================================================== --- geomcore/trunk/include/AbstractJob.h 2011-05-03 19:40:01 UTC (rev 44547) +++ geomcore/trunk/include/AbstractJob.h 2011-05-03 19:55:20 UTC (rev 44548) @@ -61,7 +61,6 @@ class AbstractJob { public: - /** * Standard Destructor. Nothing to see here. */ @@ -85,7 +84,7 @@ /** * Returns the JobID for this job. JobID is a 32bit integer value. 
*/ - GSUuid getJobId(); + GSUuid* getJobId(); protected: /** @@ -102,7 +101,7 @@ /** * Internal field for storing Job's ID. */ - GSUuid jobID; + GSUuid* jobID; /** * Internal field for storing Job's current status. Modified: geomcore/trunk/include/GeometryManifestMsg.h =================================================================== --- geomcore/trunk/include/GeometryManifestMsg.h 2011-05-03 19:40:01 UTC (rev 44547) +++ geomcore/trunk/include/GeometryManifestMsg.h 2011-05-03 19:55:20 UTC (rev 44548) @@ -32,7 +32,6 @@ class GeometryManifestMsg : public NetMsg { public: - /* Normal Constructor */ GeometryManifestMsg(std::list<std::string>& items); Modified: geomcore/trunk/include/MakeAndRouteMsgJob.h =================================================================== --- geomcore/trunk/include/MakeAndRouteMsgJob.h 2011-05-03 19:40:01 UTC (rev 44547) +++ geomcore/trunk/include/MakeAndRouteMsgJob.h 2011-05-03 19:55:20 UTC (rev 44548) @@ -33,7 +33,7 @@ class MakeAndRouteMsgJob : public AbstractJob { public: - MakeAndRouteMsgJob(NetMsg* msg); + MakeAndRouteMsgJob(Portal* p); virtual ~MakeAndRouteMsgJob(); protected: Modified: geomcore/trunk/include/NetMsg.h =================================================================== --- geomcore/trunk/include/NetMsg.h 2011-05-03 19:40:01 UTC (rev 44547) +++ geomcore/trunk/include/NetMsg.h 2011-05-03 19:55:20 UTC (rev 44548) @@ -60,7 +60,8 @@ /* *Getters n Setters */ - uint16_t getMsgType() const; + uint16_t getMsgType() const; + uint32_t getMsgLen() const; GSUuid* getMsgUUID() const; bool msgHasReUUID() const; GSUuid* getReUUID() const; @@ -75,6 +76,7 @@ protected: uint16_t msgType; + uint32_t msgLen; GSUuid* msgUUID; bool hasReUUID; //TODO replace this with a bit pack GSUuid* reUUID; Modified: geomcore/trunk/include/Portal.h =================================================================== --- geomcore/trunk/include/Portal.h 2011-05-03 19:40:01 UTC (rev 44547) +++ geomcore/trunk/include/Portal.h 2011-05-03 19:55:20 UTC (rev 
44548) @@ -28,21 +28,25 @@ #define PKG_MAGIC2 0x5309 + #include "INetMsgHandler.h" -#include "PkgTcpClient.h" #include "NetMsg.h" #include "Logger.h" - +#include "GSThread.h" #include "pkg.h" - #include <string> +#define MAXCHUNKSIZE 4096 + class PortalManager; +class MakeAndRouteMsgJob; class Portal : public INetMsgHandler { public: friend class PortalManager; + friend class MakeAndRouteMsgJob; + virtual ~Portal(); int send(NetMsg* msg); int sendThenDisconnect(NetMsg* msg); @@ -50,33 +54,41 @@ void sendGSNodeName(); void disconnect(); - int flush(); std::string getRemoteNodeName(); bool handleNetMsg(NetMsg* msg); + void tryBuild(); + protected: - Portal(PortalManager* pm, PkgTcpClient* client, struct pkg_switch* table); + Portal(PortalManager* pm, int socket); - /* Not for public use since libPKG will block on this call. +/* Not for public use since libPKG will block on this call. * Returns: * <0 on error * 0 on EOF * 1 on success */ - int read(); + int pullFromSock(); private: PortalManager* pm; - struct pkg_switch* callbackTable; - PkgTcpClient* pkgClient; + + int socket; + std::string remoteNodeName; Logger* log; bool handshakeComplete; - static void callbackSpringboard(struct pkg_conn* conn, char* buf); + GSMutex recvBufLock; + ByteBuffer* recvBuffer; + MakeAndRouteMsgJob* builder; + + void tryToBuildNetMsgs(); + bool routeNetMsg(NetMsg* msg); + /* Disable copy cstr and =operator */ Portal(Portal const&){}; Portal& operator=(Portal const&){}; Modified: geomcore/trunk/include/PortalManager.h =================================================================== --- geomcore/trunk/include/PortalManager.h 2011-05-03 19:40:01 UTC (rev 44547) +++ geomcore/trunk/include/PortalManager.h 2011-05-03 19:55:20 UTC (rev 44548) @@ -27,7 +27,6 @@ #define __PORTALMANAGER_H__ #include "ControlledThread.h" -#include "PkgTcpServer.h" #include "INetMsgHandler.h" #include "TypeOnlyMsg.h" #include "Logger.h" @@ -44,40 +43,41 @@ class PortalManager : public ControlledThread, public 
INetMsgHandler { public: - PortalManager(std::string localNodeName, uint16_t port = 0, std::string address = std::string("127.0.0.1")); - ~PortalManager(); + PortalManager(std::string localNodeName, uint16_t port = 0, std::string address = std::string("127.0.0.1")); + ~PortalManager(); - Portal* connectToHost(std::string host, uint16_t port); - void disconnect(Portal* p); - bool handleNetMsg(NetMsg* msg); - std::string getLocalNodeName(); + Portal* connectToHost(std::string host, uint16_t port); + void disconnect(Portal* p); + bool handleNetMsg(NetMsg* msg); + std::string getLocalNodeName(); protected: - void _run(); + void _run(); private: - std::string localNodeName; - Logger* log; + std::string localNodeName; + Logger* log; - uint16_t listenPort; - std::string listenAddress; - PkgTcpServer* tcpServer; + uint16_t listenPort; + std::string listenAddress; - GSMutex masterFDSLock; - fd_set masterfds; - int fdmax; + GSMutex masterFDSLock; + fd_set masterfds; + int fdmax; - GSMutex* portalsLock; - std::map<int, Portal*>* fdPortalMap; + GSMutex* portalsLock; + std::map<int, Portal*>* fdPortalMap; - Portal* makeNewPortal(PkgTcpClient* client, struct pkg_switch* table); - struct pkg_switch* makeNewSwitchTable(); - void closeFD(int fd, std::string logComment); - void handleDisconnectReqMsg(TypeOnlyMsg* msg); + Portal* makeNewPortal(int socket); + int listen(); + int accept(int listener); + void closeFD(int fd, std::string logComment); - /* Disable copy cstr and =operator */ - PortalManager(PortalManager const&){}; - PortalManager& operator=(PortalManager const&){}; + void handleDisconnectReqMsg(TypeOnlyMsg* msg); + + /* Disable copy cstr and =operator */ + PortalManager(PortalManager const&){}; + PortalManager& operator=(PortalManager const&){}; }; #endif /* __PORTALMANAGER_H__ */ Deleted: geomcore/trunk/include/RouteMsgJob.h =================================================================== --- geomcore/trunk/include/RouteMsgJob.h 2011-05-03 19:40:01 UTC (rev 44547) +++ 
geomcore/trunk/include/RouteMsgJob.h 2011-05-03 19:55:20 UTC (rev 44548) @@ -1,59 +0,0 @@ -/* R O U T E M S G J O B . H - * BRL-CAD - * - * Copyright (c) 2011 United States Government as represented by - * the U.S. Army Research Laboratory. - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * version 2.1 as published by the Free Software Foundation. - * - * This library is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this file; see the file named COPYING for more - * information. - */ -/** @file RouteMsgJob.h - * - * Brief description - * - */ - -#ifndef __ROUTEMSGJOB_H__ -#define __ROUTEMSGJOB_H__ - -#include "NetMsg.h" -#include "AbstractJob.h" - -class RouteMsgJob : public AbstractJob -{ -public: - RouteMsgJob(NetMsg* msg); - virtual ~RouteMsgJob(); - -protected: - JobResult _doJob(); - -private: - NetMsg* msg; - - /* Disable copy cstr and =operator */ - RouteMsgJob(RouteMsgJob const&){}; - RouteMsgJob& operator=(RouteMsgJob const&){}; -}; - -#endif /* __ROUTEMSGJOB_H__ */ - -/* - * Local Variables: - * tab-width: 8 - * mode: C - * indent-tabs-mode: t - * c-file-style: "stroustrup" - * End: - * ex: shiftwidth=4 tabstop=8 - */ Modified: geomcore/trunk/src/GS/CMakeLists.txt =================================================================== --- geomcore/trunk/src/GS/CMakeLists.txt 2011-05-03 19:40:01 UTC (rev 44547) +++ geomcore/trunk/src/GS/CMakeLists.txt 2011-05-03 19:55:20 UTC (rev 44548) @@ -103,13 +103,13 @@ INSTALL(FILES ${geomserv_inst_HDRS} DESTINATION include) add_executable(geomserv geomserv.cxx) -target_link_libraries(geomserv libgcutil libNet libgeomserv) 
+target_link_libraries(geomserv libgcutil libNet libgeomserv ${BRLCAD_PKG_LIBRARY}) install(TARGETS geomserv DESTINATION bin) file(COPY geomserv.config DESTINATION ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}) add_executable(geomclient geomclient.cxx) -target_link_libraries(geomclient libgcutil libNet libgeomserv) +target_link_libraries(geomclient libgcutil libNet libgeomserv ${BRLCAD_PKG_LIBRARY}) install(TARGETS geomclient DESTINATION bin) file(COPY geomclient.config DESTINATION ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}) Modified: geomcore/trunk/src/GS/DataManager.cxx =================================================================== --- geomcore/trunk/src/GS/DataManager.cxx 2011-05-03 19:40:01 UTC (rev 44547) +++ geomcore/trunk/src/GS/DataManager.cxx 2011-05-03 19:55:20 UTC (rev 44548) @@ -139,23 +139,40 @@ ++cnt; } - std::cout << "\ntotal: " << cnt << std::endl; +// std::cout << "\ntotal: " << cnt << std::endl; /* Send manifest */ GeometryManifestMsg man(originalMsg, items); + + ByteBuffer* temp = man.serialize(); + std::cout << "Manifest byte size: " << temp->position() << "\n"; + origin->send(&man); +/* + std::cout << "Pausing for 60 seconds... \n"; + usleep(1000*1000*50); + for (int i = 10; i<1;--i) { + std::cout << i << "\n"; + usleep(1000*1000); + } +*/ + // usleep(1000*10); /* Send chunks */ - int cnt2 = 0; + bool success = false; + int cnt2 = 0; for (std::list<GeometryChunkMsg*>::iterator chunkIter = msgs.begin(); chunkIter != msgs.end(); ++chunkIter) { chunk = *chunkIter; - std::cout << "Sending: " << cnt2 << std::endl; - usleep(1000 * 1000); - origin->send(chunk); +// std::cout << "Sending: " << cnt2 << std::endl; +// usleep(1000*10); + if (origin->send(chunk) < 1){ + log->logERROR("DataManager","Failed to send CHUNK. 
Socket closed?"); + break; + } cnt2++; } - std::cout << "\ntotal2: " << cnt2 << std::endl; +// std::cout << "\ntotal2: " << cnt2 << std::endl; return; } Modified: geomcore/trunk/src/GS/GSClient.cxx =================================================================== --- geomcore/trunk/src/GS/GSClient.cxx 2011-05-03 19:40:01 UTC (rev 44547) +++ geomcore/trunk/src/GS/GSClient.cxx 2011-05-03 19:55:20 UTC (rev 44548) @@ -39,7 +39,7 @@ this->portMan = new PortalManager(localNodeName); this->portMan->start(); - usleep(100000); + usleep(100 * 1000); this->registerMsgRoutes(); } @@ -145,21 +145,20 @@ GeometryManifestMsg* man = (GeometryManifestMsg*)msg; std::list<std::string>* items = man->getItemData(); std::string str; - std::stringstream ss; - int count = man->getNumOfItems(); - ss << "Items(" << count << "): "; + std::cout << "GSClient\tGot manifest of " << count << " items.\n"; - /* build manifest & Chunks to send*/ + // build manifest & Chunks to send for (std::list<std::string>::iterator it = items->begin(); it != items->end(); it++) { str = *it; - ss << "'" << str << ", "; + std::cout << " '" << str << ",\n"; } - log->logINFO("GSClient", ss.str()); + std::cout << std::endl; + return false; } case GEOMETRYCHUNK: @@ -180,9 +179,8 @@ } std::string name((char*)raw.name.ext_buf); - - log->logINFO("GSClient", "Got a Chunk named: " + name); - return false; + log->logINFO("GSClient", "Got a Chunk named: " + name); + return false; } } return false; Modified: geomcore/trunk/src/libJob/AbstractJob.cxx =================================================================== --- geomcore/trunk/src/libJob/AbstractJob.cxx 2011-05-03 19:40:01 UTC (rev 44547) +++ geomcore/trunk/src/libJob/AbstractJob.cxx 2011-05-03 19:55:20 UTC (rev 44548) @@ -29,6 +29,7 @@ AbstractJob::AbstractJob() { this->status = JOB_NOTSTARTED; + this->jobID = GSUuid::createUuid(); } AbstractJob::~AbstractJob() @@ -47,7 +48,7 @@ return this->status; } -GSUuid AbstractJob::getJobId() +GSUuid* AbstractJob::getJobId() 
{ return this->jobID; } Modified: geomcore/trunk/src/libNet/CMakeLists.txt =================================================================== --- geomcore/trunk/src/libNet/CMakeLists.txt 2011-05-03 19:40:01 UTC (rev 44547) +++ geomcore/trunk/src/libNet/CMakeLists.txt 2011-05-03 19:55:20 UTC (rev 44548) @@ -34,6 +34,7 @@ PortalManager.cxx NetMsgFactory.cxx NetMsgRouter.cxx + MakeAndRouteMsgJob.cxx netMsg/NetMsg.cxx netMsg/TypeOnlyMsg.cxx netMsg/GenericOneStringMsg.cxx @@ -53,18 +54,17 @@ netMsg/GeometryChunkMsg.cxx netMsg/PingMsg.cxx netMsg/PongMsg.cxx - RouteMsgJob.cxx ) IF(BUILD_SHARED_LIBS) add_library(libNet SHARED ${libNet_SRCS}) - target_link_libraries(libNet libJob libgcutil libEvent libPkgCpp ${BRLCAD_RT_LIBRARY} ${LIBGE_GE_LIBRARY}) + target_link_libraries(libNet libJob libgcutil libEvent ${BRLCAD_RT_LIBRARY} ${LIBGE_GE_LIBRARY}) set_target_properties(libNet PROPERTIES PREFIX "") install(TARGETS libNet DESTINATION lib) ENDIF(BUILD_SHARED_LIBS) IF(BUILD_STATIC_LIBS) add_library(libNet-static STATIC ${libNet_SRCS}) - target_link_libraries(libNet-static libJob libgcutil libEvent libPkgCpp ${BRLCAD_RT_LIBRARY} ${LIBGE_GE_LIBRARY}) + target_link_libraries(libNet-static libJob libgcutil libEvent ${BRLCAD_RT_LIBRARY} ${LIBGE_GE_LIBRARY}) IF(NOT WIN32) set_target_properties(libNet-static PROPERTIES PREFIX "") set_target_properties(libNet-static PROPERTIES OUTPUT_NAME "libNet") @@ -82,6 +82,7 @@ INetMsgHandler.h NetMsgFactory.h NetMsgRouter.h + MakeAndRouteMsgJob.h NetMsg.h TypeOnlyMsg.h RemoteNodenameSetMsg.h @@ -102,6 +103,5 @@ GeometryChunkMsg.h PingMsg.h PongMsg.h - RouteMsgJob.h ) INSTALL(FILES ${libNet_inst_HDRS} DESTINATION include) Modified: geomcore/trunk/src/libNet/MakeAndRouteMsgJob.cxx =================================================================== --- geomcore/trunk/src/libNet/MakeAndRouteMsgJob.cxx 2011-05-03 19:40:01 UTC (rev 44547) +++ geomcore/trunk/src/libNet/MakeAndRouteMsgJob.cxx 2011-05-03 19:55:20 UTC (rev 44548) @@ -34,7 +34,7 @@ 
JobResult MakeAndRouteMsgJob::_doJob() { - Portal::tryToBuildNetMsgs(this->p); + this->p->tryToBuildNetMsgs(); } Modified: geomcore/trunk/src/libNet/NetMsgFactory.cxx =================================================================== --- geomcore/trunk/src/libNet/NetMsgFactory.cxx 2011-05-03 19:40:01 UTC (rev 44547) +++ geomcore/trunk/src/libNet/NetMsgFactory.cxx 2011-05-03 19:55:20 UTC (rev 44548) @@ -68,74 +68,98 @@ return NULL; } + NetMsg* msg = NULL; + /* Peek at type */ int start = bb->position(); uint16_t msgType = bb->get16bit(); bb->setPosition(start); +// std::cout <<"Attempting to deserialize type: " << msgType << "\n"; + /* TODO Replace this with a map for registration scheme */ switch (msgType) { case TEST_GENERIC_4BYTE_MSG: - return new GenericFourBytesMsg(bb, origin); + msg = new GenericFourBytesMsg(bb, origin); + break; case TEST_GENERIC_2BYTE_MSG: - return new GenericTwoBytesMsg(bb, origin); + msg = new GenericTwoBytesMsg(bb, origin); + break; case TEST_GENERIC_1BYTE_MSG: - return new GenericOneByteMsg(bb, origin); + msg = new GenericOneByteMsg(bb, origin); + break; case TEST_GENERIC_MULTIBYTE_MSG: - return new GenericMultiByteMsg(bb, origin); + msg = new GenericMultiByteMsg(bb, origin); + break; case TEST_GENERIC_1STRING_MSG: - return new GenericOneStringMsg(bb, origin); + msg = new GenericOneStringMsg(bb, origin); + break; case RUALIVE: - return new TypeOnlyMsg(bb, origin); + msg = new TypeOnlyMsg(bb, origin); + break; case IMALIVE: - return new TypeOnlyMsg(bb, origin); + msg = new TypeOnlyMsg(bb, origin); + break; case FAILURE: - return new GenericOneByteMsg(bb, origin); + msg = new GenericOneByteMsg(bb, origin); + break; case SUCCESS: - return new GenericOneByteMsg(bb, origin); + msg = new GenericOneByteMsg(bb, origin); + break; case GS_REMOTE_NODENAME_SET: - return new GenericOneStringMsg(bb, origin); + msg = new GenericOneStringMsg(bb, origin); + break; case DISCONNECTREQ: - return new TypeOnlyMsg(bb, origin); + msg = new TypeOnlyMsg(bb, 
origin); + break; case NEWNODEONNET: - return new GenericOneStringMsg(bb, origin); + msg = new GenericOneStringMsg(bb, origin); + break; /* case FULL_NODE_LISTREQ: */ - /* return new NetMsg(bb, origin); */ + /* msg = new NetMsg(bb, origin); */ /* case FULL_NODE_LIST: */ - /* return new NetMsg(bb, origin); */ + /* msg = new NetMsg(bb, origin); */ + /* break; */ case NEWSESSIONREQ: - return new NewSessionReqMsg(bb, origin); + msg = new NewSessionReqMsg(bb, origin); + break; case SESSIONINFO: - return new SessionInfoMsg(bb, origin); + msg = new SessionInfoMsg(bb, origin); + break; case GEOMETRYREQ: - return new GeometryReqMsg(bb, origin); + msg = new GeometryReqMsg(bb, origin); + break; case GEOMETRYMANIFEST: - return new GeometryManifestMsg(bb, origin); + msg = new GeometryManifestMsg(bb, origin); + break; case GEOMETRYCHUNK: - return new GeometryChunkMsg(bb, origin); + msg = new GeometryChunkMsg(bb, origin); + break; case PING: - return new PingMsg(bb, origin); + msg = new PingMsg(bb, origin); + break; case PONG: - return new PongMsg(bb, origin); + msg = new PongMsg(bb, origin); + break; /* Admin commands */ case CMD_SHUTDOWN: - return new TypeOnlyMsg(bb, origin); + msg = new TypeOnlyMsg(bb, origin); + break; - default: std::stringstream ss; ss << "Unknown Msgtype: "; @@ -144,6 +168,10 @@ return NULL; } + + +// std::cout << "Finished deserializing.... 
\n"; + return msg; } /* Modified: geomcore/trunk/src/libNet/Portal.cxx =================================================================== --- geomcore/trunk/src/libNet/Portal.cxx 2011-05-03 19:40:01 UTC (rev 44547) +++ geomcore/trunk/src/libNet/Portal.cxx 2011-05-03 19:55:20 UTC (rev 44548) @@ -23,6 +23,8 @@ * */ +#include <unistd.h> +#include <errno.h> #include "Portal.h" #include "PortalManager.h" #include "Logger.h" @@ -32,39 +34,120 @@ #include "NetMsgRouter.h" #include "RemoteNodenameSetMsg.h" #include "TypeOnlyMsg.h" -#include "RouteMsgJob.h" +#include "JobManager.h" +#include "MakeAndRouteMsgJob.h" -Portal::Portal(PortalManager* pm, PkgTcpClient* client, struct pkg_switch* table): -pm(pm), pkgClient(client), callbackTable(table), log(Logger::getInstance()), handshakeComplete(false) +Portal::Portal(PortalManager* pm, int socket): +pm(pm), log(Logger::getInstance()), handshakeComplete(false), +socket(socket) { GSUuid* uuid = GSUuid::createUuid(); std::string str = uuid->toString(); delete uuid; this->remoteNodeName.assign("NotSetYet-" + str); - /* set the struct's userdata */ - this->callbackTable[0].pks_user_data = this; + /* set an 256K initial size. */ + this->recvBuffer = ByteBuffer::allocate((1024 * 256)); + this->builder = new MakeAndRouteMsgJob(this); } Portal::~Portal() { - delete callbackTable; + delete this->builder; } int Portal::send(NetMsg* msg) { ByteBuffer* bb = msg->serialize(); - int retval = this->pkgClient->send(PKG_MAGIC2, bb->array(), bb->position()); + int len = bb->position(); + int retval = 0; - delete bb; + int pos = 0; + int loopCnt = 0; + int totalSend = 0; + int loopToSend = 0; + while (pos < len) { + if (pos + MAXCHUNKSIZE > len) + loopToSend = (len-pos); + else + loopToSend = MAXCHUNKSIZE; - /* Process any data moved by the underlying Socket buffer copy. */ - retval = this->pkgClient->processData(); - if (retval < 0) { - this->log->logERROR("Portal", - "Unable to process packets? Weird. 
(1) "); - return retval; - }/* TODO do we need to check for ==0 ? */ +#ifdef HAVE_WINSOCK_H + retval = send(this->socket, bb->array() + pos, loopToSend); +#else + retval = write(this->socket, bb->array() + pos, loopToSend); +#endif + if (retval == -1) + //error occurred + break; + + if (retval == 0) { + usleep(10); + continue; + } + + totalSend += retval; + ++loopCnt; + pos += retval; + + usleep(10); + + +/* + std::cout << "(looped: "<< loopCnt <<") Wanted to send: " << loopToSend ; + std::cout << ", actually sent: " << retval; + std::cout << ", Total sent: " << totalSend ; + std::cout << ", Total TO BE sent: " << len << "\n"; +*/ + + + + /* std::ostringstream ss; + ss << "\tStart bytes: "; + char* p = bb->array() + pos; + unsigned int b; + for (int i = 1; i<=(32);i++) + { + b = (unsigned int)*p; + if (b < 10) + ss << std::hex << '0' << (unsigned int)*p; + else + ss << std::hex << (unsigned int)*p; + if (i % 4 == 0) + ss << " "; + p++; + } + ss << "\n"; + + ss << "\tEnd bytes: "; + p = bb->array() + pos + retval - 32; + for (int i = 1; i<=(32);i++) + { + b = (unsigned int)*p; + if (b < 10) + ss << std::hex << '0' << (unsigned int)*p; + else + ss << std::hex << (unsigned int)*p; + if (i % 4 == 0) + ss << " "; + p++; + } + ss << "\n"; + + std::cout << ss.str();*/ + + } + if (retval == -1){ + std::cout << "Incomplete send: " << totalSend << " bytes.\n"; + std::cout << + "Seemed to have trouble writing to the socket: " + << errno << " ("<< strerror( errno ) << ")\n"; + + } +// else +// std::cout << "Finished send: " << totalSend << " bytes.\n"; + + delete bb; return retval; } int @@ -94,45 +177,94 @@ } int -Portal::flush() { - return this->pkgClient->flush(); -} +Portal::pullFromSock() { + int lastRead = 0; + int tryToRead = MAXCHUNKSIZE; + int remaining = 0; + int pos = 0; + int cap = 0; + int totalRead = 0; -int -Portal::read() { - int retval = 0; + /* Transfer data to portal's RecvBuffer */ + /* Assume that recvBuf->position is at the place where we need to start 
writing. */ + GSMutexLocker locker(&this->recvBufLock); + ByteBuffer* bb = this->recvBuffer; - /* recv first */ - retval = this->pkgClient->processData(); - if (retval < 0) { - this->log->logERROR("Portal", - "Unable to process packets? Weird. (1) "); - return retval; - }/* TODO do we need to check for ==0 ? */ + do { + if (bb->remaining() < 4){ + std::cout << "b4: " << bb->capacity() << " " << bb->limit() << "\n"; + /* Force a resize */ + bb->put64bit(0); + bb->put64bit(0); + bb->put64bit(0); + bb->setPosition(bb->position() - 24); + std::cout << "after: " << bb->capacity() << " " << bb->limit() << "\n"; + } - retval = this->pkgClient->pullDataFromSocket(); - if (retval < 0) { - this->log->logERROR("Portal", - "Seemed to have trouble pulling the data from the socket."); - return retval; + /* set limit to capacity for safety */ + cap = bb->capacity(); + bb->setLimit(cap); + pos = bb->position(); - } else if (retval == 0) { - this->log->logERROR("Portal", "Client closed the connection."); - return retval; - } + /* Check for enough space in BB */ + remaining = cap - pos; - retval = this->pkgClient->processData(); - if (retval < 0) { - this->log->logERROR("Portal", "Unable to process packets? Weird. (2)"); - return retval; - }/* TODO do we need to check for ==0 ? 
*/ - return 1; + if (remaining < MAXCHUNKSIZE) + tryToRead = remaining - 1; + else + tryToRead = MAXCHUNKSIZE; + +// std::cout << " pos: " << pos ; +// std::cout << " cap: " << cap ; +// std::cout << " remaining: " << remaining ; +// std::cout << " tryToRead: " << tryToRead ; +// std::cout << " pos + tryToRead: " << (pos + tryToRead) << "\n" ; + + +#ifdef HAVE_WINSOCK_H + lastRead = recv(this->socket, bb->array() + pos, tryToRead); +#else + lastRead = read(this->socket, bb->array() + pos, tryToRead); +#endif + + if (lastRead < 0){ + this->log->logERROR("Portal", + "Seemed to have trouble pulling the data from the socket."); + std::cout << + "Seemed to have trouble pulling the data from the socket: " + << errno << " ("<< strerror( errno ) << ")\n"; + + + } else { + pos += lastRead; + totalRead += lastRead; + bb->setPosition(pos); + } +// std::cout << "read() returned: " << lastRead << "\n"; + + /* Break out if read comes back less than MAXED */ + if (lastRead < MAXCHUNKSIZE) + break; + + } while (lastRead > 0); + + /* Clamp the return val */ + int retVal = 0; + if (lastRead > 0) + retVal = 1; + if (lastRead < 0) + retVal = -1; + +// std::cout << "pullFromSock() loop exit. 
totalRead: " << totalRead; +// std::cout << " retVal: " << retVal << "\n"; + + return retVal; } std::string Portal::getRemoteNodeName() { - return this->remoteNodeName + ""; + return this->remoteNodeName + ""; } bool @@ -164,77 +296,107 @@ } void -Portal::callbackSpringboard(struct pkg_conn* conn, char* buf) { - Logger* log = Logger::getInstance(); +Portal::disconnect() +{ + this->pm->disconnect(this); +} - /* Check to see if we got a good Buffer and Portal Object */ - if (buf == 0) { - log->logERROR("Portal", "pkg callback returned a NULL buffer!"); - /* bu_bomb("pkg callback returned a NULL buffer!\n"); */ - return; - } +void +Portal::sendTypeOnlyMessage(uint32_t type, NetMsg* originalMsg) +{ + TypeOnlyMsg* tom = NULL; - int len = conn->pkc_inend - sizeof(pkg_header); + if (originalMsg == NULL) + tom = new TypeOnlyMsg(type); + else + tom = new TypeOnlyMsg(type, originalMsg); - if(len < 1) - return; + this->send(tom); + return; +} - ByteBuffer* bb = ByteBuffer::allocate(len); - bb->put(buf, len); - - if (conn->pkc_user_data == 0) { - log->logERROR("Portal", "pkg callback returned a NULL user_data pointer!"); +void +Portal::tryBuild() +{ + if (this->builder->getStatus() == JOB_RUNNING) return; - } - Portal* p = (Portal*) conn->pkc_user_data; - if (p == 0) { - log->logERROR("Portal", "WARNING! NULL Portal."); - } + JobManager::getInstance()->submitJob(this->builder); +} - /* Build a NetMsg */ - NetMsg* msg = NetMsgFactory::getInstance()->deserializeNetMsg(bb, p); - if (msg == NULL) { - log->logERROR("Portal", "WARNING! 
NetMsg failed to deserialize properly.\n"); - return; - } +void +Portal::tryToBuildNetMsgs() { + GSMutexLocker locker(&this->recvBufLock); + ByteBuffer* bb = this->recvBuffer; +// std::cout << "Portal's buffer position after data addition: " << bb->position() << "\n"; - delete bb; +// std::cout << "\nByteDump: " << bb->toHexString() << "\n\n"; - /* Route */ + NetMsg* msg = NULL; + int start = 0; + int endOfData = bb->position(); + bb->flip(); + while (msg == NULL) + { + if (bb->remaining() == 0) { +// std::cout << "No data left\n"; + break; + } - /* give the Portal first dibs on the netmsg */ - if (p->handleNetMsg(msg)) { - return; - } + start = bb->position(); - /* Fire off a Job. This keeps the selector loop from */ - /* delivering all the Msg copies personally.*/ - RouteMsgJob* job = new RouteMsgJob(msg); - job->submit(); -} + /* read GS header */ + uint16_t msgType = bb->get16bit(); + uint32_t msgLen = bb->get32bit(); + bb->setPosition(bb->position() - 6); -void -Portal::disconnect() -{ - this->pm->disconnect(this); -} + if (bb->remaining() < msgLen) { +// std::cout << "Not enough data to build the next message: type=" << msgType; +// std::cout << " requiredLen: " << msgLen; +// std::cout << " have: " << bb->remaining(); +// std::cout << " currentPos: " << bb->position(); +// std::cout << " capacity: " << bb->capacity() << "\n"; + bb->setPosition(start); + bb->setLimit(endOfData); + break; + } -void -Portal::sendTypeOnlyMessage(uint32_t type, NetMsg* originalMsg) -{ - TypeOnlyMsg* tom = NULL; + /* Build a NetMsg */ +// std::cout << "starting build at position: " << bb->position() << "\n"; + NetMsg* msg = NetMsgFactory::getInstance()->deserializeNetMsg(bb, this); + if (msg == NULL) { + log->logERROR("Portal", + "WARNING! 
NetMsg failed to deserialize properly.\n"); +// std::cout << "build failed at position: " << bb->position() << "\n"; + bb->setPosition(start); + bb->setLimit(endOfData); + break; + } +// std::cout << "build succeeded at position: " << bb->position() << "\n"; - if (originalMsg == NULL) - tom = new TypeOnlyMsg(type); - else - tom = new TypeOnlyMsg(type, originalMsg); + /* Route */ + if (Portal::routeNetMsg(msg) == false) { + std::stringstream ss; + ss << "WARNING! Failed to find route for NetMsg type: " << msg->getMsgType() << "\n"; + log->logERROR("Portal", ss.str()); + } - this->send(tom); - return; + /* Set back to NULL for proper loop logic */ + msg == NULL; + } + /* Compact BB */ + bb->compact(); } +bool +Portal::routeNetMsg(NetMsg* msg) { + /* give the Portal first dibs on the netmsg */ + if (msg->getOrigin()->handleNetMsg(msg) == true) + return true; + return NetMsgRouter::getInstance()->routeMsg(msg); + } + /* * Local Variables: * mode: C Modified: geomcore/trunk/src/libNet/PortalManager.cxx =================================================================== --- geomcore/trunk/src/libNet/PortalManager.cxx 2011-05-03 19:40:01 UTC (rev 44547) +++ geomcore/trunk/src/libNet/PortalManager.cxx 2011-05-03 19:55:20 UTC (rev 44548) @@ -26,8 +26,9 @@ #include "Portal.h" #include "PortalManager.h" #include "NetMsgFactory.h" -#include "PkgTcpClient.h" #include "NetMsgTypes.h" +#include "JobManager.h" +#include "MakeAndRouteMsgJob.h" #include <string> #include <stdio.h> @@ -40,7 +41,6 @@ this->localNodeName.assign(localNodeName + "PortMan"); this->listenAddress.assign(address); this->listenPort = listenPort; - this->tcpServer = new PkgTcpServer(); this->fdPortalMap = new std::map<int, Portal*> (); this->portalsLock = new GSMutex(); this->log = Logger::getInstance(); @@ -51,231 +51,261 @@ Portal* PortalManager::connectToHost(std::string host, uint16_t port) { - struct pkg_switch* table = this->makeNewSwitchTable(); - PkgTcpClient* pkgc = (PkgTcpClient*) 
this->tcpServer->connectToHost(host, port, table); +// struct pkg_switch* table = new pkg_switch[1]; +// +// table[0].pks_type = 0; +// table[0].pks_handler = 0; +// table[0].pks_title = (char*) 0; +// table[0].pks_user_data = 0; - if (pkgc == NULL) { - return NULL; - } else { - Portal* p = this->makeNewPortal(pkgc, table); - return p; - } + std::stringstream ss; + ss << port; + std::string s_port = ss.str(); + + /* Use PKG to do all the cross platform stuff */ + pkg_conn* conn = pkg_open(host.c_str(), s_port.c_str(), + "tcp", NULL, NULL, NULL, NULL); + + if (conn == PKC_ERROR) { + bu_log("Connection to %s, port %d, failed.\n", host.c_str(), + port); + return NULL; + } + + Portal* p = this->makeNewPortal(conn->pkc_fd); + + /* Dunno if i can do this without killing the FD */ + free(conn); + + return p; } -void -PortalManager::_run() { - this->log->logINFO("PortalManager", "Running"); - struct timeval timeout; - fd_set readfds; - fd_set exceptionfds; - int listener = -1; +int +PortalManager::listen() +{ + std::stringstream ss; + ss << this->listenPort; + std::string s_port = ss.str(); + int fd = pkg_permserver_ip(this->listenAddress.c_str(), s_port.c_str(), "tcp", 0, 0); + return fd; +} - this->masterFDSLock.lock(); - FD_ZERO(&masterfds); - this->masterFDSLock.unlock(); +int +PortalManager::accept(int listener) +{ + /* Use PKG to do all the cross platform stuff */ + pkg_conn* conn = pkg_getclient(listener, NULL, NULL, 42); - FD_ZERO(&readfds); - FD_ZERO(&exceptionfds); + if (conn == NULL) { + return -1; + } else if (conn == PKC_ERROR) { + bu_log("Fatal error accepting client connection.\n"); + free(conn); + return -1; + } - if (this->listenPort != 0) { - listener = this->tcpServer->listen(this->listenPort, this->listenAddress); + int fd = conn->pkc_fd; + free(conn); + return fd; +} - if (listener < 0) { - this->log->logERROR("PortalManager", "Failed to listen"); - return; - } else { - char buf[BUFSIZ]; - std::string s; - snprintf(buf, BUFSIZ, "%s:%d FD:%d", 
this->listenAddress.c_str(), this->listenPort, listener); - s.assign(buf); - this->log->logINFO("PortalManager", s); - } +void +PortalManager::_run() { + this->log->logINFO("PortalManager", "Running"); + struct timeval timeout; + fd_set readfds; + fd_set exceptionfds; + int listener = -1; - this->masterFDSLock.lock(); - FD_SET(listener, &masterfds); - fdmax = listener; - this->masterFDSLock.unlock(); - } + this->masterFDSLock.lock(); + FD_ZERO(&masterfds); + this->masterFDSLock.unlock(); - bool isListener = false; - bool readyRead = false; - bool readyAccept = false; - bool readyException = false; + FD_ZERO(&readfds); + FD_ZERO(&exceptionfds); - while (this->getRunCmd()) { - /* Set values EVERY loop since select() on *nix modifies this. */ - timeout.tv_sec = 0; - timeout.tv_usec = 50 * 1000; + //TODO eventually make this listen on unlimited number of ports. + /* Setup listening on single port */ + if (this->listenPort != 0) { + listener = this->listen(); - this->masterFDSLock.lock(); - readfds = masterfds; - exceptionfds = masterfds; - this->masterFDSLock.unlock(); + if (listener < 0) { + this->log->logERROR("PortalManager", "Failed to listen"); + return; + } else { + std::stringstream ss; + ss << "Listening on " << this->listenAddress; + ss << ":" << (int)this->listenPort; + ss << " FD:" << (int)listener; + this->log->logINFO("PortalManager", ss.str()); + } - /* Shelect!! 
*/ - int retVal = select(fdmax + 1, &readfds, NULL, &exceptionfds, &timeout); + this->masterFDSLock.lock(); + FD_SET(listener, &masterfds); + fdmax = listener; + this->masterFDSLock.unlock(); + } - if (retVal < 0) { - char buf[BUFSIZ]; - /* got a selector error */ - snprintf(buf, BUFSIZ, "Selector Error: %d", errno); - this->log->logERROR("PortalManager", buf); - break; - } + bool isListener = false; + bool readyRead = false; + bool readyAccept = false; + bool readyException = false; + int newFD = 0; - for (int i = 0; i <= fdmax; ++i) { - bool isaFD = FD_ISSET(i, &masterfds); + while (this->getRunCmd()) { + /* Set values EVERY loop since select() on *nix modifies this. */ + timeout.tv_sec = 0; + timeout.tv_usec = 50 * 1000; - /* Don't muck with an FD that isn't ours! */ - if (!isaFD) { - continue; - } + this->masterFDSLock.lock(); + readfds = masterfds; + exceptionfds = masterfds; + this->masterFDSLock.unlock(); - /* Simplify switching later with bools now */ - isListener = (i == listener); - readyRead = FD_ISSET(i, &readfds) && !isListener; - readyAccept = FD_ISSET(i, &readfds) && isListener; - readyException = FD_ISSET(i, &exceptionfds); + /* Shelect!! */ + int retVal = select(fdmax + 1, &readfds, NULL, &exceptionfds, &timeout); - /* If nothing to do, then continue; */ - if (!readyRead && !readyAccept && !readyException) { - continue; - } + if (retVal < 0) { + char buf[BUFSIZ]; + /* got a selector error */ + snprintf(buf, BUFSIZ, "Selector Error: %d", errno); + this->log->logERROR("PortalManager", buf); + break; + } - /* Handle exceptions */ - if (readyException) { - /* TODO handle exceptions */ - perror("Exception on FileDescriptor"); - } + for (int i = 0; i <= fdmax; ++i) { + bool isaFD = FD_ISSET(i, &masterfds); - Portal* p = NULL; - /* Accept new connections: */ - if (readyAccept) { - struct pkg_switch* table = this->makeNewSwitchTable(); + /* Don't muck with an FD that isn't ours! 
*/ + if (!isaFD) { + continue; + } - PkgTcpClient* client = - (PkgTcpClient*) this->tcpServer->waitForClient(table, - 42); + /* Simplify switching later with bools now */ + isListener = (i == listener); + readyRead = FD_ISSET(i, &readfds) && !isListener; + readyAccept = FD_ISSET(i, &readfds) && isListener; + readyException = FD_ISSET(i, &exceptionfds); - if (client == 0) { - log->logERROR("PortalManager", - "Error on accepting new client."); - } else { - /* Handle new client here. */ - p = this->makeNewPortal(client, table); - } - } + /* If nothing to do, then continue; */ + if (!readyRead && !readyAccept && !readyException) { + continue; + } - /* the only thing we want to do on the listener loop is accept */ - if (isListener) { - continue; - } + /* Handle exceptions */ + if (readyException) { + /* TODO handle exceptions */ + perror("Exception on FileDescriptor"); + } - /* If we didnt get a portal from accepting, then get one from the map */ - if (p == 0 && (*this->fdPortalMap)[i]) { - this->portalsLock->lock(); - p = (*this->fdPortalMap)[i]; - this->portalsLock->unlock(); - } + Portal* p = NULL; + /* Accept new connections: */ + if (readyAccept) { + newFD = this->accept(listener); - /* Check, again, if we have a good portal. */ - if (p == 0) { - /* Deal with unmapped file Descriptor */ - char buf[BUFSIZ]; - snprintf(buf, BUFSIZ, "FD %d not associated with a Portal, dropping connection.", i); - std::string s(buf); - this->closeFD(i, s); - continue; - } + if (newFD < 1) { + log->logERROR("PortalManager", + "Error on accepting new client."); + } else { + /* Handle new client here. 
*/ + p = this->makeNewPortal(newFD); + } + } - /* read */ - if (readyRead) { - int readResult = p->read(); + /* the only thing we want to do on the listener loop is accept */ + if (isListener) { + continue; + } - if (readResult == 0) { - this->closeFD(i, ""); - continue; - } else if (readResult < 0) { - this->closeFD(i, "Error on read, dropping connection."); - continue; - } - } - } /* end FOR */ - } /* end while */ - this->log->logINFO("PortalManager", "Shutdown"); -}/* end fn */ + /* If we didnt get a portal from accepting, then get one from the map */ + if (p == 0 && (*this->fdPortalMap)[i]) { + this->portalsLock->lock(); + p = (*this->fdPortalMap)[i]; + this->portalsLock->unlock(); + } -Portal* -PortalManager::makeNewPortal(PkgTcpClient* client, struct pkg_switch* table) { - Portal* p = new Portal(this, client, table); + /* Check, again, if we have a good portal. */ + if (p == 0) { + /* Deal with unmapped file Descriptor */ + char buf[BUFSIZ]; + snprintf(buf, BUFSIZ, "FD %d not associated with a Portal, dropping connection.", i); + std::string s(buf); + this->closeFD(i, s); + continue; + } - if (p == 0) { - return 0; - } + /* read */ + if (readyRead) { +// std::cout << "\nCalling pullFromSock()\n"; + int readResult = p->pullFromSock(); +// std::cout << "\nDone Calling pullFromSock(), readResult was: " << readResult << "\n"; - /* Obtain lock and then map this new portal */ - this->portalsLock->lock(); - int newFD = p->pkgClient->getFileDescriptor(); - this->fdPortalMap->insert(std::pair<int,Portal*>(newFD, p)); - this->portalsLock->unlock(); +// MakeAndRouteMsgJob* j = new MakeAndRouteMsgJob(p); +// JobManager::getInstance()->submitJob(j); - /* Check maxFD and update if needed. */ - if (newFD > fdmax) { - fdmax = newFD; - } + p->tryBuild(); - /* Add to masterFDS. 
*/ - this->masterFDSLock.lock(); - FD_SET(newFD, &masterfds); - this->masterFDSLock.unlock(); + if (readResult == 0) { + this->closeFD(i, "Closing FD (read returned zero)"); + continue; + } else if (readResult < 0) { + this->closeFD(i, "Error on read, dropping connection(254)."); + continue; + } + } + } /* end FOR */ + } /* end while */ + this->log->logINFO("PortalManager", "Shutdown"); +}/* end fn */ - p->sendGSNodeName(); +Portal* +PortalManager::makeNewPortal(int socket) { + Portal* p = new Portal(this, socket); - return p; -} + /* Obtain lock and then map this new portal */ + this->portalsLock->lock(); + this->fdPortalMap->insert(std::pair<int,Portal*>(socket, p)); + this->portalsLock->unlock(); -struct pkg_switch* -PortalManager::makeNewSwitchTable() { - struct pkg_switch* table = new pkg_switch[2]; + /* Check maxFD and update if needed. */ + if (socket > fdmax) + fdmax = socket; - table[0].pks_type = PKG_MAGIC2; - table[0].pks_handler = &(Portal::callbackSpringboard); - table[0].pks_title = "SpringBoard"; - table[0].pks_user_data = 0; + /* Add to masterFDS. 
*/ + this->masterFDSLock.lock(); + FD_SET(socket, &masterfds); + this->masterFDSLock.unlock(); - table[1].pks_type = 0; - table[1].pks_handler = 0; - table[1].pks_title = (char*) 0; - table[1].pks_user_data = 0; + /* Start handshake */ + p->sendGSNodeName(); - return table; + return p; } void -PortalManager::closeFD(int fd, std::string logComment) { - close(fd); +PortalManager::closeFD(int fd, std::string logComment) +{ + close(fd); - this->masterFDSLock.lock(); - if (FD_ISSET(fd, &this->masterfds)) { - FD_CLR(fd, &this->masterfds); - } - this->masterFDSLock.unlock(); + this->masterFDSLock.lock(); + if (FD_ISSET(fd, &this->masterfds)) { + FD_CLR(fd, &this->masterfds); + } + this->masterFDSLock.unlock(); - this->portalsLock->lock(); - this->fdPortalMap->erase(fd); - this->portalsLock->unlock(); + this->portalsLock->lock(); + this->fdPortalMap->erase(fd); + this->portalsLock->unlock(); - if (logComment.length() >0) { - this->log->logINFO("PortalManager", logComment); - } + if (logComment.length() > 0) + this->log->logINFO("PortalManager", logComment); } void PortalManager::disconnect(Portal* p) { - int fd = p->pkgClient->getFileDescriptor(); - this->closeFD(fd, "Disconnect requested."); + this->closeFD(p->socket, "Disconnect requested."); } bool Deleted: geomcore/trunk/src/libNet/RouteMsgJob.cxx =================================================================== --- geomcore/trunk/src/libNet/RouteMsgJob.cxx 2011-05-03 19:40:01 UTC (rev 44547) +++ geomcore/trunk/src/libNet/RouteMsgJob.cxx 2011-05-03 19:55:20 UTC (rev 44548) @@ -1,49 +0,0 @@ -/* R O U T E M S G J O B . C X X - * BRL-CAD - * - * Copyright (c) 2011 United States Government as represented by - * the U.S. Army Research Laboratory. - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * version 2.1 as published by the Free Software Foundation. 
- * - * This library is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this file; see the file named COPYING for more - * information. - */ -/** @file RouteMsgJob.cxx - * - * Brief description - * - */ - -#include "RouteMsgJob.h" -#include "NetMsgRouter.h" - -RouteMsgJob::RouteMsgJob(NetMsg* msg) : - msg(msg) -{} - -RouteMsgJob::~RouteMsgJob() {} - -JobResult RouteMsgJob::_doJob() -{ - NetMsgRouter::getInstance()->routeMsg(this->msg); -} - - -/* - * Local Variables: - * mode: C - * tab-width: 8 - * indent-tabs-mode: t - * c-file-style: "stroustrup" - * End: - * ex: shiftwidth=4 tabstop=8 - */ Modified: geomcore/trunk/src/libNet/netMsg/GeometryManifestMsg.cxx =================================================================== --- geomcore/trunk/src/libNet/netMsg/GeometryManifestMsg.cxx 2011-05-03 19:40:01 UTC (rev 44547) +++ geomcore/trunk/src/libNet/netMsg/GeometryManifestMsg.cxx 2011-05-03 19:55:20 UTC (rev 44548) @@ -24,6 +24,7 @@ */ #include "GeometryManifestMsg.h" +#include <iostream> /* Normal Constructor */ GeometryManifestMsg::GeometryManifestMsg( @@ -49,11 +50,15 @@ std::string tstr; for (uint32_t i = 0; i < numOfItems; ++i) { +// if (i % 50 == 0) std::cout << i << std::endl; tstr = bb->getString(); if (tstr.size() == 0) continue; this->itemData->push_back(tstr); } + + std::cout << "\nGeomManifest CSTR exiting\n\n"; + } /* Destructor */ Modified: geomcore/trunk/src/libNet/netMsg/NetMsg.cxx =================================================================== --- geomcore/trunk/src/libNet/netMsg/NetMsg.cxx 2011-05-03 19:40:01 UTC (rev 44547) +++ geomcore/trunk/src/libNet/netMsg/NetMsg.cxx 2011-05-03 19:55:20 UTC (rev 44548) @@ -54,6 +54,7 @@ this->origin = origin; this->msgType = 
bb->get16bit(); //this isn't right... + this->msgLen = bb->get32bit(); s = bb->getString(); this->msgUUID = new GSUuid(s); @@ -82,19 +83,36 @@ void NetMsg::serialize(ByteBuffer* bb) { - /* Serialize Header */ - bb->put16bit(this->msgType); - bb->putString(this->msgUUID->toString()); - bb->put(this->hasReUUID); + int start = bb->position(); - if (this->hasReUUID) - bb->putString(this->reUUID->toString()); + /* Serialize Header */ + bb->put16bit(this->msgType); + int lenPosition = bb->position(); + bb->put32bit(0); + bb->putString(this->msgUUID->toString()); + bb->put(this->hasReUUID); - /* Call subclass serialize */ - if (!this->_serialize(bb)) { - std::cerr << "A serialization Error in NetMsg::serialize() occurred.\n"; - return; - } + if (this->hasReUUID) + bb->putString(this->reUUID->toString()); + + /* Call subclass serialize */ + if (!this->_serialize(bb)) { + std::cerr << "A serialization Error in NetMsg::serialize() occurred.\n"; + return; + } + + int stop = bb->position(); + this->msgLen = stop - start; + + if (this->msgLen < 1){ + std::cerr << "A serialization Error in NetMsg::serialize() occurred (len < 1).\n"; + return; + } + + /* Go back and fill in length */ + bb->setPosition(lenPosition); + bb->put32bit(this->msgLen); + bb->setPosition(stop); } /* @@ -106,6 +124,12 @@ return this->msgType; } +uint32_t +NetMsg::getMsgLen() const +{ + return this->msgLen; +} + GSUuid* NetMsg::getMsgUUID() const { Modified: geomcore/trunk/src/utility/ByteBuffer.cxx =================================================================== --- geomcore/trunk/src/utility/ByteBuffer.cxx 2011-05-03 19:40:01 UTC (rev 44547) +++ geomcore/trunk/src/utility/ByteBuffer.cxx 2011-05-03 19:55:20 UTC (rev 44548) @@ -296,8 +296,15 @@ void ByteBuffer::putString(std::string str) { - this->put32bit(str.length()); - this->put((char*)str.c_str(), str.length()); + int len = str.length(); + + if (len == 0) { + this->put32bit(0); + return; + } + + this->put32bit(len); + this->put((char*)str.c_str(), 
len); } std::string @@ -305,18 +312,59 @@ { uint32_t len = this->get32bit(); + if (len > 2048) { + std::ostringstream ss; + ss << "Warning: Massive string as position: "<< (this->position()-4) << " Data= \n"; + int startPos = this->position() - 4 - 64; + + char* p = this->array() + startPos; + unsigned int b; + for (int i = 1; i<=(64);i++) + { + b = (unsigned int)*p; + if (b < 10) + ss << std::hex << '0' << (unsigned int)*p << ' '; + else + ss << std::hex << (unsigned int)*p << ' '; + + if (i % 4 == 0) + ss << "\t"; + if (i % 32 == 0) + ss << "\n"; + p++; + } + ss << "\n\n"; + + for (int i = 1; i<=(128);i++) + { + b = (unsigned int)*p; + if (b < 10) + ss << std::hex << '0' << (unsigned int)*p << ' '; + else + ss << std::hex << (unsigned int)*p << ' '; + + if (i % 4 == 0) + ss << "\t"; + if (i % 32 == 0) + ss << "\n"; + p++; + } + + std::cout << "\n\n" << ss.str() << "\n\n"; + exit(0); + } + + char* ptr = this->array() + this->position(); this->setPosition(this->position() + len); std::string out = ""; out.append(ptr, len); - /* - std::cout << "pos: " << this->position(); + std::cout << "bb pos: " << this->position(); std::cout << "/" << this->capacity(); - std::cout << "len: " << len << " string: '"; + std::cout << " len: " << len << " string: '"; std::cout << out << "'\n"; */ - return out; } Modified: geomcore/trunk/tests/func/libJob/PrintToStdOutJob.cxx =================================================================== --- geomcore/trunk/tests/func/libJob/PrintToStdOutJob.cxx 2011-05-03 19:40:01 UTC (rev 44547) +++ geomcore/trunk/tests/func/libJob/PrintToStdOutJob.cxx 2011-05-03 19:55:20 UTC (rev 44548) @@ -41,7 +41,7 @@ { usleep(2500); GSMutexLocker(this->streamLock); - std::cout << "JobID:" << this->jobID.toString() << " Text: "<< text; + std::cout << "JobID:" << this->jobID->toString() << " Text: "<< text; return JOB_COMPLETED_NO_ERRORS; } Modified: geomcore/trunk/tests/unit/CMakeLists.txt =================================================================== 
--- geomcore/trunk/tests/unit/CMakeLists.txt 2011-05-03 19:40:01 UTC (rev 44547) +++ geomcore/trunk/tests/unit/CMakeLists.txt 2011-05-03 19:55:20 UTC (rev 44548) @@ -36,6 +36,7 @@ SET( UnitTests_LIBS ${BRLCAD_BU_LIBRARY} + ${BRLCAD_PKG_LIBRARY} ${TCL_LIBRARIES} ${CPPUNIT_LIBRARY} libgcutil This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |