From: <arn...@us...> - 2007-12-30 21:41:50
|
Revision: 117 http://adchpp.svn.sourceforge.net/adchpp/?rev=117&view=rev Author: arnetheduck Date: 2007-12-30 13:41:46 -0800 (Sun, 30 Dec 2007) Log Message: ----------- Use ref-counted buffer to save some memory Modified Paths: -------------- adchpp/trunk/adchpp/AdcCommand.cpp adchpp/trunk/adchpp/AdcCommand.h adchpp/trunk/adchpp/Client.cpp adchpp/trunk/adchpp/Client.h adchpp/trunk/adchpp/ClientManager.cpp adchpp/trunk/adchpp/ClientManager.h adchpp/trunk/adchpp/ManagedSocket.cpp adchpp/trunk/adchpp/ManagedSocket.h adchpp/trunk/adchpp/Socket.cpp adchpp/trunk/adchpp/Socket.h adchpp/trunk/adchpp/SocketManager.cpp adchpp/trunk/adchpp/SocketManager.h adchpp/trunk/swig/adchpp.i adchpp/trunk/test/PyClient.py Added Paths: ----------- adchpp/trunk/adchpp/Buffer.h Modified: adchpp/trunk/adchpp/AdcCommand.cpp =================================================================== --- adchpp/trunk/adchpp/AdcCommand.cpp 2007-12-28 15:35:31 UTC (rev 116) +++ adchpp/trunk/adchpp/AdcCommand.cpp 2007-12-30 21:41:46 UTC (rev 117) @@ -24,9 +24,9 @@ using namespace std; -AdcCommand::AdcCommand() : cmdInt(0), str(0), from(0), type(0) { } +AdcCommand::AdcCommand() : cmdInt(0), from(0), type(0) { } -AdcCommand::AdcCommand(Severity sev, Error err, const string& desc, char aType /* = TYPE_INFO */) : cmdInt(CMD_STA), str(&tmp), from(HUB_SID), type(aType) { +AdcCommand::AdcCommand(Severity sev, Error err, const string& desc, char aType /* = TYPE_INFO */) : cmdInt(CMD_STA), from(HUB_SID), type(aType) { addParam(Util::toString(sev * 100 + err)); addParam(desc); } @@ -44,28 +44,30 @@ } } -void AdcCommand::parse(const string& aLine) throw(ParseException) { - if(aLine.length() < 5) { +void AdcCommand::parse(const char* buf, size_t len) throw(ParseException) { + if(len < 5) { throw ParseException("Command too short"); } - type = aLine[0]; + type = buf[0]; if(type != TYPE_BROADCAST && type != TYPE_CLIENT && type != TYPE_DIRECT && type != TYPE_ECHO && type != TYPE_FEATURE && type != TYPE_INFO && type != TYPE_HUB && type != TYPE_UDP) { throw ParseException("Invalid type"); } - cmd[0] = aLine[1]; - cmd[1] = aLine[2]; - cmd[2] = aLine[3]; + cmd[0] = buf[1]; + cmd[1] = buf[2]; + cmd[2] = buf[3]; - if(aLine[4] != ' ') { + if(buf[4] != ' ') { throw ParseException("Missing space after command"); } - string::size_type len = aLine.length() - 1; // aLine contains trailing LF + // Skip trailing LF + len--; - const char* buf = aLine.c_str(); + parameters.reserve(8); + string cur; cur.reserve(64); @@ -158,10 +160,19 @@ } } -const string& AdcCommand::toString() const { - if(!str->empty()) - return *str; +const BufferPtr& AdcCommand::getBuffer() const { + if(!buffer) { + buffer = BufferPtr(new Buffer(toString())); + } + return buffer; +} +string AdcCommand::toString() const { + if(buffer) { + return string((char*)buffer->data(), buffer->size()); + } + string tmp; + tmp.reserve(128); tmp += type; @@ -206,7 +217,7 @@ for(string::size_type i = start; i < getParameters().size(); ++i) { if(toCode(name) == toCode(getParameters()[i].c_str())) { getParameters().erase(getParameters().begin() + i); - resetString(); + resetBuffer(); return true; } } Modified: adchpp/trunk/adchpp/AdcCommand.h =================================================================== --- adchpp/trunk/adchpp/AdcCommand.h 2007-12-28 15:35:31 UTC (rev 116) +++ adchpp/trunk/adchpp/AdcCommand.h 2007-12-30 21:41:46 UTC (rev 117) @@ -22,6 +22,7 @@ #include "common.h" #include "Exception.h" #include "Util.h" +#include "Buffer.h" namespace adchpp { @@ -108,33 +109,37 @@ ADCHPP_DLL AdcCommand(); ADCHPP_DLL explicit AdcCommand(Severity sev, Error err, const std::string& desc, char aType = TYPE_INFO); - explicit AdcCommand(uint32_t cmd, char aType = TYPE_INFO, uint32_t aFrom = HUB_SID) : cmdInt(cmd), str(&tmp), from(aFrom), type(aType) { } - explicit AdcCommand(const std::string& aLine) throw(ParseException) : cmdInt(0), str(&aLine), type(0) { parse(aLine); } - AdcCommand(const AdcCommand& rhs) : parameters(rhs.parameters), cmdInt(rhs.cmdInt), str(&tmp), from(rhs.from), to(rhs.to), type(rhs.type) { } + explicit AdcCommand(uint32_t cmd, char aType = TYPE_INFO, uint32_t aFrom = HUB_SID) : cmdInt(cmd), from(aFrom), type(aType) { } + explicit AdcCommand(const std::string& aLine) throw(ParseException) : cmdInt(0), type(0) { parse(aLine); } + explicit AdcCommand(const BufferPtr& buffer_) throw(ParseException) : buffer(buffer_), cmdInt(0), type(0) { parse((const char*)buffer->data(), buffer->size()); } + AdcCommand(const AdcCommand& rhs) : parameters(rhs.parameters), cmdInt(rhs.cmdInt), from(rhs.from), to(rhs.to), type(rhs.type) { } - ADCHPP_DLL void parse(const std::string& aLine) throw(ParseException); + void parse(const std::string& str) throw(ParseException) { parse(str.data(), str.size()); } + ADCHPP_DLL void parse(const char* buf, size_t len) throw(ParseException); uint32_t getCommand() const { return cmdInt; } char getType() const { return type; } - + std::string getFourCC() const { std::string tmp(4, 0); tmp[0] = type; tmp[1] = cmd[0]; tmp[2] = cmd[1]; tmp[3] = cmd[2]; return tmp; } StringList& getParameters() { return parameters; } const StringList& getParameters() const { return parameters; } + ADCHPP_DLL std::string toString() const; - ADCHPP_DLL const std::string& toString() const; - void resetString() { tmp.clear(); str = &tmp; } - AdcCommand& addParam(const std::string& name, const std::string& value) { parameters.push_back(name); parameters.back() += value; return *this; } + AdcCommand& addParam(const std::string& param) { parameters.push_back(param); return *this; } + const std::string& getParam(size_t n) const { return getParameters().size() > n ? getParameters()[n] : Util::emptyString; } + void resetBuffer() { buffer = BufferPtr(); } + const std::string& getFeatures() const { return features; } /** Return a named parameter where the name is a two-letter code */ @@ -148,6 +153,8 @@ ADCHPP_DLL static void escape(const std::string& s, std::string& out); + ADCHPP_DLL const BufferPtr& getBuffer() const; + uint32_t getTo() const { return to; } void setTo(uint32_t aTo) { to = aTo; } uint32_t getFrom() const { return from; } @@ -161,14 +168,15 @@ StringList parameters; std::string features; + + mutable BufferPtr buffer; + union { char cmdChar[4]; uint8_t cmd[4]; uint32_t cmdInt; }; - const std::string* str; - mutable std::string tmp; - + uint32_t from; uint32_t to; char type; Added: adchpp/trunk/adchpp/Buffer.h =================================================================== --- adchpp/trunk/adchpp/Buffer.h (rev 0) +++ adchpp/trunk/adchpp/Buffer.h 2007-12-30 21:41:46 UTC (rev 117) @@ -0,0 +1,43 @@ +#ifndef BUFFER_H_ +#define BUFFER_H_ + +#include "FastAlloc.h" + +namespace adchpp { + +/** + * Reference-counted buffer + */ +class Buffer : public intrusive_ptr_base, public FastAlloc<Buffer> { +public: + Buffer(const std::string& str) : buf((uint8_t*)str.data(), (uint8_t*)str.data() + str.size()) { } + Buffer(const void* ptr, const size_t size) : buf((uint8_t*) ptr, ((uint8_t*)ptr)+size) { } + Buffer(const size_t size) : buf(size) { } + + operator const ByteVector&() const { return buf; } + operator ByteVector&() { return buf; } + + void resize(size_t new_size) { buf.resize(new_size); } + size_t size() const { return buf.size(); } + const uint8_t* data() const { return &buf[0]; } + uint8_t* data() { return &buf[0]; } + + /** Erase the first n bytes */ + void erase_first(size_t n) { + buf.erase(buf.begin(), buf.begin() + n); + } + + template<typename InputIterator> + void append(InputIterator start, InputIterator end) { + buf.insert(buf.end(), start, end); + } +private: + ByteVector buf; +}; + +typedef boost::intrusive_ptr<Buffer> BufferPtr; +typedef std::vector<BufferPtr> BufferList; + +} + +#endif /*BUFFER_H_*/ Modified: adchpp/trunk/adchpp/Client.cpp =================================================================== --- adchpp/trunk/adchpp/Client.cpp 2007-12-28 15:35:31 UTC (rev 116) +++ adchpp/trunk/adchpp/Client.cpp 2007-12-30 21:41:46 UTC (rev 117) @@ -43,19 +43,19 @@ // Lightweight call forwarders, instead of tr1::bind struct Handler { Handler(void (Client::*f)(), Client* c_) : c(c_), f0(f) { } - Handler(void (Client::*f)(const ByteVector&), Client* c_) : c(c_), f1(f) { } + Handler(void (Client::*f)(const BufferPtr&), Client* c_) : c(c_), f1(f) { } void operator()() throw() { (c->*f0)(); } - void operator()(const ByteVector& bv) throw() { + void operator()(const BufferPtr& bv) throw() { (c->*f1)(bv); } Client* c; union { void (Client::*f0)(); - void (Client::*f1)(const ByteVector&); + void (Client::*f1)(const BufferPtr&); }; }; } @@ -90,43 +90,59 @@ return (i != psd.end()) ? i->second : 0; } -void Client::onData(const ByteVector& data) throw() { - dcdebug("In (%d): %.*s\n", data.size(), data.size(), &data[0]); - +void Client::onData(const BufferPtr& buf) throw() { + uint8_t* data = buf->data(); size_t done = 0; - size_t len = data.size(); + size_t len = buf->size(); while(!disconnecting && done < len) { if(dataBytes > 0) { size_t n = (size_t)min(dataBytes, (int64_t)(len - done)); - dataHandler(*this, &data[done], n); + dataHandler(*this, data + done, n); dataBytes -= n; done += n; } else { size_t j = done; while(j < len && data[j] != '\n') ++j; - + if(j == len) { - line.append((char*)&data[done], j - done); + if(!buffer) { + if(done == 0) { + buffer = buf; + } else { + buffer = BufferPtr(new Buffer(data + done, len - done)); + } + } else { + buffer->append(data + done, data + len); + } return; + } else if(!buffer) { + if(done == 0 && j == len-1) { + buffer = buf; + } else { + buffer = BufferPtr(new Buffer(data + done, j - done + 1)); + } + } else { + buffer->append(data + done, data + j + 1); } - line.append((char*)&data[done], j - done + 1); // include LF done = j + 1; + + size_t max_cmd_size = static_cast<size_t>(SETTING(MAX_COMMAND_SIZE)); - if(SETTING(MAX_COMMAND_SIZE) > 0 && line.size() > (size_t)SETTING(MAX_COMMAND_SIZE)) { + if(max_cmd_size > 0 && buffer->size() > max_cmd_size) { send(AdcCommand(AdcCommand::SEV_FATAL, AdcCommand::ERROR_PROTOCOL_GENERIC, "Command too long")); disconnect(Util::REASON_MAX_COMMAND_SIZE); return; } - if(line.size() == 1) { - line.clear(); + if(buffer->size() == 1) { + buffer = BufferPtr(); continue; } try { - AdcCommand cmd(line); + AdcCommand cmd(buffer); if(cmd.getType() == 'H') { cmd.setFrom(getSID()); @@ -136,9 +152,9 @@ } ClientManager::getInstance()->onReceive(*this, cmd); } catch(const ParseException&) { - ClientManager::getInstance()->onBadLine(*this, line); + ClientManager::getInstance()->onBadLine(*this, string((char*)buffer->data(), buffer->size())); } - line.clear(); + buffer = BufferPtr(); } } } @@ -157,7 +173,7 @@ info[code] = value; } changed[code] = value; - INF.clear(); + INF = BufferPtr(); } bool Client::getChangedFields(AdcCommand& cmd) const throw() { @@ -172,11 +188,11 @@ return !info.empty(); } -const string& Client::getINF() const throw() { - if(INF.empty()) { +const BufferPtr& Client::getINF() const throw() { + if(!INF) { AdcCommand cmd(AdcCommand::CMD_INF, AdcCommand::TYPE_BROADCAST, getSID()); getAllFields(cmd); - INF = cmd.toString(); + INF = cmd.getBuffer(); } return INF; } @@ -243,7 +259,6 @@ void Client::disconnect(Util::Reason reason) throw() { if(socket && !disconnecting) { disconnecting = true; - line.clear(); socket->disconnect(reason); } } Modified: adchpp/trunk/adchpp/Client.h =================================================================== --- adchpp/trunk/adchpp/Client.h 2007-12-28 15:35:31 UTC (rev 116) +++ adchpp/trunk/adchpp/Client.h 2007-12-30 21:41:46 UTC (rev 117) @@ -68,16 +68,14 @@ const StringList& getSupportList() const throw() { return supportList; } bool supports(const std::string& feat) const throw() { return find(supportList.begin(), supportList.end(), feat) != supportList.end(); } - void send(const char* command, size_t len) throw() { - dcassert(socket != NULL); - socket->write(command, len); - } - void send(const AdcCommand& cmd) throw() { send(cmd.toString()); } + void send(const char* command, size_t len) throw() { send(BufferPtr(new Buffer(command, len))); } + + void send(const AdcCommand& cmd) throw() { send(cmd.getBuffer()); } void send(const std::string& command) throw() { send(command.c_str(), command.length()); } - void send(const char* command) throw() { socket->write(command, strlen(command)); } - - void fastSend(const std::string& command, bool lowPrio = false) throw() { - socket->fastWrite(command.c_str(), command.length(), lowPrio); + void send(const BufferPtr& command) throw() { socket->write(command); } + + void fastSend(const BufferPtr& command, bool lowPrio = false) throw() { + socket->fastWrite(command, lowPrio); } size_t getQueuedBytes() throw() { return socket->getQueuedBytes(); } @@ -97,7 +95,7 @@ /** Add any flags that have been updated to the AdcCommand (type etc is not set) */ ADCHPP_DLL bool getChangedFields(AdcCommand& cmd) const throw(); ADCHPP_DLL bool getAllFields(AdcCommand& cmd) const throw(); - ADCHPP_DLL const std::string& getINF() const throw(); + ADCHPP_DLL const BufferPtr& getINF() const throw(); void resetChanged() { changed.clear(); } @@ -169,20 +167,21 @@ bool disconnecting; PSDList psd; - std::string line; + BufferPtr buffer; ManagedSocketPtr socket; int64_t dataBytes; time_t floodTimer; /** Latest INF cached */ - mutable std::string INF; + mutable BufferPtr INF; DataFunction dataHandler; void setSocket(const ManagedSocketPtr& aSocket) throw(); void onConnected() throw(); - void onData(const ByteVector&) throw(); + void onData(const BufferPtr&) throw(); void onFailed() throw(); + }; } Modified: adchpp/trunk/adchpp/ClientManager.cpp =================================================================== --- adchpp/trunk/adchpp/ClientManager.cpp 2007-12-28 15:35:31 UTC (rev 116) +++ adchpp/trunk/adchpp/ClientManager.cpp 2007-12-30 21:41:46 UTC (rev 117) @@ -49,7 +49,7 @@ } void ClientManager::send(const AdcCommand& cmd, bool lowPrio /* = false */) throw() { - const string& txt = cmd.toString(); + const BufferPtr& buf = cmd.getBuffer(); bool all = false; switch (cmd.getType()) { @@ -61,21 +61,20 @@ int override = 0; signalSend_(*i->second, cmd, override); if(!(override & DONT_SEND)) { - i->second->fastSend(txt, lowPrio); + i->second->fastSend(buf, lowPrio); } } } - SocketManager::getInstance()->addAllWriters(); } break; case AdcCommand::TYPE_DIRECT: // Fallthrough case AdcCommand::TYPE_ECHO: { ClientIter i = clients.find(cmd.getTo()); if (i != clients.end()) { - i->second->send(txt); + i->second->send(buf); if (COMPATIBILITY || cmd.getType() == AdcCommand::TYPE_ECHO) { i = clients.find(cmd.getFrom()); if (i != clients.end()) { - i->second->send(txt); + i->second->send(buf); } } } @@ -83,14 +82,11 @@ } } -void ClientManager::sendToAll(const string& cmd) throw() { - { - FastMutex::Lock l(ManagedSocket::getWriteMutex()); - for (ClientIter i = clients.begin(); i != clients.end(); ++i) { - i->second->fastSend(cmd); - } +void ClientManager::sendToAll(const BufferPtr& buf) throw() { + FastMutex::Lock l(ManagedSocket::getWriteMutex()); + for (ClientIter i = clients.begin(); i != clients.end(); ++i) { + i->second->fastSend(buf); } - SocketManager::getInstance()->addAllWriters(); } size_t ClientManager::getQueuedBytes() throw() { @@ -106,7 +102,7 @@ void ClientManager::sendTo(const AdcCommand& cmd, const uint32_t& to) throw() { ClientIter i = clients.find(to); if (i != clients.end()) { - i->second->send(cmd.toString()); + i->second->send(cmd.getBuffer()); } } @@ -115,7 +111,7 @@ AdcCommand s(AdcCommand::CMD_SUP); for (StringIter i = supports.begin(); i != supports.end(); ++i) s.addParam("AD" + *i); - strings.sup = s.toString(); + strings.sup = s.getBuffer(); strings.inf = AdcCommand(AdcCommand::CMD_INF) .addParam("NI", SETTING(HUB_NAME)) @@ -124,7 +120,7 @@ .addParam("VE", versionString) .addParam("CT5") .addParam("HU1") // ADC <=0.13 - .toString(); + .getBuffer(); } bool ClientManager::checkFlooding(Client& c, const AdcCommand& cmd) throw() { @@ -202,7 +198,7 @@ } void ClientManager::badState(Client& c, const AdcCommand& cmd) throw() { - c.send(AdcCommand(AdcCommand::SEV_FATAL, AdcCommand::ERROR_BAD_STATE, "Invalid state for command").addParam("FC", cmd.toString().substr(0, 4))); + c.send(AdcCommand(AdcCommand::SEV_FATAL, AdcCommand::ERROR_BAD_STATE, "Invalid state for command").addParam("FC", cmd.getFourCC())); c.disconnect(Util::REASON_BAD_STATE); } @@ -323,7 +319,7 @@ } else if (j->compare(2, j->size()-2, "0.0.0.0") == 0) { c.setField("I4", c.getIp()); *j = "I4" + c.getIp(); - cmd.resetString(); + cmd.resetBuffer(); } else if (j->size()-2 != c.getIp().size() || j->compare(2, j->size()-2, c.getIp()) != 0) { c.send(AdcCommand(AdcCommand::SEV_FATAL, AdcCommand::ERROR_BAD_IP, "Your ip is " + c.getIp()).addParam("IP", c.getIp())); c.disconnect(Util::REASON_INVALID_IP); @@ -453,13 +449,11 @@ dcassert(c.getState() == Client::STATE_IDENTIFY || c.getState() == Client::STATE_VERIFY); dcdebug("%s entering NORMAL\n", AdcCommand::fromSID(c.getSID()).c_str()); - if (sendData) { - string str; - for (ClientIter i = clients.begin(); i != clients.end(); ++i) { - str += i->second->getINF(); + if(sendData) { + for(ClientIter i = clients.begin(); i != clients.end(); ++i) { + c.send(i->second->getINF()); } - c.send(str); - if (sendOwnInf) { + if(sendOwnInf) { sendToAll(c.getINF()); c.send(c.getINF()); } Modified: adchpp/trunk/adchpp/ClientManager.h =================================================================== --- adchpp/trunk/adchpp/ClientManager.h 2007-12-28 15:35:31 UTC (rev 116) +++ adchpp/trunk/adchpp/ClientManager.h 2007-12-30 21:41:46 UTC (rev 117) @@ -69,9 +69,10 @@ /** Send a command to the clients according to its type */ ADCHPP_DLL void send(const AdcCommand& cmd, bool lowPrio = false) throw(); + void sendToAll(const AdcCommand& cmd) throw() { sendToAll(cmd.getBuffer()); } /** Send command to all regardless of type */ - void sendToAll(const AdcCommand& cmd) throw() { sendToAll(cmd.toString()); } - ADCHPP_DLL void sendToAll(const std::string& cmd) throw(); + void sendToAll(const std::string& cmd) throw() { sendToAll(BufferPtr(new Buffer(cmd))); } + ADCHPP_DLL void sendToAll(const BufferPtr& buffer) throw(); /** Send command to a single client regardless of type */ ADCHPP_DLL void sendTo(const AdcCommand& cmd, const uint32_t& to) throw(); @@ -188,8 +189,8 @@ // Strings used in various places along the pipeline...rebuilt in updateCache()... struct Strings { - std::string sup; - std::string inf; + BufferPtr sup; + BufferPtr inf; } strings; friend class Singleton<ClientManager>; Modified: adchpp/trunk/adchpp/ManagedSocket.cpp =================================================================== --- adchpp/trunk/adchpp/ManagedSocket.cpp 2007-12-28 15:35:31 UTC (rev 116) +++ adchpp/trunk/adchpp/ManagedSocket.cpp 2007-12-30 21:41:46 UTC (rev 117) @@ -31,7 +31,7 @@ FastMutex ManagedSocket::writeMutex; -ManagedSocket::ManagedSocket() throw() : outBuf(0), overFlow(0), disc(0) +ManagedSocket::ManagedSocket() throw() : overFlow(0), disc(0) #ifdef _WIN32 , writeBuf(0) #else @@ -42,6 +42,7 @@ ManagedSocket::~ManagedSocket() throw() { dcdebug("ManagedSocket deleted\n"); +#if 0 if(outBuf) { dcdebug("Left (%d): %.*s\n", outBuf->size(), outBuf->size(), &(*outBuf)[0]); Util::freeBuf = outBuf; @@ -53,105 +54,118 @@ Util::freeBuf = writeBuf; } #endif +#endif } -void ManagedSocket::write(const char* buf, size_t len) throw() { - bool add = false; - { - FastMutex::Lock l(writeMutex); - add = fastWrite(buf, len); +void ManagedSocket::write(const BufferPtr& buf) throw() { + FastMutex::Lock l(writeMutex); + fastWrite(buf); +} + +static size_t sum(const BufferList& l) { + size_t bytes = 0; + for(BufferList::const_iterator i = l.begin(); i != l.end(); ++i) { + bytes += (*i)->size(); } - if(add) { - SocketManager::getInstance()->addWriter(this); - } + return bytes; } -bool ManagedSocket::fastWrite(const char* buf, size_t len, bool lowPrio /* = false */) throw() { - if((len == 0) || (disc > 0)) - return false; +size_t ManagedSocket::getQueuedBytes() const { + return sum(outBuf); +} + +void ManagedSocket::fastWrite(const BufferPtr& buf, bool lowPrio /* = false */) throw() { + if((buf->size() == 0) || (disc > 0)) + return; - bool add = false; - if(outBuf == 0) { - add = true; - outBuf = Util::freeBuf; - } + size_t queued = getQueuedBytes(); - if(outBuf->size() + len > (uint32_t)SETTING(MAX_BUFFER_SIZE)) { + if(queued + buf->size() > (size_t)SETTING(MAX_BUFFER_SIZE)) { if(lowPrio && SETTING(KEEP_SLOW_USERS)) { - return false; + return; } else if(overFlow > 0 && overFlow + SETTING(OVERFLOW_TIMEOUT) < GET_TICK()) { disconnect(Util::REASON_WRITE_OVERFLOW); - return false; + return; } else { overFlow = GET_TICK(); } } - Stats::queueBytes += len; + Stats::queueBytes += buf->size(); Stats::queueCalls++; - outBuf->insert(outBuf->end(), buf, buf + len); - return add; + outBuf.push_back(buf); } -ByteVector* ManagedSocket::prepareWrite() { +void ManagedSocket::prepareWrite(BufferList& buffers) { if(isBlocked()) { - return 0; + return; } - - ByteVector* buffer = 0; - { - FastMutex::Lock l(writeMutex); - - if(outBuf == 0) { - return 0; - } - - if(SETTING(MAX_SEND_SIZE) > 0 && (outBuf->size() > (size_t)SETTING(MAX_SEND_SIZE))) { - // Damn, we take a copy and leave the rest... - buffer = Util::freeBuf; - buffer->insert(buffer->end(), outBuf->begin(), outBuf->begin() + SETTING(MAX_SEND_SIZE)); - outBuf->erase(outBuf->begin(), outBuf->begin() + SETTING(MAX_SEND_SIZE)); - } else { - buffer = outBuf; - outBuf = 0; + FastMutex::Lock l(writeMutex); + size_t queued = getQueuedBytes(); + if(queued == 0) { + return; + } + + size_t max_send = static_cast<size_t>(SETTING(MAX_SEND_SIZE)); + + if((max_send > 0) && (queued > max_send)) { + // Copy as many buffers as possible + // TODO The last copied buffer should be split... + size_t done = 0; + BufferList::iterator i; + for(i = outBuf.begin(); i != outBuf.end(); ++i) { + buffers.push_back(*i); + done += (*i)->size(); + if(done > max_send) { + break; + } } + outBuf.erase(outBuf.begin(), i); + } else { + buffers.swap(outBuf); } - return buffer; } -bool ManagedSocket::completeWrite(ByteVector* buf, size_t written) throw() { +bool ManagedSocket::completeWrite(BufferList& buffers, size_t written) throw() { Stats::sendBytes += written; Stats::sendCalls++; - bool moreData; - { - FastMutex::Lock l(writeMutex); + size_t done = 0; + BufferList::iterator i = buffers.begin(); + for(; i != buffers.end(); ++i) { + if(done + (*i)->size() > written) { + break; + } + done += (*i)->size(); + } + + FastMutex::Lock l(writeMutex); + + if(done != written) { + // i points to the first not fully written buffer.. + size_t diff = written - done; + if(diff != 0) { + (*i)->erase_first(diff); + } - if(written != buf->size()) { - if(outBuf == 0) { - buf->erase(buf->begin(), buf->begin() + written); - outBuf = buf; - buf = 0; - } else { - outBuf->insert(outBuf->begin(), buf->begin() + written, buf->end()); - } - } - moreData = (outBuf != 0) || disc > 0; - if( !outBuf || (outBuf->size() < (size_t)SETTING(MAX_BUFFER_SIZE)) ) - overFlow = 0; - + outBuf.insert(outBuf.begin(), i, buffers.end()); } + + buffers.clear(); - if(buf) { - Util::freeBuf = buf; + size_t left = getQueuedBytes(); + if(overFlow > 0) { + if(left < static_cast<size_t>(SETTING(MAX_BUFFER_SIZE))) { + overFlow = 0; + } } - return moreData; + return left > 0 || disc > 0; } -bool ManagedSocket::completeRead(ByteVector* buf) throw() { +bool ManagedSocket::completeRead(const BufferPtr& buf) throw() { Stats::recvBytes += buf->size(); Stats::recvCalls++; SocketManager::getInstance()->addJob(std::tr1::bind(&ManagedSocket::processData, this, buf)); @@ -163,6 +177,7 @@ } void ManagedSocket::failSocket(int) throw() { + sock.disconnect(); SocketManager::getInstance()->addJob(failedHandler); } @@ -173,12 +188,10 @@ disc = GET_TICK() + SETTING(DISCONNECT_TIMEOUT); Util::reasons[reason]++; - SocketManager::getInstance()->addDisconnect(this); } -void ManagedSocket::processData(ByteVector* buf) throw() { - dataHandler(*buf); - Util::freeBuf = buf; +void ManagedSocket::processData(const BufferPtr& buf) throw() { + dataHandler(buf); } } Modified: adchpp/trunk/adchpp/ManagedSocket.h =================================================================== --- adchpp/trunk/adchpp/ManagedSocket.h 2007-12-28 15:35:31 UTC (rev 116) +++ adchpp/trunk/adchpp/ManagedSocket.h 2007-12-30 21:41:46 UTC (rev 117) @@ -26,9 +26,10 @@ #include "Mutex.h" #include "Signal.h" #include "Util.h" +#include "Buffer.h" namespace adchpp { - + /** * An asynchronous socket managed by SocketManager. */ @@ -37,16 +38,16 @@ void create() throw(SocketException) { sock.create(); } /** Asynchronous write */ - ADCHPP_DLL void write(const char* buf, size_t len) throw(); + ADCHPP_DLL void write(const BufferPtr& buf) throw(); /** Asynchronous write, assumes that buffers are locked */ - ADCHPP_DLL bool fastWrite(const char* buf, size_t len, bool lowPrio = false) throw(); + ADCHPP_DLL void fastWrite(const BufferPtr& buf, bool lowPrio = false) throw(); /** Returns the lock used for the write buffers */ static FastMutex& getWriteMutex() { return writeMutex; } /** Returns the number of bytes in the output buffer; buffers must be locked */ - size_t getQueuedBytes() { return outBuf ? outBuf->size() : 0; } + size_t getQueuedBytes() const; /** Asynchronous disconnect. Pending data will be written, but no more data will be read. */ ADCHPP_DLL void disconnect(Util::Reason reason) throw(); @@ -56,7 +57,7 @@ typedef std::tr1::function<void()> ConnectedHandler; void setConnectedHandler(const ConnectedHandler& handler) { connectedHandler = handler; } - typedef std::tr1::function<void(const ByteVector&)> DataHandler; + typedef std::tr1::function<void(const BufferPtr&)> DataHandler; void setDataHandler(const DataHandler& handler) { dataHandler = handler; } typedef std::tr1::function<void()> FailedHandler; void setFailedHandler(const FailedHandler& handler) { failedHandler = handler; } @@ -69,17 +70,17 @@ ~ManagedSocket() throw(); // Functions for Writer (called from Writer thread) - ByteVector* prepareWrite(); + void prepareWrite(BufferList& buffers); void completeAccept() throw(); - bool completeWrite(ByteVector* buf, size_t written) throw(); - bool completeRead(ByteVector* buf) throw(); + bool completeWrite(BufferList& buffers, size_t written) throw(); + bool completeRead(const BufferPtr& buf) throw(); void failSocket(int error) throw(); void shutdown() { sock.shutdown(); } void close() { sock.disconnect(); } // Functions processing events - void processData(ByteVector* buf) throw(); + void processData(const BufferPtr& buf) throw(); // No copies ManagedSocket(const ManagedSocket&); @@ -88,8 +89,9 @@ friend class Writer; Socket sock; + /** Output buffer, for storing data that's waiting to be transmitted */ - ByteVector* outBuf; + BufferList outBuf; /** Overflow timer, the buffer is allowed to overflow for 1 minute, then disconnect */ uint32_t overFlow; /** Disconnection scheduled for this socket */ @@ -98,11 +100,11 @@ std::string ip; #ifdef _WIN32 /** Data currently being sent by WSASend, 0 if not sending */ - ByteVector* writeBuf; - /** WSABUF for data being sent */ - WSABUF wsabuf; + BufferList writeBuf; + /** Buffer containing WSABUF's for data being sent */ + BufferPtr wsabuf; - bool isBlocked() { return writeBuf != 0; } + bool isBlocked() { return !writeBuf.empty(); } #else bool blocked; bool isBlocked() { return blocked; } Modified: adchpp/trunk/adchpp/Socket.cpp =================================================================== --- adchpp/trunk/adchpp/Socket.cpp 2007-12-28 15:35:31 UTC (rev 116) +++ adchpp/trunk/adchpp/Socket.cpp 2007-12-30 21:41:46 UTC (rev 117) @@ -163,15 +163,6 @@ return len; } -void Socket::write(const char* aBuffer, size_t aLen) throw(SocketException) { - size_t pos = writeNB(aBuffer, aLen); - while(pos < aLen) { - // Try once every second at least, you never know... - wait(1000, WAIT_WRITE); - pos += writeNB(aBuffer + pos, aLen - pos); - } -} - #ifndef MSG_NOSIGNAL #define MSG_NOSIGNAL 0 #endif @@ -187,13 +178,13 @@ * @return 0 if socket would block, otherwise the number of bytes written * @throw SocketExcpetion Send failed. */ -int Socket::writeNB(const char* aBuffer, size_t aLen) throw(SocketException) { +int Socket::write(const void* aBuffer, size_t aLen) throw(SocketException) { dcdebug("Writing %db: %.100s\n", aLen, aBuffer); dcassert(aLen > 0); - int i = ::send(sock, aBuffer, (int)aLen, MSG_NOSIGNAL | MSG_DONTWAIT); + int i = ::send(sock,(char*)aBuffer, (int)aLen, MSG_NOSIGNAL | MSG_DONTWAIT); if(i == SOCKET_ERROR) { - if(socket_errno == EWOULDBLOCK) { + if(socket_errno == EWOULDBLOCK || socket_errno == EINTR || socket_errno == EAGAIN) { return 0; } checksockerr(i); @@ -211,7 +202,7 @@ * @param aLen Data length * @throw SocketExcpetion Send failed. */ -void Socket::writeTo(const string& aIp, short aPort, const char* aBuffer, size_t aLen) throw(SocketException) { +void Socket::writeTo(const string& aIp, short aPort, const void* aBuffer, size_t aLen) throw(SocketException) { if(sock == INVALID_SOCKET) { create(TYPE_UDP); } @@ -238,7 +229,7 @@ serv_addr.sin_addr.s_addr = *((uint32_t*)host->h_addr); } - int i = ::sendto(sock, aBuffer, (int)aLen, 0, (struct sockaddr*)&serv_addr, sizeof(serv_addr)); + int i = ::sendto(sock, (char*)aBuffer, (int)aLen, 0, (struct sockaddr*)&serv_addr, sizeof(serv_addr)); checksockerr(i); } Modified: adchpp/trunk/adchpp/Socket.h =================================================================== --- adchpp/trunk/adchpp/Socket.h 2007-12-28 15:35:31 UTC (rev 116) +++ adchpp/trunk/adchpp/Socket.h 2007-12-30 21:41:46 UTC (rev 117) @@ -107,12 +107,11 @@ virtual void bind(short aPort) throw(SocketException); virtual void connect(const std::string& aIp, short aPort) throw(SocketException); void connect(const std::string& aIp, const std::string& aPort) throw(SocketException) { connect(aIp, (short)Util::toInt(aPort)); } + int read(void* aBuffer, size_t aBufLen) throw(SocketException); virtual std::string accept(const Socket& aSocket) throw(SocketException); - virtual void write(const char* aBuffer, size_t aLen) throw(SocketException); + virtual int write(const void* aBuffer, size_t aLen) throw(SocketException); void write(const std::string& aData) throw(SocketException) { write(aData.data(), aData.length()); } - virtual int writeNB(const char* aBuffer, size_t aLen) throw(SocketException); - int writeNB(const std::string& aData) throw(SocketException) { return writeNB(aData.data(), aData.length()); } - virtual void writeTo(const std::string& aIp, short aPort, const char* aBuffer, size_t aLen) throw(SocketException); + virtual void writeTo(const std::string& aIp, short aPort, const void* aBuffer, size_t aLen) throw(SocketException); void writeTo(const std::string& aIp, short aPort, const std::string& aData) throw(SocketException) { writeTo(aIp, aPort, aData.data(), aData.length()); } virtual void disconnect() throw(); @@ -120,7 +119,6 @@ void shutdown() { ::shutdown(sock, SD_BOTH); } - int read(void* aBuffer, size_t aBufLen) throw(SocketException); int wait(uint32_t millis, int waitFor) throw(SocketException); static std::string resolve(const std::string& aDns); Modified: adchpp/trunk/adchpp/SocketManager.cpp =================================================================== --- adchpp/trunk/adchpp/SocketManager.cpp 2007-12-28 15:35:31 UTC (rev 116) +++ adchpp/trunk/adchpp/SocketManager.cpp 2007-12-30 21:41:46 UTC (rev 117) @@ -321,13 +321,13 @@ DWORD x = 0; - ms->writeBuf = Util::freeBuf; - ms->writeBuf->resize(ACCEPT_BUF_SIZE); + ms->writeBuf.push_back(BufferPtr(new Buffer(ACCEPT_BUF_SIZE))); + ms->writeBuf.back()->resize(ACCEPT_BUF_SIZE); MSOverlapped* overlapped = pool.get(); *overlapped = MSOverlapped(MSOverlapped::ACCEPT_DONE, ms); - if(!::AcceptEx(srv.getSocket(), ms->getSocket(), &(*ms->writeBuf)[0], 0, ACCEPT_BUF_SIZE/2, ACCEPT_BUF_SIZE/2, &x, overlapped)) { + if(!::AcceptEx(srv.getSocket(), ms->getSocket(), ms->writeBuf.back()->data(), 0, ACCEPT_BUF_SIZE/2, ACCEPT_BUF_SIZE/2, &x, overlapped)) { int error = ::WSAGetLastError(); if(error != ERROR_IO_PENDING) { if(!stop) { @@ -348,12 +348,11 @@ struct sockaddr_in *local, *remote; int sz1 = sizeof(local), sz2 = sizeof(remote); - ::GetAcceptExSockaddrs(&(*ms->writeBuf)[0], 0, ACCEPT_BUF_SIZE/2, ACCEPT_BUF_SIZE/2, reinterpret_cast<sockaddr**>(&local), &sz1, reinterpret_cast<sockaddr**>(&remote), &sz2); + ::GetAcceptExSockaddrs(ms->writeBuf.back()->data(), 0, ACCEPT_BUF_SIZE/2, ACCEPT_BUF_SIZE/2, reinterpret_cast<sockaddr**>(&local), &sz1, reinterpret_cast<sockaddr**>(&remote), &sz2); ms->setIp(inet_ntoa(remote->sin_addr)); - Util::freeBuf = ms->writeBuf; - ms->writeBuf = 0; + ms->writeBuf.clear(); active.insert(ms); accepting.erase(ms); @@ -393,18 +392,14 @@ } void handleReadDone(const ManagedSocketPtr& ms) throw() { - ByteVector* readBuf = Util::freeBuf; + BufferPtr readBuf(new Buffer(SETTING(BUFFER_SIZE))); - if(readBuf->size() < (size_t)SETTING(BUFFER_SIZE)) - readBuf->resize(SETTING(BUFFER_SIZE)); + WSABUF wsa = { (u_long)readBuf->size(), (char*)readBuf->data() }; - WSABUF wsa = { (u_long)readBuf->size(), (char*)&(*readBuf)[0] }; - DWORD bytes = 0; DWORD flags = 0; if(::WSARecv(ms->getSocket(), &wsa, 1, &bytes, &flags, 0, 0) == SOCKET_ERROR) { - Util::freeBuf = readBuf; int error = ::WSAGetLastError(); if(error != WSAEWOULDBLOCK) { // Socket failed... @@ -417,7 +412,6 @@ } if(bytes == 0) { - Util::freeBuf = readBuf; disconnect(ms, 0); return; } @@ -429,13 +423,13 @@ } void write(const ManagedSocketPtr& ms) throw() { - if(stop || !(*ms)) { + if(stop || !(*ms) || !ms->writeBuf.empty()) { return; } - ms->writeBuf = ms->prepareWrite(); + ms->prepareWrite(ms->writeBuf); - if(!ms->writeBuf) { + if(ms->writeBuf.empty()) { uint32_t now = GET_TICK(); if(ms->disc || (ms->isBlocked() && ms->disc < now)) { @@ -444,14 +438,17 @@ return; } - ms->wsabuf.len = ms->writeBuf->size(); - ms->wsabuf.buf = reinterpret_cast<char*>(&(*ms->writeBuf)[0]); - + ms->wsabuf->resize(sizeof(WSABUF) * ms->writeBuf.size()); + for(size_t i = 0; i < ms->writeBuf.size(); ++i) { + WSABUF wsa = { (u_long)ms->writeBuf[i]->size(), (char*)ms->writeBuf[i]->data() }; + memcpy(ms->wsabuf->data() + i * sizeof(WSABUF), &wsa, sizeof(WSABUF)); + } + MSOverlapped* overlapped = pool.get(); *overlapped = MSOverlapped(MSOverlapped::WRITE_DONE, ms); DWORD x = 0; - if(::WSASend(ms->getSocket(), &ms->wsabuf, 1, &x, 0, reinterpret_cast<LPWSAOVERLAPPED>(overlapped), 0) != 0) { + if(::WSASend(ms->getSocket(), (WSABUF*)ms->wsabuf->data(), ms->writeBuf.size(), &x, 0, reinterpret_cast<LPWSAOVERLAPPED>(overlapped), 0) != 0) { int error = ::WSAGetLastError(); if(error != WSA_IO_PENDING) { pool.put(overlapped); @@ -461,19 +458,10 @@ } void handleWriteDone(const ManagedSocketPtr& ms, DWORD bytes) throw() { - ByteVector* buf = ms->writeBuf; - ms->writeBuf = 0; - - if(!buf) { - dcdebug("No buffer in handleWriteDone??\n"); - return; - } - ms->completeWrite(buf, bytes); + ms->completeWrite(ms->writeBuf, bytes); } void failWrite(const ManagedSocketPtr& ms, int error) throw() { - Util::freeBuf = ms->writeBuf; - ms->writeBuf = 0; disconnect(ms, error); } @@ -516,6 +504,7 @@ ManagedSocketPtr ms(new ManagedSocket()); try { ms->setIp(ms->sock.accept(srv)); + ms->sock.setBlocking(false); if(!poller.associate(ms)) { LOG(SocketManager::className, "Unable to associate EPoll: " + Util::translateError(errno)); @@ -540,14 +529,10 @@ return false; for(;;) { - ByteVector* readBuf = Util::freeBuf; - if(readBuf->size() < (size_t)SETTING(BUFFER_SIZE)) - readBuf->resize(SETTING(BUFFER_SIZE)); - - ssize_t bytes = ::recv(ms->getSocket(), &(*readBuf)[0], readBuf->size(), MSG_DONTWAIT); + BufferPtr buf(new Buffer(SETTING(BUFFER_SIZE))); + + ssize_t bytes = ::recv(ms->getSocket(), buf->data(), buf->size(), MSG_DONTWAIT); if(bytes == -1) { - Util::freeBuf = readBuf; - int error = errno; if(error != EAGAIN && error != EINTR) { ms->close(); @@ -556,14 +541,13 @@ } break; } else if(bytes == 0) { - Util::freeBuf = readBuf; ms->close(); disconnect(ms, 0); return false; } - readBuf->resize(bytes); - ms->completeRead(readBuf); + buf->resize(bytes); + ms->completeRead(buf); } return true; } @@ -572,30 +556,33 @@ if(stop || !(*ms)) { return; } - + BufferList buffers; while(true) { - ByteVector* writeBuf = ms->prepareWrite(); - - if(!writeBuf) { + ms->prepareWrite(buffers); + if(buffers.empty()) { uint32_t now = GET_TICK(); if(ms->disc || (ms->isBlocked() && ms->disc < now)) { disconnect(ms, 0); } return; } - - ssize_t bytes = ::send(ms->getSocket(), &(*writeBuf)[0], writeBuf->size(), MSG_NOSIGNAL | MSG_DONTWAIT); + std::vector<iovec> iov(buffers.size()); + for(size_t i = 0; i < buffers.size(); ++i) { + iov[i].iov_base = buffers[i]->data(); + iov[i].iov_len = buffers[i]->size(); + } + ssize_t bytes = ::writev(ms->getSocket(), &iov[0], iov.size()); if(bytes == -1) { int error = errno; if(error == EAGAIN) { - ms->completeWrite(writeBuf, 0); + ms->completeWrite(buffers, 0); return; } - Util::freeBuf = writeBuf; + //Util::freeBuf = writeBuf; disconnect(ms, error); return; } - if(!ms->completeWrite(writeBuf, bytes)) { + if(!ms->completeWrite(buffers, bytes)) { break; } } @@ -717,15 +704,6 @@ return 0; } -void SocketManager::addWriter(const ManagedSocketPtr& ms) throw() { -} - -void SocketManager::addAllWriters() throw() { -} - -void SocketManager::addDisconnect(const ManagedSocketPtr& ms) throw() { -} - void SocketManager::addJob(const Callback& callback) throw() { FastMutex::Lock l(processMutex); Modified: adchpp/trunk/adchpp/SocketManager.h =================================================================== --- adchpp/trunk/adchpp/SocketManager.h 2007-12-28 15:35:31 UTC (rev 116) +++ adchpp/trunk/adchpp/SocketManager.h 2007-12-30 21:41:46 UTC (rev 117) @@ -37,10 +37,6 @@ void startup() throw(ThreadException) { start(); } void shutdown(); - void addWriter(const ManagedSocketPtr& ms) throw(); - void addDisconnect(const ManagedSocketPtr& ms) throw(); - void addAllWriters() throw(); - typedef std::tr1::function<void (const ManagedSocketPtr&)> IncomingHandler; void setIncomingHandler(const IncomingHandler& handler) { incomingHandler = handler; } Modified: adchpp/trunk/swig/adchpp.i =================================================================== --- adchpp/trunk/swig/adchpp.i 2007-12-28 15:35:31 UTC (rev 116) +++ adchpp/trunk/swig/adchpp.i 2007-12-30 21:41:46 UTC (rev 117) @@ -327,8 +327,8 @@ StringList& getParameters(); //const StringList& getParameters() const; - const std::string& toString() const; - void resetString(); + std::string toString() const; + void resetBuffer(); AdcCommand& addParam(const std::string& name, const std::string& value); AdcCommand& addParam(const std::string& str); Modified: adchpp/trunk/test/PyClient.py =================================================================== --- adchpp/trunk/test/PyClient.py 2007-12-28 15:35:31 UTC (rev 116) +++ adchpp/trunk/test/PyClient.py 2007-12-30 21:41:46 UTC (rev 117) @@ -1,159 +1,157 @@ -#!/usr/bin/python -import sys - -sys.path.append('../build/debug-default/bin') - -CLIENTS = 100 - -import socket, threading, time - -from pyadchpp import ParseException, Util_initialize, CID, CID_generate, Encoder_toBase32, Encoder_fromBase32, AdcCommand, AdcCommand_toSID, TigerHash, CID - -Util_initialize("") - -class Client(object): - def __init__(self, n): - self.sock = socket.socket() - self.pid = CID_generate() - tiger = TigerHash() - tiger.update(self.pid.data()) - self.cid = CID(Encoder_toBase32(tiger.finalize())) - self.nick = "user_" + str(n) + "_" + self.cid.toBase32() - self.running = True - self.line = "" - - def connect(self, ipport): - self.sock.connect(ipport) - - def command(self, cmd): - s = cmd.toString() - print self.nick, "sending", s - self.sock.send(cmd.toString()) - - def get_command(self): - index = self.line.find('\n') - while index == -1: - line = self.sock.recv(4096) - if len(line) == 0: - return None - - self.line += line - index = self.line.find('\n') - if index==0: - self.line = self.line[index+1:] - index = -1 - - self.lastline = self.line[:index + 1] - self.line = self.line[index+1:] - return AdcCommand(self.lastline) - - def expect(self, command): - cmd = self.get_command() - if not cmd or cmd.getCommand() != command: - if not cmd: - error = "expect: connection closed" - else: - error = "expect: " + cmd.getCommandString() - raise Exception, error - return cmd - - def login(self, ipport): - self.connect(ipport) - cmd = AdcCommand(AdcCommand.CMD_SUP, AdcCommand.TYPE_HUB, 0) - cmd.addParam("ADBASE").addParam("ADTIGR") - self.command(cmd) - self.expect(AdcCommand.CMD_SUP) - sid = self.expect(AdcCommand.CMD_SID) - self.sid = AdcCommand_toSID(sid.getParam(0)) - - cmd = AdcCommand(AdcCommand.CMD_INF, AdcCommand.TYPE_BROADCAST, self.sid) - cmd.addParam("ID" + self.cid.toBase32()) - cmd.addParam("PD" + self.pid.toBase32()) - cmd.addParam("NI" + self.nick) - self.command(cmd) - -# def test_close(self): -# self.sock.close() - - def test_error(self): - cmd = AdcCommand(AdcCommand.CMD_MSG, AdcCommand.TYPE_BROADCAST, self.sid) - cmd.addParam("+error") - self.command(cmd) - - def test_test(self): - cmd = AdcCommand(AdcCommand.CMD_MSG, AdcCommand.TYPE_BROADCAST, self.sid) - cmd.addParam("+test") - self.command(cmd) - - def test_msg(self): - cmd = AdcCommand(AdcCommand.CMD_MSG, AdcCommand.TYPE_BROADCAST, self.sid) - cmd.addParam("hello from " + self.nick) - self.command(cmd) - - def test_nick(self): - self.nick = "user_" + str(CID_generate()) - cmd = AdcCommand(AdcCommand.CMD_MSG, AdcCommand.TYPE_BROADCAST, self.sid) - cmd.addParam("renaming myself to " + self.nick) - self.command(cmd) - cmd = AdcCommand(AdcCommand.CMD_INF, AdcCommand.TYPE_BROADCAST, self.sid) - cmd.addParam("NI", self.nick) - self.command(cmd) - - def __call__(self): - try: - while self.get_command(): - pass - self.sock.close() - except Exception, e: - print "Client " + self.nick + " died:", e - except ParseException, e: - print "Client " + self.nick + " died, line was:", self.lastline - self.running = False -try: - import sys - if len(sys.argv) > 2: - ip = sys.argv[1] - port = int(sys.argv[2]) - else: - ip = "127.0.0.1" - port = 2780 - - clients = [] - for i in range(CLIENTS): - if i > 0 and i % 10 == 0: - #time.sleep(3) - pass - print "Logging in", i - client = Client(i) - clients.append(client) - client.login((ip,port)) - t = threading.Thread(target = client, name = client.nick) - t.setDaemon(True) - t.start() - - time.sleep(5) - import random - tests = [] - for k,v in Client.__dict__.iteritems(): - if len(k) < 4 or k[0:4] != "test": - continue - tests.append(v) - print tests - while len(clients) > 0: - time.sleep(1) - for c in clients: - if not c.running: - clients.remove(c) - - if len(clients) == 0: - break - - if random.random() > (5./len(clients)): - continue - try: - random.choice(tests)(c) - except Exception, e: - pass - print "No more clients" -except Exception, e: - print e +#!/usr/bin/python +import sys + +sys.path.append('../build/debug-default/bin') + +CLIENTS = 100 + +import socket, threading, time, random, sys + +from pyadchpp import ParseException, Util_initialize, CID, CID_generate, Encoder_toBase32, Encoder_fromBase32, AdcCommand, AdcCommand_toSID, TigerHash, CID + +Util_initialize("") + +class Client(object): + def __init__(self, n): + self.sock = socket.socket() + self.pid = CID_generate() + tiger = TigerHash() + tiger.update(self.pid.data()) + self.cid = CID(Encoder_toBase32(tiger.finalize())) + self.nick = "user_" + str(n) + "_" + self.cid.toBase32() + self.running = True + self.line = "" + + def connect(self, ipport): + self.sock.connect(ipport) + + def command(self, cmd): + s = cmd.toString() + print self.nick, "sending", len(s), s + self.sock.send(s) + + def get_command(self): + index = self.line.find('\n') + while index == -1: + line = self.sock.recv(4096) + if len(line) == 0: + return None + + self.line += line + index = self.line.find('\n') + if index==0: + self.line = self.line[index+1:] + index = -1 + + self.lastline = self.line[:index + 1] + self.line = self.line[index+1:] + return AdcCommand(self.lastline) + + def expect(self, command): + cmd = self.get_command() + if not cmd or cmd.getCommand() != command: + if not cmd: + error = "expect: connection closed" + else: + error = "expect: " + cmd.getCommandString() + raise Exception, error + return cmd + + def login(self, ipport): + self.connect(ipport) + cmd = AdcCommand(AdcCommand.CMD_SUP, AdcCommand.TYPE_HUB, 0) + cmd.addParam("ADBASE").addParam("ADTIGR") + self.command(cmd) + self.expect(AdcCommand.CMD_SUP) + sid = self.expect(AdcCommand.CMD_SID) + self.sid = AdcCommand_toSID(sid.getParam(0)) + + cmd = AdcCommand(AdcCommand.CMD_INF, AdcCommand.TYPE_BROADCAST, self.sid) + cmd.addParam("ID" + self.cid.toBase32()) + cmd.addParam("PD" + self.pid.toBase32()) + cmd.addParam("NI" + self.nick) + self.command(cmd) + +# def test_close(self): +# self.sock.close() + + def test_error(self): + cmd = AdcCommand(AdcCommand.CMD_MSG, AdcCommand.TYPE_BROADCAST, self.sid) + cmd.addParam("+error") + self.command(cmd) + + def test_test(self): + cmd = AdcCommand(AdcCommand.CMD_MSG, AdcCommand.TYPE_BROADCAST, self.sid) + cmd.addParam("+test") + self.command(cmd) + + def test_msg(self): + cmd = AdcCommand(AdcCommand.CMD_MSG, AdcCommand.TYPE_BROADCAST, self.sid) + cmd.addParam("hello from " + self.nick) + self.command(cmd) + + def test_nick(self): + self.nick = "user_" + str(CID_generate()) + cmd = AdcCommand(AdcCommand.CMD_MSG, AdcCommand.TYPE_BROADCAST, self.sid) + cmd.addParam("renaming myself to " + self.nick) + self.command(cmd) + cmd = AdcCommand(AdcCommand.CMD_INF, AdcCommand.TYPE_BROADCAST, self.sid) + cmd.addParam("NI", self.nick) + self.command(cmd) + + def __call__(self): + try: + while self.get_command(): + pass + self.sock.close() + except Exception, e: + print "Client " + self.nick + " died:", e + except ParseException, e: + print "Client " + self.nick + " died, line was:", self.lastline + self.running = False +try: + if len(sys.argv) > 2: + ip = sys.argv[1] + port = int(sys.argv[2]) + else: + ip = "127.0.0.1" + port = 2780 + + clients = [] + for i in range(CLIENTS): + if i > 0 and i % 10 == 0: + #time.sleep(3) + pass + print "Logging in", i + client = Client(i) + clients.append(client) + client.login((ip,port)) + t = threading.Thread(target = client, name = client.nick) + t.setDaemon(True) + t.start() + + time.sleep(5) + tests = [] + for k,v in Client.__dict__.iteritems(): + if len(k) < 4 or k[0:4] != "test": + continue + tests.append(v) + print tests + while len(clients) > 0: + #time.sleep(1) + for c in clients: + if not c.running: + clients.remove(c) + + if len(clients) == 0: + break + + if random.random() > (5./len(clients)): + continue + try: + random.choice(tests)(c) + except Exception, e: + pass + print "No more clients" +except Exception, e: + print e This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |