From: <dg...@su...> - 2009-01-26 00:30:18
|
Author: Graham Cobb Date: Mon Jan 26 01:28:59 2009 New Revision: 5234 URL: http://www.opensync.org/changeset/5234 Log: Done: 4) Handle timeout before reply queue set up by generating an error message on the command queue Unit test (ipc_timeout_noreplyq) added Note: this changes behaviour: when a queue is disconnected, all pending commands (i.e. unreplied, but with a message handler) will be replied with an error response. The message handler callbacks will all be called before the call to osync_queue_disconnect returns. Still to do: 6) Implement a mechanism to protect the queue itself: i.e. is the receiver still reading the queue? Modified: branches/timeout/ChangeLog branches/timeout/opensync/ipc/opensync_queue.c branches/timeout/opensync/ipc/opensync_queue_private.h branches/timeout/tests/CMakeLists.txt branches/timeout/tests/ipc-tests/check_ipc.c Modified: branches/timeout/ChangeLog ============================================================================== --- branches/timeout/ChangeLog Mon Jan 26 01:23:47 2009 (r5233) +++ branches/timeout/ChangeLog Mon Jan 26 01:28:59 2009 (r5234) @@ -1,5 +1,32 @@ +2009-01-25 Graham Cobb <g+...@co...> + + * opensync/engine/opensync_sink_engine.c (osync_sink_engine_new): Ref objengine [Bug #1052] + (osync_sink_engine_unref): Unref objengine [Bug #1052] + + * opensync/engine/opensync_obj_engine.c (_osync_obj_engine_*_callback): Unref sinkengine [Bug #1052] + (osync_obj_engine_command): Ref sinkengine every time it is used for a callback [Bug #1052] + + * opensync/ipc/opensync_queue.c (osync_queue_disconnect): Empty pending + queue before performing disconnect. Pending messages with callbacks + will get called with an error message. + (_incoming_check): Do not action incoming queue if a disconnect is in progress. + This avoids entries being added to pending queue while we are trying to empty it. 
+ (osync_queue_send_message_with_timeout): Do not allow sending messages which require + adding entries to the reply queue pending list if the reply queue is being disconnected. + + * opensync/ipc/opensync_queue_private.h: Add disc_in_progress flag to queue. + 2009-01-24 Graham Cobb <g+...@co...> + * opensync/ipc/opensync_queue.c (_osync_queue_generate_error): Add + _osync_queue_generate_error + (_osync_send_timeout_response): If there is no reply queue, + call _osync_queue_generate_error + + * tests/CMakeLists.txt: Add ipc_timeout_noreplyq + + * tests/ipc-tests/check_ipc.c: Add ipc_timeout_noreplyq + * opensync/ipc/opensync_queue.c (osync_queue_remove_cross_link): Add osync_queue_remove_cross_link (osync_queue_disconnect): Call osync_queue_remove_cross_link Modified: branches/timeout/opensync/ipc/opensync_queue.c ============================================================================== --- branches/timeout/opensync/ipc/opensync_queue.c Mon Jan 26 01:23:47 2009 (r5233) +++ branches/timeout/opensync/ipc/opensync_queue.c Mon Jan 26 01:28:59 2009 (r5234) @@ -175,12 +175,37 @@ if (g_async_queue_length(queue->incoming) > 0 && (queue->pendingLimit == 0 - || queue->pendingCount < queue->pendingLimit) ) + || queue->pendingCount < queue->pendingLimit) + && !queue->disc_in_progress ) return TRUE; return FALSE; } +static gboolean _osync_queue_generate_error(OSyncQueue *queue, OSyncMessageCommand errcode, OSyncError **error) +{ + OSyncMessage *message; + + queue->connected = FALSE; + + /* Now we can send the hup message, and wake up the consumer thread so + * it can pickup the messages in the incoming queue */ + message = osync_message_new(errcode, 0, error); + if (!message) { + return FALSE; + } + osync_trace(TRACE_INTERNAL, "Generating incoming error message %p(%s), id= %lli", message, osync_message_get_commandstr(message), osync_message_get_id(message)); + + osync_message_ref(message); + + g_async_queue_push(queue->incoming, message); + + if (queue->incomingContext) + 
g_main_context_wakeup(queue->incomingContext); + + return TRUE; +} + static void _osync_send_timeout_response(OSyncMessage *errormsg, void *user_data) { OSyncQueue *queue = user_data; @@ -191,9 +216,13 @@ osync_queue_send_message_with_timeout(queue->reply_queue, NULL, errormsg, 0, NULL); } else { osync_message_unref(errormsg); - /* TODO: as we can't send the timeout response, disconnect the queue to signal to - the sender that they aren't going to get a response. Note: we can't just - call osync_queue_disconnect as that tries to kill the thread and hits a deadlock! */ + /* As we can't send the timeout response, create an error and drop it in the incoming queue + so the higher layer will disconnect. Note: we can't just call osync_queue_disconnect + as that tries to kill the thread and hits a deadlock! */ + if (!_osync_queue_generate_error(queue, OSYNC_MESSAGE_QUEUE_ERROR, NULL)) { + osync_trace(TRACE_EXIT_ERROR, "%s: cannot even generate error on incoming queue", __func__); + return; + } osync_trace(TRACE_EXIT_ERROR, "%s: cannot find reply queue to send timeout error", __func__); return; } @@ -570,18 +599,8 @@ return TRUE; case OSYNC_QUEUE_EVENT_HUP: case OSYNC_QUEUE_EVENT_ERROR: - queue->connected = FALSE; - - /* Now we can send the hup message, and wake up the consumer thread so - * it can pickup the messages in the incoming queue */ - message = osync_message_new(OSYNC_MESSAGE_QUEUE_HUP, 0, &error); - if (!message) + if (!_osync_queue_generate_error(queue, OSYNC_MESSAGE_QUEUE_HUP, &error)) goto error; - - g_async_queue_push(queue->incoming, message); - - if (queue->incomingContext) - g_main_context_wakeup(queue->incomingContext); return FALSE; } @@ -1000,6 +1019,52 @@ osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, queue, error); osync_assert(queue); + /* Before doing the real disconnect, we empty the pending queue by creating HUP + errors for anything on it. 
In order to make sure it empties, the disconnect + is marked as in progress so that no more entries get put on it (e.g. if a + callback tries to send another message). */ + + g_mutex_lock(queue->pendingLock); + + queue->disc_in_progress = TRUE; + + while (queue->pendingCount > 0) { + OSyncPendingMessage *pending = queue->pendingReplies->data; + OSyncError *error = NULL; + OSyncError *huperr = NULL; + OSyncMessage *errormsg = NULL; + + queue->pendingReplies = osync_list_remove(queue->pendingReplies, pending); + queue->pendingCount--; + + /* Call the callback of the pending message */ + if (pending->callback) { + osync_error_set(&huperr, OSYNC_ERROR_IO_ERROR, "Disconnect."); + errormsg = osync_message_new_errorreply(NULL, huperr, &error); + osync_error_unref(&huperr); + osync_message_set_id(errormsg, pending->id); + + /* Unlock the pending lock during the callback */ + g_mutex_unlock(queue->pendingLock); + + osync_trace(TRACE_INTERNAL, "%s: Reporting disconnect error for message %lli", __func__, pending->id); + + pending->callback(errormsg, pending->user_data); + if (errormsg != NULL) + osync_message_unref(errormsg); + + /* Lock again */ + g_mutex_lock(queue->pendingLock); + } + + // TODO: Refcounting for OSyncPendingMessage + if (pending->timeout_info) + g_free(pending->timeout_info); + + osync_free(pending); + } + g_mutex_unlock(queue->pendingLock); + osync_queue_remove_cross_link(queue); g_mutex_lock(queue->disconnectLock); @@ -1038,6 +1103,10 @@ queue->fd = -1; queue->connected = FALSE; g_mutex_unlock(queue->disconnectLock); + + g_mutex_lock(queue->pendingLock); + queue->disc_in_progress = FALSE; + g_mutex_unlock(queue->pendingLock); osync_trace(TRACE_EXIT, "%s", __func__); return TRUE; @@ -1221,6 +1290,13 @@ GTimeVal current_time; long long int id = 0; osync_assert(replyqueue); + + g_mutex_lock(replyqueue->pendingLock); + if (replyqueue->disc_in_progress) { + osync_error_set(error, OSYNC_ERROR_IO_ERROR, "Disconnect in progress."); + goto error; + } + pending = 
osync_try_malloc0(sizeof(OSyncPendingMessage), error); if (!pending) goto error; @@ -1243,7 +1319,6 @@ pending->callback = osync_message_get_handler(message); pending->user_data = osync_message_get_handler_data(message); - g_mutex_lock(replyqueue->pendingLock); replyqueue->pendingReplies = osync_list_append(replyqueue->pendingReplies, pending); replyqueue->pendingCount++; g_mutex_unlock(replyqueue->pendingLock); Modified: branches/timeout/opensync/ipc/opensync_queue_private.h ============================================================================== --- branches/timeout/opensync/ipc/opensync_queue_private.h Mon Jan 26 01:23:47 2009 (r5233) +++ branches/timeout/opensync/ipc/opensync_queue_private.h Mon Jan 26 01:28:59 2009 (r5234) @@ -96,6 +96,9 @@ /* Queue for receiving commands we are replying to */ OSyncQueue *cmd_queue; + + /* Disconnect in progress */ + gboolean disc_in_progress; }; Modified: branches/timeout/tests/CMakeLists.txt ============================================================================== --- branches/timeout/tests/CMakeLists.txt Mon Jan 26 01:23:47 2009 (r5233) +++ branches/timeout/tests/CMakeLists.txt Mon Jan 26 01:28:59 2009 (r5234) @@ -249,6 +249,7 @@ OSYNC_TESTCASE(ipc ipc_loop_with_timeout) OSYNC_TESTCASE(ipc ipc_late_reply) OSYNC_TESTCASE(ipc ipc_loop_timeout_with_idle) +OSYNC_TESTCASE(ipc ipc_timeout_noreplyq) ENDIF (NOT WIN32) BUILD_CHECK_TEST( mapping mapping-tests/check_mapping.c ${TEST_TARGET_LIBRARIES} ) Modified: branches/timeout/tests/ipc-tests/check_ipc.c ============================================================================== --- branches/timeout/tests/ipc-tests/check_ipc.c Mon Jan 26 01:23:47 2009 (r5233) +++ branches/timeout/tests/ipc-tests/check_ipc.c Mon Jan 26 01:28:59 2009 (r5234) @@ -2089,10 +2089,13 @@ static void _message_handler(OSyncMessage *message, void *user_data) { + osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, user_data); + osync_trace(TRACE_INTERNAL, 
"%s",osync_message_get_commandstr(message)); if (osync_message_is_error(message)) num_callback_timeout++; else num_callback++; + osync_trace(TRACE_EXIT, "%s", __func__); } char *data5 = "this is another test string"; @@ -2803,6 +2806,185 @@ } END_TEST +void client_handler6(OSyncMessage *message, void *user_data) +{ + osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, user_data); + OSyncError *error = NULL; + + osync_assert(GPOINTER_TO_INT(user_data) ==1); + + if (osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_ERROR) { + osync_queue_disconnect(client_queue, NULL); + osync_trace(TRACE_EXIT, "%s: disconnect", __func__); + return; + } + + osync_assert(osync_message_get_command(message) == OSYNC_MESSAGE_INITIALIZE); + + int int1; + long long int longint1; + char *string; + char databuf[strlen(data5) + 1]; + + osync_message_read_int(message, &int1); + osync_message_read_string(message, &string); + osync_message_read_long_long_int(message, &longint1); + osync_message_read_data(message, databuf, strlen(data5) + 1); + + osync_assert(int1 == 4000000); + osync_assert(!strcmp(string, "this is a test string")); + osync_assert(longint1 == 400000000); + osync_assert(!strcmp(databuf, data5)); + + /* TIMEOUT TIMEOUT TIMEOUT (no reply...) */ + + /* Proper code would reply to this message, but for testing + purposes we don't reply and simulate a "timeout" situation */ + + osync_trace(TRACE_EXIT, "%s", __func__); +} +START_TEST (ipc_timeout_noreplyq) +{ + /* This testcase is intended to test timeout before the command and reply queues are cross-linked. + Client got forked and listens for messages from Server and replies. + + To simulate a "timeout" situation the Client doesn't reply to one of the Server messages. + + As there is no reply queue, an error will be sent to the **client**, who then disconnects + so an error (although not a timeout) ends up sent to the server.
+ */ + + char *testbed = setup_testbed(NULL); + osync_testing_file_remove("/tmp/testpipe-server"); + osync_testing_file_remove("/tmp/testpipe-client"); + + num_callback_timeout = 0; + num_callback = 0; + + OSyncError *error = NULL; + server_queue = osync_queue_new("/tmp/testpipe-server", &error); + client_queue = osync_queue_new("/tmp/testpipe-client", &error); + OSyncMessage *message = NULL; + + osync_queue_create(server_queue, &error); + fail_unless(error == NULL, NULL); + + osync_queue_create(client_queue, &error); + fail_unless(error == NULL, NULL); + + pid_t cpid = fork(); + if (cpid == 0) { //Child + + GMainContext *context = g_main_context_new(); + OSyncThread *thread = osync_thread_new(context, &error); + + osync_queue_set_message_handler(client_queue, client_handler6, GINT_TO_POINTER(1)); + osync_queue_set_pending_limit(client_queue, OSYNC_QUEUE_PENDING_LIMIT); + + osync_queue_setup_with_gmainloop(client_queue, context); + + osync_thread_start(thread); + + osync_assert(osync_queue_connect(client_queue, OSYNC_QUEUE_RECEIVER, &error)); + osync_assert(error == NULL); + + osync_assert(osync_queue_connect(server_queue, OSYNC_QUEUE_SENDER, &error)); + osync_assert(error == NULL); + + /* Do not cross-link */ + /*osync_queue_cross_link(client_queue, server_queue);*/ + + message = osync_queue_get_message(server_queue); + + osync_assert(osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_HUP); + + osync_message_unref(message); + + if (osync_queue_disconnect(server_queue, &error) != TRUE || error != NULL) + exit(1); + osync_queue_unref(server_queue); + + osync_assert(osync_queue_disconnect(client_queue, &error)); + osync_assert(error == NULL); + + osync_thread_stop(thread); + osync_thread_unref(thread); + + osync_queue_unref(client_queue); + + g_free(testbed); + + exit(0); + } else { + GMainContext *context = g_main_context_new(); + OSyncThread *thread = osync_thread_new(context, &error); + + osync_queue_set_message_handler(server_queue, server_handler4, 
GINT_TO_POINTER(1)); + + osync_queue_setup_with_gmainloop(server_queue, context); + + osync_thread_start(thread); + + fail_unless(osync_queue_connect(client_queue, OSYNC_QUEUE_SENDER, &error), NULL); + fail_unless(error == NULL, NULL); + + fail_unless(osync_queue_connect(server_queue, OSYNC_QUEUE_RECEIVER, &error), NULL); + fail_unless(error == NULL, NULL); + + message = osync_message_new(OSYNC_MESSAGE_INITIALIZE, 0, &error); + fail_unless(message != NULL, NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + osync_message_set_handler(message, _message_handler, NULL); + + osync_message_write_int(message, 4000000); + osync_message_write_string(message, "this is a test string"); + osync_message_write_long_long_int(message, 400000000); + osync_message_write_data(message, data5, strlen(data5) + 1); + + // Send with timeout of one second + fail_unless(osync_queue_send_message_with_timeout(client_queue, server_queue, message, 1, &error), NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + osync_message_unref(message); + + while (!(message = osync_queue_get_message(client_queue))) { + g_usleep(10000); + } + + fail_unless(osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_HUP); + osync_message_unref(message); + + osync_queue_disconnect(client_queue, &error); + fail_unless(error == NULL, NULL); + + osync_queue_disconnect(server_queue, &error); + fail_unless(error == NULL, NULL); + + int status = 0; + wait(&status); + fail_unless(WEXITSTATUS(status) == 0, NULL); + } + + fail_unless(osync_testing_file_exists("/tmp/testpipe-client") == TRUE, NULL); + + fail_unless(osync_queue_remove(client_queue, &error), NULL); + fail_unless(osync_queue_remove(server_queue, &error), NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + /* Check if the timeout handler replied with an error */ + fail_unless(num_callback_timeout == 1, NULL); + fail_unless(num_callback == 0, NULL); + + fail_unless(osync_testing_file_exists("/tmp/testpipe-client") == FALSE, NULL); + + 
osync_queue_unref(client_queue); + osync_queue_unref(server_queue); + + destroy_testbed(testbed); +} +END_TEST + OSYNC_TESTCASE_START("ipc") OSYNC_TESTCASE_ADD(ipc_new) OSYNC_TESTCASE_ADD(ipc_ref) @@ -2832,5 +3014,6 @@ OSYNC_TESTCASE_ADD(ipc_late_reply) OSYNC_TESTCASE_ADD(ipc_loop_with_timeout) OSYNC_TESTCASE_ADD(ipc_loop_timeout_with_idle) +OSYNC_TESTCASE_ADD(ipc_timeout_noreplyq) OSYNC_TESTCASE_END |