From: Tor M. <tmy...@cs...> - 2009-07-29 06:56:17
I have a central scribe server and a bunch of peripheral scribe servers.
The peripheral servers are configured with the central server as a primary
(network) store and a file store as a secondary store. This makes the
logging system tolerate link failures and a failure of the central server.
The central server is configured with only file stores. (Specifically, I
made the decision to have a primary file store and a secondary file store,
but it happened that both were on the same partition after moving the
service to a different machine.) So, when the central server runs out of
disk space, it
(a) can't store messages to disk, so it silently discards them
and
(b) continues replying to Log calls with OK rather than TRY_LATER.
(a) alone isn't cool; it should leave the messages in the queue rather
than throwing them away. Combined with (b), it means that when the
central server runs out of disk space, all log messages vanish into the
ether rather than being buffered at the peripheral servers as they
should be (until someone fixes things or they too run out of disk space).
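For concreteness, the peripheral arrangement above is the usual buffer
store with a network primary and a file secondary; a sketch along these
lines (hostname, path, and port are made up):

```
port=1463

<store>
category=default
type=buffer

<primary>
type=network
remote_host=central.example.com
remote_port=1463
</primary>

<secondary>
type=file
file_path=/var/tmp/scribe
</secondary>
</store>
```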
So I want to hack scribe so that when it can't store messages to disk
for whatever reason, it leaves them in the queue instead of throwing
them away, and also rejects incoming messages until it's able to save
messages to the store again.
I wrote the following (against a version of scribe from March):
Index: src/scribe_server.cpp
===================================================================
--- src/scribe_server.cpp (revision 45848)
+++ src/scribe_server.cpp (revision 45886)
@@ -269,11 +269,17 @@
return true;
}
+extern volatile int boned, peripheral;
+
ResultCode scribeHandler::Log(const vector<LogEntry>& messages) {
//LOG_OPER("received Log with <%d> messages", (int)messages.size());
- if (throttleDeny(messages.size())) {
+ if (boned) {
+ return TRY_LATER;
+ }
+
+ if (!peripheral && throttleDeny(messages.size())) {
incrementCounter("denied for rate");
return TRY_LATER;
}
Index: src/store.cpp
===================================================================
--- src/store.cpp (revision 45848)
+++ src/store.cpp (revision 45886)
@@ -539,7 +539,7 @@
bool FileStore::handleMessages(boost::shared_ptr<logentry_vector_t> messages) {
- if (!isOpen()) {
+ if (!isOpen() && !openInternal(true, NULL)) {
LOG_OPER("[%s] File failed to open FileStore::handleMessages()", categoryHandled.c_str());
return false;
}
@@ -1247,6 +1247,8 @@
}
+volatile int peripheral;
+
NetworkStore::NetworkStore(const string& category, bool multi_category)
: Store(category, "network", multi_category),
useConnPool(false),
@@ -1254,6 +1256,7 @@
remotePort(0),
opened(false) {
// we can't open the connection until we get configured
+ peripheral = true;
// the bool for opened ensures that we don't make duplicate
// close calls, which would screw up the connection pool's
Index: src/store_queue.cpp
===================================================================
--- src/store_queue.cpp (revision 45848)
+++ src/store_queue.cpp (revision 45886)
@@ -194,6 +194,8 @@
return store->getType();
}
+volatile int boned;
+
void StoreQueue::threadMember() {
LOG_OPER("store thread starting");
@@ -260,10 +262,10 @@
pthread_mutex_unlock(&cmdMutex);
// handle messages if stopping, enough time has passed, or queue is large
- //
+ // and we're not boned.
if (stop ||
(this_loop - last_handle_messages > maxWriteInterval) ||
- msgQueueSize >= targetWriteSize) {
+ msgQueueSize >= targetWriteSize && !boned) {
if (msgQueueSize > 0) {
boost::shared_ptr<logentry_vector_t> messages = msgQueue;
@@ -273,12 +275,23 @@
pthread_mutex_unlock(&msgMutex);
if (!store->handleMessages(messages)) {
- // Store could not handle these messages, nothing else to do
+ /*// Store could not handle these messages, nothing else to do
// other than to record them as being lost
LOG_OPER("[%s] WARNING: Lost %lu messages!",
categoryHandled.c_str(), messages->size());
- g_Handler->incrementCounter("lost", messages->size());
+ g_Handler->incrementCounter("lost", messages->size());*/
+ // lies; we can requeue them and set boned.
+ boned=1;
+ pthread_mutex_lock(&msgMutex);
+ for (size_t i = 0; i < msgQueue->size(); i++)
+ messages->push_back((*msgQueue)[i]);
+ msgQueue = messages;
+ msgQueueSize = 0;
+ for (size_t i = 0; i < msgQueue->size(); i++)
+ msgQueueSize += (*msgQueue)[i]->message.size();
+ pthread_mutex_unlock(&msgMutex);
}
+ else boned = 0; // write succeeded; no longer boned.
store->flush();
} else {
pthread_mutex_unlock(&msgMutex);
This looks to me like it works. I had to add the hack to FileStore
because there seems to be no code path that reopens the file, at least
not within as long as I could be bothered to wait. I tested it with a
simple two-server configuration (one peripheral, one central). The
central server is told to write to a 256MB ramdisk that contains a 60MB
file and a 192MB file; I delete the 60MB file once scribe starts
failing to write, then delete the 192MB file once scribe fails to write
again. Meanwhile I feed the peripheral server a stream of messages that
are the positive integers in increasing order followed by newlines, one
number per message. No messages were lost, but a few were duplicated
across one chunk and the next.
I know this isn't the right way to handle the problem --- we should
make sure there's space to write a message to disk before actually
writing it --- but am I going to run into strange problems if I use
this patch?
(I see that trunk scribe has some logic for handling failed messages.
I haven't tested it, though, and at a glance I don't see how its
resource consumption is bounded: there's no logic to tell clients to
try later when we're making no progress on the queue. Nor do I see why
mustSucceed should be false by default.)