From: Tor M. <tmy...@cs...> - 2009-07-29 06:56:17
I have a central scribe server and a bunch of peripheral scribe servers. The peripheral servers are configured with the central server as a primary (network) store and a file store as a secondary store. This makes the logging system tolerate link failures and a failure of the central server. The central server is configured with only file stores. (Specifically, I made the decision to have a primary file store and a secondary file store, but it happened that both were on the same partition after moving the service to a different machine.)

So, when the central server runs out of disk space, it (a) can't store messages to disk, so it silently discards them, and (b) continues replying to Log calls with OK rather than TRY_AGAIN. (a) alone isn't cool; it should leave the messages in the queue rather than throwing them away. Combined with (b), it means that when the central server runs out of disk space, all log messages vanish into the ether rather than being buffered at the peripheral servers as they should be (until someone fixes things or they too run out of disk space).

So I want to hack scribe so that when it can't store messages to disk for whatever reason, it leaves them in the queue instead of throwing them away, and it also rejects incoming messages until it's able to save messages to the store again.
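To make the intended behavior concrete, here is a minimal single-threaded sketch of the requeue-and-reject idea (the names FlakyStore, StoreQueueModel, log, and drain are illustrative, not scribe's actual API): when a write fails, the batch stays on the queue and the handler answers TRY_LATER until a write succeeds again.

```cpp
#include <deque>
#include <string>
#include <vector>

enum ResultCode { OK, TRY_LATER };

// Stand-in for a FileStore whose writes start failing (e.g. ENOSPC).
struct FlakyStore {
  bool healthy = true;  // toggled externally to simulate disk-full
  bool handleMessages(const std::vector<std::string>& batch) {
    (void)batch;
    return healthy;     // pretend the write either all succeeds or all fails
  }
};

struct StoreQueueModel {
  FlakyStore store;
  std::deque<std::string> queue;
  bool storeFailing = false;  // plays the role of the patch's "boned" flag

  // RPC handler: push back on the sender while the store is failing.
  ResultCode log(const std::string& msg) {
    if (storeFailing) return TRY_LATER;
    queue.push_back(msg);
    return OK;
  }

  // One iteration of the store thread's loop.
  void drain() {
    if (queue.empty()) return;
    std::vector<std::string> batch(queue.begin(), queue.end());
    if (store.handleMessages(batch)) {
      queue.clear();          // write succeeded; messages are on disk
      storeFailing = false;
    } else {
      storeFailing = true;    // keep the batch queued instead of dropping it
    }
  }
};
```

The key property is that a failed write changes no queue state except the flag, so nothing is ever discarded; backpressure propagates to the peripheral servers via TRY_LATER.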
I wrote the following (against a version of scribe from March):

Index: src/scribe_server.cpp
===================================================================
--- src/scribe_server.cpp	(revision 45848)
+++ src/scribe_server.cpp	(revision 45886)
@@ -269,11 +269,17 @@
   return true;
 }
 
+extern volatile int boned, peripheral;
+
 ResultCode scribeHandler::Log(const vector<LogEntry>& messages) {
   //LOG_OPER("received Log with <%d> messages", (int)messages.size());
 
-  if (throttleDeny(messages.size())) {
+  if (boned) {
+    return TRY_LATER;
+  }
+
+  if (!peripheral && throttleDeny(messages.size())) {
     incrementCounter("denied for rate");
     return TRY_LATER;
   }
Index: src/store.cpp
===================================================================
--- src/store.cpp	(revision 45848)
+++ src/store.cpp	(revision 45886)
@@ -539,7 +539,7 @@
 bool FileStore::handleMessages(boost::shared_ptr<logentry_vector_t> messages) {
 
-  if (!isOpen()) {
+  if (!isOpen() && !openInternal(true, NULL)) {
     LOG_OPER("[%s] File failed to open FileStore::handleMessages()",
              categoryHandled.c_str());
     return false;
   }
@@ -1247,6 +1247,8 @@
 }
 
+volatile int peripheral;
+
 NetworkStore::NetworkStore(const string& category, bool multi_category)
   : Store(category, "network", multi_category),
     useConnPool(false),
@@ -1254,6 +1256,7 @@
     remotePort(0),
     opened(false) {
   // we can't open the connection until we get configured
+  peripheral = true;
 
   // the bool for opened ensures that we don't make duplicate
   // close calls, which would screw up the connection pool's
Index: src/store_queue.cpp
===================================================================
--- src/store_queue.cpp	(revision 45848)
+++ src/store_queue.cpp	(revision 45886)
@@ -194,6 +194,8 @@
   return store->getType();
 }
 
+volatile int boned;
+
 void StoreQueue::threadMember() {
   LOG_OPER("store thread starting");
 
@@ -260,10 +262,10 @@
     pthread_mutex_unlock(&cmdMutex);
 
     // handle messages if stopping, enough time has passed, or queue is large
-    //
+    // and we're not boned.
     if (stop ||
         (this_loop - last_handle_messages > maxWriteInterval) ||
-        msgQueueSize >= targetWriteSize) {
+        msgQueueSize >= targetWriteSize && !boned) {
 
       if (msgQueueSize > 0) {
         boost::shared_ptr<logentry_vector_t> messages = msgQueue;
@@ -273,12 +275,23 @@
         pthread_mutex_unlock(&msgMutex);
 
         if (!store->handleMessages(messages)) {
-          // Store could not handle these messages, nothing else to do
+          /*// Store could not handle these messages, nothing else to do
           // other than to record them as being lost
           LOG_OPER("[%s] WARNING: Lost %lu messages!",
                    categoryHandled.c_str(), messages->size());
-          g_Handler->incrementCounter("lost", messages->size());
+          g_Handler->incrementCounter("lost", messages->size());*/
+          // lies; we can requeue them and set boned.
+          boned = 1;
+          pthread_mutex_lock(&msgMutex);
+          for (size_t i = 0; i < msgQueue->size(); i++)
+            messages->push_back((*msgQueue)[i]);
+          msgQueue = messages;
+          msgQueueSize = 0;
+          for (size_t i = 0; i < msgQueue->size(); i++)
+            msgQueueSize += (*msgQueue)[i]->message.size();
+          pthread_mutex_unlock(&msgMutex);
         }
+        else boned = 0; // write succeeded; no longer boned.
 
         store->flush();
       } else {
         pthread_mutex_unlock(&msgMutex);

This looks to me like it works. I had to add the hack to FileStore because there seems to be no code path that reopens the file, at least for as long as I could be bothered to wait.

I tested it with a simple two-server configuration (one peripheral, one central). The central server is told to write to a 256MB ramdisk that contains a 60MB file and a 192MB file; I delete the 60MB file once scribe starts failing to write, then delete the 192MB file once scribe fails to write again. I then give the peripheral server a stream of messages: the positive integers in increasing order, one number per message, each followed by a newline. No messages were lost, but a few were duplicated between one chunk and the next.
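Checking the logs for loss and duplication can be mechanized. The helper below is hypothetical (not part of scribe, and the reader that produces the integer sequence is assumed): given the integers read back from the central server's files, it counts gaps (lost messages) and replays (duplicates from a requeued chunk).

```cpp
#include <vector>

// Result of scanning a sequence that should be 1, 2, 3, ...
struct SeqReport {
  long lost = 0;        // missing values (gaps in the sequence)
  long duplicated = 0;  // values seen again after we had moved past them
};

SeqReport checkSequence(const std::vector<long>& seen) {
  SeqReport r;
  long expected = 1;  // the test feeds positive integers starting at 1
  for (long v : seen) {
    if (v < expected) {
      r.duplicated++;             // replayed value from a requeued chunk
    } else {
      r.lost += v - expected;     // everything skipped over is lost
      expected = v + 1;
    }
  }
  return r;
}
```

On the test run described above, this would report zero lost and a small positive duplicate count, which matches the at-least-once behavior of requeueing a batch whose fate is unknown.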
I know this isn't the right way to handle the problem --- we should make sure there's space for writing a message to disk before writing the actual message --- but am I going to run into strange problems if I use this patch? (I see that trunk scribe has some logic for handling failed messages. I haven't tested it, though. And at a glance, I also don't understand why its resource consumption is bounded; I see no logic to tell clients to try later if we're making no progress on the queue. Or why mustSucceed should be false by default.)
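For what it's worth, the "check before write" approach could look something like this on POSIX systems, using statvfs(3); haveSpaceFor and the 4 MB slack threshold are arbitrary choices for this sketch. Note that it's inherently racy: another process can consume the space between the check and the write, so a requeue-on-failure path like the one in the patch is still needed as a backstop.

```cpp
#include <sys/statvfs.h>
#include <cstddef>

// Return true if the filesystem containing `path` appears to have room for
// `bytesNeeded` plus some headroom. Racy by nature (TOCTOU), so callers must
// still handle write failures.
bool haveSpaceFor(const char* path, size_t bytesNeeded) {
  const size_t kSlackBytes = 4 * 1024 * 1024;  // arbitrary 4 MB headroom
  struct statvfs vfs;
  if (statvfs(path, &vfs) != 0) {
    return false;  // can't stat the filesystem; treat as no space
  }
  // f_bavail = blocks available to unprivileged processes,
  // f_frsize = fundamental block size in bytes.
  size_t avail = static_cast<size_t>(vfs.f_bavail) * vfs.f_frsize;
  return avail >= bytesNeeded + kSlackBytes;
}
```

A store could call this before attempting a batch write and leave the batch queued (returning TRY_LATER upstream) when it fails, turning disk-full from a silent-loss case into ordinary backpressure.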