From: <arn...@us...> - 2007-12-23 20:30:51
|
Revision: 111 http://adchpp.svn.sourceforge.net/adchpp/?rev=111&view=rev Author: arnetheduck Date: 2007-12-23 12:30:27 -0800 (Sun, 23 Dec 2007) Log Message: ----------- Bloom filter fixes Modified Paths: -------------- adchpp/trunk/adchpp/ClientManager.cpp adchpp/trunk/adchpp/ClientManager.h adchpp/trunk/plugins/Bloom/src/BloomManager.cpp adchpp/trunk/plugins/Bloom/src/BloomManager.h adchpp/trunk/plugins/Bloom/src/HashBloom.cpp adchpp/trunk/plugins/Bloom/src/HashBloom.h adchpp/trunk/unix/po/adchppd.pot Modified: adchpp/trunk/adchpp/ClientManager.cpp =================================================================== --- adchpp/trunk/adchpp/ClientManager.cpp 2007-12-22 23:08:03 UTC (rev 110) +++ adchpp/trunk/adchpp/ClientManager.cpp 2007-12-23 20:30:27 UTC (rev 111) @@ -50,19 +50,23 @@ void ClientManager::send(const AdcCommand& cmd, bool lowPrio /* = false */) throw() { const string& txt = cmd.toString(); - + + bool all = false; switch (cmd.getType()) { - case AdcCommand::TYPE_FEATURE: - case AdcCommand::TYPE_BROADCAST: { - bool all = (cmd.getType() == AdcCommand::TYPE_BROADCAST); + case AdcCommand::TYPE_BROADCAST: all = true; // Fallthrough + case AdcCommand::TYPE_FEATURE: { FastMutex::Lock l(ManagedSocket::getWriteMutex()); for (ClientIter i = clients.begin(); i != clients.end(); ++i) { - if (all || !i->second->isFiltered(cmd.getFeatures())) - i->second->fastSend(txt, lowPrio); + if (all || !i->second->isFiltered(cmd.getFeatures())) { + int override = 0; + signalSend_(*i->second, cmd, override); + if(!(override & DONT_SEND)) { + i->second->fastSend(txt, lowPrio); + } + } } - } SocketManager::getInstance()->addAllWriters(); - break; + } break; case AdcCommand::TYPE_DIRECT: // Fallthrough case AdcCommand::TYPE_ECHO: { ClientIter i = clients.find(cmd.getTo()); @@ -75,9 +79,8 @@ } } } + } break; } - break; - } } void ClientManager::sendToAll(const string& cmd) throw() { Modified: adchpp/trunk/adchpp/ClientManager.h =================================================================== --- adchpp/trunk/adchpp/ClientManager.h 2007-12-22 23:08:03 UTC (rev 110) +++ adchpp/trunk/adchpp/ClientManager.h 2007-12-23 20:30:27 UTC (rev 111) @@ -152,7 +152,7 @@ typedef SignalTraits<void (Client&)> SignalConnected; typedef SignalTraits<void (Client&, AdcCommand&, int&)> SignalReceive; typedef SignalTraits<void (Client&, const std::string&)> SignalBadLine; - typedef SignalTraits<void (Client&, AdcCommand&, int&)> SignalSend; + typedef SignalTraits<void (Client&, const AdcCommand&, int&)> SignalSend; typedef SignalTraits<void (Client&, int)> SignalState; typedef SignalTraits<void (Client&)> SignalDisconnected; Modified: adchpp/trunk/plugins/Bloom/src/BloomManager.cpp =================================================================== --- adchpp/trunk/plugins/Bloom/src/BloomManager.cpp 2007-12-22 23:08:03 UTC (rev 110) +++ adchpp/trunk/plugins/Bloom/src/BloomManager.cpp 2007-12-23 20:30:27 UTC (rev 111) @@ -25,16 +25,18 @@ #include <adchpp/Util.h> using namespace std; +using namespace std::tr1; using namespace std::tr1::placeholders; using namespace adchpp; BloomManager* BloomManager::instance = 0; const string BloomManager::className = "BloomManager"; -BloomManager::BloomManager() { +BloomManager::BloomManager() : searches(0), tthSearches(0), stopped(0) { LOG(className, "Starting"); ClientManager* cm = ClientManager::getInstance(); receiveConn = manage(&cm->signalReceive(), std::tr1::bind(&BloomManager::onReceive, this, _1, _2, _3)); + sendConn = manage(&cm->signalSend(), std::tr1::bind(&BloomManager::onSend, this, _1, _2, _3)); disconnectConn = manage(&cm->signalDisconnected(), std::tr1::bind(&BloomManager::onDisconnected, this, _1)); } @@ -56,11 +58,10 @@ size_t k = HashBloom::get_k(n); size_t m = HashBloom::get_m(n, k); + blooms.erase(c.getSID()); + + pending[c.getSID()] = make_tuple(ByteVector(), m, k); - HashBloom& bloom = blooms[c.getCID()]; - - bloom.reset(k); - AdcCommand get(AdcCommand::CMD_GET); get.addParam("blom"); get.addParam("/"); @@ -77,29 +78,83 @@ return; } + PendingMap::const_iterator i = pending.find(c.getSID()); + if(i == pending.end()) { + c.send(AdcCommand(AdcCommand::SEV_FATAL, AdcCommand::ERROR_BAD_STATE, "Unexpected bloom filter update")); + c.disconnect(Util::REASON_BAD_STATE); + override |= ClientManager::DONT_DISPATCH | ClientManager::DONT_SEND; + return; + } + int64_t bytes = Util::toInt(cmd.getParam(3)); - c.setDataMode(std::tr1::bind(&BloomManager::onData, this, _1, _2, _3), bytes); - override |= ClientManager::DONT_DISPATCH | ClientManager::DONT_SEND; - } else if(cmd.getCommand() == AdcCommand::CMD_SCH && cmd.getParam("TR", 0, tmp)) { - BloomMap::const_iterator i = blooms.find(c.getCID()); - if(i != blooms.end() && !i->second.match(TTHValue(tmp))) { - // Stop it - dcdebug("Stopping search\n"); + if(bytes != static_cast<int64_t>(get<1>(i->second) / 8)) { + c.send(AdcCommand(AdcCommand::SEV_FATAL, AdcCommand::ERROR_PROTOCOL_GENERIC, "Invalid number of bytes")); + c.disconnect(Util::REASON_PLUGIN); override |= ClientManager::DONT_DISPATCH | ClientManager::DONT_SEND; + pending.erase(c.getSID()); + return; } + + c.setDataMode(bind(&BloomManager::onData, this, _1, _2, _3), bytes); + override |= ClientManager::DONT_DISPATCH | ClientManager::DONT_SEND; + } else if(cmd.getCommand() == AdcCommand::CMD_MSG && cmd.getParameters().size() >= 1) { + if(cmd.getParam(0).compare(0, 6, "+stats") == 0) { + string stats = "\nBloom filter statistics:"; + stats += "\nTotal outgoing searches: " + Util::toString(searches); + stats += "\nOutgoing TTH searches: " + Util::toString(tthSearches) + " (" + Util::toString(tthSearches * 100. / searches) + "% of total)"; + stats += "\nStopped outgoing searches: " + Util::toString(stopped) + " (" + Util::toString(stopped * 100. / searches) + "% of total, " + Util::toString(stopped * 100. / tthSearches) + "% of TTH searches"; + int64_t bytes = getBytes(); + size_t clients = ClientManager::getInstance()->getClients().size(); + stats += "\nClient support: " + Util::toString(blooms.size()) + "/" + Util::toString(clients) + " (" + Util::toString(blooms.size() * 100. / clients) + "%)"; + stats += "\nApproximate memory usage: " + Util::formatBytes(bytes) + ", " + Util::formatBytes(static_cast<double>(bytes) / clients) + "/client"; + c.send(AdcCommand(AdcCommand::CMD_MSG).addParam(stats)); + override |= ClientManager::DONT_SEND; + } } } -void BloomManager::onData(Client& c, const uint8_t* data, size_t len) { - HashBloom& bloom = blooms[c.getCID()]; - for(size_t i = 0; i < len; ++i) { - for(size_t j = 0; j < 8; ++j) { - bloom.push_back(data[i] & (1 << j)); +void BloomManager::onSend(Client& c, const AdcCommand& cmd, int& override) { + if(cmd.getCommand() == AdcCommand::CMD_SCH) { + searches++; + string tmp; + if(cmd.getParam("TR", 0, tmp)) { + tthSearches++; + BloomMap::const_iterator i = blooms.find(c.getSID()); + if(i != blooms.end() && !i->second.match(TTHValue(tmp))) { + // Stop it + stopped++; + dcdebug("Stopping search\n"); + override |= ClientManager::DONT_SEND; + } } + } +} +int64_t BloomManager::getBytes() const { + int64_t bytes = 0; + for(BloomMap::const_iterator i = blooms.begin(); i != blooms.end(); ++i) { + bytes += i->second.size() / 8; } + return bytes; } +void BloomManager::onData(Client& c, const uint8_t* data, size_t len) { + PendingMap::iterator i = pending.find(c.getSID()); + if(i == pending.end()) { + // Shouldn't happen + return; + } + ByteVector& v = get<0>(i->second); + v.insert(v.end(), data, data + len); + + if(v.size() == get<1>(i->second) / 8) { + HashBloom& bloom = blooms[c.getSID()]; + bloom.reset(v, get<2>(i->second)); + pending.erase(i); + } +} + void BloomManager::onDisconnected(Client& c) { - blooms.erase(c.getCID()); + blooms.erase(c.getSID()); + pending.erase(c.getSID()); } Modified: adchpp/trunk/plugins/Bloom/src/BloomManager.h =================================================================== --- adchpp/trunk/plugins/Bloom/src/BloomManager.h 2007-12-22 23:08:03 UTC (rev 110) +++ adchpp/trunk/plugins/Bloom/src/BloomManager.h 2007-12-23 20:30:27 UTC (rev 111) @@ -19,10 +19,7 @@ #ifndef BLOOM_MANAGER_H #define BLOOM_MANAGER_H -#if _MSC_VER > 1000 -#pragma once -#endif // _MSC_VER > 1000 - +#include <tuple> #include <adchpp/Exception.h> #include <adchpp/Singleton.h> #include <adchpp/ClientManager.h> @@ -55,21 +52,33 @@ virtual int getVersion() { return 0; } - void onReceive(Client& c, AdcCommand& cmd, int&); - void onData(Client& c, const uint8_t* data, size_t len); - void onDisconnected(Client& c); - static const std::string className; private: friend class Singleton<BloomManager>; static BloomManager* instance; - typedef std::tr1::unordered_map<CID, HashBloom> BloomMap; + typedef std::tr1::unordered_map<uint32_t, HashBloom> BloomMap; BloomMap blooms; + // bytes, m, k + typedef std::tr1::tuple<ByteVector, size_t, size_t> PendingItem; + typedef std::tr1::unordered_map<uint32_t, PendingItem> PendingMap; + PendingMap pending; + + int64_t searches; + int64_t tthSearches; + int64_t stopped; + ClientManager::SignalReceive::ManagedConnection receiveConn; ClientManager::SignalDisconnected::ManagedConnection disconnectConn; + ClientManager::SignalSend::ManagedConnection sendConn; + int64_t getBytes() const; + void onReceive(Client& c, AdcCommand& cmd, int&); + void onSend(Client& c, const AdcCommand& cmd, int&); + void onData(Client& c, const uint8_t* data, size_t len); + void onDisconnected(Client& c); + }; #endif //ACCESSMANAGER_H Modified: adchpp/trunk/plugins/Bloom/src/HashBloom.cpp =================================================================== --- adchpp/trunk/plugins/Bloom/src/HashBloom.cpp 2007-12-22 23:08:03 UTC (rev 110) +++ adchpp/trunk/plugins/Bloom/src/HashBloom.cpp 2007-12-23 20:30:27 UTC (rev 111) @@ -41,9 +41,15 @@ bloom.push_back(v); } -void HashBloom::reset(size_t k_) { - bloom.resize(0); +void HashBloom::reset(ByteVector& v, size_t k_) { k = k_; + + bloom.resize(v.size() * 8); + for(size_t i = 0; i < v.size(); ++i) { + for(size_t j = 0; j < 8; ++j) { + bloom[i*8 + j] = ((v[i] >> j) != 0); + } + } } size_t HashBloom::pos(const TTHValue& tth, size_t n) const { Modified: adchpp/trunk/plugins/Bloom/src/HashBloom.h =================================================================== --- adchpp/trunk/plugins/Bloom/src/HashBloom.h 2007-12-22 23:08:03 UTC (rev 110) +++ adchpp/trunk/plugins/Bloom/src/HashBloom.h 2007-12-23 20:30:27 UTC (rev 111) @@ -22,8 +22,10 @@ void add(const TTHValue& tth); bool match(const TTHValue& tth) const; - void reset(size_t k); + void reset(ByteVector& v, size_t k); void push_back(bool v); + + size_t size() const { return bloom.size(); } private: size_t pos(const TTHValue& tth, size_t n) const; Modified: adchpp/trunk/unix/po/adchppd.pot =================================================================== --- adchpp/trunk/unix/po/adchppd.pot 2007-12-22 23:08:03 UTC (rev 110) +++ adchpp/trunk/unix/po/adchppd.pot 2007-12-23 20:30:27 UTC (rev 111) @@ -7,7 +7,7 @@ msgstr "" "Project-Id-Version: \"adchpp\"--copyright-holder=\"Jacek Sieka\"\n" "Report-Msgid-Bugs-To: \n" -"POT-Creation-Date: 2007-12-22 23:02+0100\n" +"POT-Creation-Date: 2007-12-23 00:10+0100\n" "PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" "Last-Translator: FULL NAME <EMAIL@ADDRESS>\n" "Language-Team: LANGUAGE <LL...@li...>\n" This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |