From: <dg...@su...> - 2009-02-01 18:59:31
|
Author: Graham Cobb Date: Sun Feb 1 19:57:43 2009 New Revision: 5251 URL: http://www.opensync.org/changeset/5251 Log: Completed: 6) Implement a mechanism to protect the queue itself: i.e. is the receiver still reading the queue? And added unit test (ipc_timeout_noreceiver) The timeout changes now seem to be completed. Modified: branches/timeout/ChangeLog branches/timeout/opensync/ipc/opensync_queue.c branches/timeout/opensync/ipc/opensync_queue_private.h branches/timeout/tests/CMakeLists.txt branches/timeout/tests/ipc-tests/check_ipc.c Modified: branches/timeout/ChangeLog ============================================================================== --- branches/timeout/ChangeLog Sun Feb 1 00:00:34 2009 (r5250) +++ branches/timeout/ChangeLog Sun Feb 1 19:57:43 2009 (r5251) @@ -1,3 +1,17 @@ +2009-02-01 Graham Cobb <g+...@co...> + + * tests/ipc-tests/check_ipc.c: Add ipc_timeout_noreceiver test. + + * opensync/ipc/opensync_queue.c (osync_queue_send_message_with_timeout): Track maximum + timeout seen. Start pending queue timeout. + (_osync_queue_remove_pending_reply): restart pending queue timeout if necessary + (_osync_queue_restart_pending_timeout): Add function to start/restart pending queue timeout + (_timeout_check): Fix calculation of expiry of timeout. Add pending queue timeout. + (_timeout_dispatch): Fix calculation of expiry of timeout. Add pending queue timeout. + + * opensync/ipc/opensync_queue_private.h: Add max_timeout and pending_timeout fields (to queue). + (OSYNC_QUEUE_PENDING_QUEUE_IPC_DELAY): Add value to assume for IPC delay in pending queue timeout. 
+ 2009-01-25 Graham Cobb <g+...@co...> * opensync/engine/opensync_sink_engine.c (osync_sink_engine_new): Ref objengine [Bug #1052] Modified: branches/timeout/opensync/ipc/opensync_queue.c ============================================================================== --- branches/timeout/opensync/ipc/opensync_queue.c Sun Feb 1 00:00:34 2009 (r5250) +++ branches/timeout/opensync/ipc/opensync_queue.c Sun Feb 1 19:57:43 2009 (r5251) @@ -29,6 +29,30 @@ #include "opensync_queue_internals.h" #include "opensync_queue_private.h" +static gboolean _osync_queue_generate_error(OSyncQueue *queue, OSyncMessageCommand errcode, OSyncError **error) +{ + OSyncMessage *message; + + queue->connected = FALSE; + + /* Now we can send the hup message, and wake up the consumer thread so + * it can pickup the messages in the incoming queue */ + message = osync_message_new(errcode, 0, error); + if (!message) { + return FALSE; + } + osync_trace(TRACE_INTERNAL, "Generating incoming error message %p(%s), id= %lli", message, osync_message_get_commandstr(message), osync_message_get_id(message)); + + osync_message_ref(message); + + g_async_queue_push(queue->incoming, message); + + if (queue->incomingContext) + g_main_context_wakeup(queue->incomingContext); + + return TRUE; +} + static gboolean _timeout_prepare(GSource *source, gint *timeout_) { /* TODO adapt *timeout_ value to shortest message timeout value... 
@@ -55,6 +79,16 @@ * list since another thread might be doing the updates */ g_mutex_lock(queue->pendingLock); + /* First check the overall queue timer */ + if (queue->pendingCount> 0 && queue->pending_timeout.tv_sec > 0) { + if (current_time.tv_sec > queue->pending_timeout.tv_sec + || (current_time.tv_sec == queue->pending_timeout.tv_sec + && current_time.tv_usec >= queue->pending_timeout.tv_usec) ) { + g_mutex_unlock(queue->pendingLock); + return TRUE; + } + } + for (p = queue->pendingReplies; p; p = p->next) { pending = p->data; @@ -63,7 +97,7 @@ toinfo = pending->timeout_info; - if (current_time.tv_sec >= toinfo->expiration.tv_sec + if (current_time.tv_sec > toinfo->expiration.tv_sec || (current_time.tv_sec == toinfo->expiration.tv_sec && current_time.tv_usec >= toinfo->expiration.tv_usec)) { /* Unlock the pending lock since the messages might be sent during the callback */ @@ -96,6 +130,18 @@ * list since another thread might be doing the updates */ g_mutex_lock(queue->pendingLock); + /* First check the overall queue timer */ + if (queue->pendingCount> 0 && queue->pending_timeout.tv_sec > 0) { + if (current_time.tv_sec > queue->pending_timeout.tv_sec + || (current_time.tv_sec == queue->pending_timeout.tv_sec + && current_time.tv_usec >= queue->pending_timeout.tv_usec) ) { + /* The queue has died. 
Generate an error */ + osync_trace(TRACE_INTERNAL, "%s: Pending queue timer expired: receiver must have died", __func__); + _osync_queue_generate_error(queue, OSYNC_MESSAGE_QUEUE_ERROR, NULL); + queue->pending_timeout.tv_sec = 0; // Stop timer + } + } + for (p = queue->pendingReplies; p; p = p->next) { pending = p->data; @@ -104,8 +150,8 @@ toinfo = pending->timeout_info; - if (current_time.tv_sec == toinfo->expiration.tv_sec || - (current_time.tv_sec >= toinfo->expiration.tv_sec + if (current_time.tv_sec > toinfo->expiration.tv_sec || + (current_time.tv_sec == toinfo->expiration.tv_sec && current_time.tv_usec >= toinfo->expiration.tv_usec)) { OSyncError *error = NULL; OSyncError *timeouterr = NULL; @@ -182,30 +228,6 @@ return FALSE; } -static gboolean _osync_queue_generate_error(OSyncQueue *queue, OSyncMessageCommand errcode, OSyncError **error) -{ - OSyncMessage *message; - - queue->connected = FALSE; - - /* Now we can send the hup message, and wake up the consumer thread so - * it can pickup the messages in the incoming queue */ - message = osync_message_new(errcode, 0, error); - if (!message) { - return FALSE; - } - osync_trace(TRACE_INTERNAL, "Generating incoming error message %p(%s), id= %lli", message, osync_message_get_commandstr(message), osync_message_get_id(message)); - - osync_message_ref(message); - - g_async_queue_push(queue->incoming, message); - - if (queue->incomingContext) - g_main_context_wakeup(queue->incomingContext); - - return TRUE; -} - static void _osync_send_timeout_response(OSyncMessage *errormsg, void *user_data) { OSyncQueue *queue = user_data; @@ -229,6 +251,28 @@ osync_trace(TRACE_EXIT, "%s", __func__); } +/* Restart the pending queue timeout */ +static void _osync_queue_restart_pending_timeout(OSyncQueue *queue) +{ + /* Note that the pending queue timeout is just to make sure that progress is being made. 
+ It does not time individual commands -- it just makes sure that responses are being + received for outstanding commands at some rate. Individual message timeouts are + handled at the receiver. The main purpose of this timer is to detect if the + receiver has stopped receiving for some reason. + + The timer is started when the first message is put on the pending queue and is reset + and restarted whenever a response is received. The timeout value is based on the + largest message timeout seen to date. */ + + /* Note: queue->pending_timeout is protected by the pending lock, which should be held + by the caller before calling this function */ + + if (queue->max_timeout) { + g_source_get_current_time(queue->timeout_source, &queue->pending_timeout); + queue->pending_timeout.tv_sec += queue->max_timeout + OSYNC_QUEUE_PENDING_QUEUE_IPC_DELAY; + } +} + /* Find and remove a pending message that this message is a reply to */ static void _osync_queue_remove_pending_reply(OSyncQueue *queue, OSyncMessage *reply, gboolean callback) { @@ -255,7 +299,8 @@ gets called twice! 
*/ queue->pendingReplies = osync_list_remove(queue->pendingReplies, pending); - queue->pendingCount--; + if (--queue->pendingCount != 0) + _osync_queue_restart_pending_timeout(queue); /* Unlock the pending lock since the messages might be sent during the callback */ g_mutex_unlock(queue->pendingLock); @@ -300,7 +345,7 @@ osync_message_get_timeout(message), osync_message_get_id(message)); if (osync_message_get_cmd(message) == OSYNC_MESSAGE_REPLY || osync_message_get_cmd(message) == OSYNC_MESSAGE_ERRORREPLY) { - /* Remove pending reply and call callback */ + /* Remove pending reply and call callback*/ _osync_queue_remove_pending_reply(queue, message, TRUE); } else { unsigned int timeout = osync_message_get_timeout(message); @@ -1312,6 +1357,9 @@ if (timeout) { /* Send timeout info to other end to handle */ osync_message_set_timeout(message, timeout); + /* Note largest timeout seen */ + if (timeout > replyqueue->max_timeout) + replyqueue->max_timeout = timeout; } else { osync_trace(TRACE_INTERNAL, "handler message got sent without timeout!: %s", osync_message_get_commandstr(message)); } @@ -1320,7 +1368,10 @@ pending->user_data = osync_message_get_handler_data(message); replyqueue->pendingReplies = osync_list_append(replyqueue->pendingReplies, pending); - replyqueue->pendingCount++; + if (replyqueue->pendingCount++ == 0) { + /* Start queue timeout */ + _osync_queue_restart_pending_timeout(replyqueue); + } g_mutex_unlock(replyqueue->pendingLock); } Modified: branches/timeout/opensync/ipc/opensync_queue_private.h ============================================================================== --- branches/timeout/opensync/ipc/opensync_queue_private.h Sun Feb 1 00:00:34 2009 (r5250) +++ branches/timeout/opensync/ipc/opensync_queue_private.h Sun Feb 1 19:57:43 2009 (r5251) @@ -99,8 +99,18 @@ /* Disconnect in progress */ gboolean disc_in_progress; + + /* Largest timeout value seen so far */ + unsigned int max_timeout; + + /* Expiration time of pending queue timeout */ + 
GTimeVal pending_timeout; }; +/** @brief Pending queue timeout addition for ipc delay + */ +#define OSYNC_QUEUE_PENDING_QUEUE_IPC_DELAY 1 + /** @brief Timeout object */ Modified: branches/timeout/tests/CMakeLists.txt ============================================================================== --- branches/timeout/tests/CMakeLists.txt Sun Feb 1 00:00:34 2009 (r5250) +++ branches/timeout/tests/CMakeLists.txt Sun Feb 1 19:57:43 2009 (r5251) @@ -250,6 +250,7 @@ OSYNC_TESTCASE(ipc ipc_late_reply) OSYNC_TESTCASE(ipc ipc_loop_timeout_with_idle) OSYNC_TESTCASE(ipc ipc_timeout_noreplyq) +OSYNC_TESTCASE(ipc ipc_timeout_noreceiver) ENDIF (NOT WIN32) BUILD_CHECK_TEST( mapping mapping-tests/check_mapping.c ${TEST_TARGET_LIBRARIES} ) Modified: branches/timeout/tests/ipc-tests/check_ipc.c ============================================================================== --- branches/timeout/tests/ipc-tests/check_ipc.c Sun Feb 1 00:00:34 2009 (r5250) +++ branches/timeout/tests/ipc-tests/check_ipc.c Sun Feb 1 19:57:43 2009 (r5251) @@ -2985,6 +2985,141 @@ } END_TEST +START_TEST (ipc_timeout_noreceiver) +{ + /* This testcase is intended to test the case where the receiver is not even listening, + and so does not run the timeout. 
+ */ + + char *testbed = setup_testbed(NULL); + osync_testing_file_remove("/tmp/testpipe-server"); + osync_testing_file_remove("/tmp/testpipe-client"); + + num_callback_timeout = 0; + num_callback = 0; + + OSyncError *error = NULL; + server_queue = osync_queue_new("/tmp/testpipe-server", &error); + client_queue = osync_queue_new("/tmp/testpipe-client", &error); + OSyncMessage *message = NULL; + + osync_queue_create(server_queue, &error); + fail_unless(error == NULL, NULL); + + osync_queue_create(client_queue, &error); + fail_unless(error == NULL, NULL); + + pid_t cpid = fork(); + if (cpid == 0) { //Child + + GMainContext *context = g_main_context_new(); + OSyncThread *thread = osync_thread_new(context, &error); + + osync_queue_set_message_handler(client_queue, client_handler1, GINT_TO_POINTER(1)); + osync_queue_set_pending_limit(client_queue, OSYNC_QUEUE_PENDING_LIMIT); + + /* Do not start receiver */ + /* osync_queue_setup_with_gmainloop(client_queue, context); */ + + osync_thread_start(thread); + + osync_assert(osync_queue_connect(client_queue, OSYNC_QUEUE_RECEIVER, &error)); + osync_assert(error == NULL); + + osync_assert(osync_queue_connect(server_queue, OSYNC_QUEUE_SENDER, &error)); + osync_assert(error == NULL); + + /* Do not cross-link */ + osync_queue_cross_link(client_queue, server_queue); + + message = osync_queue_get_message(server_queue); + + osync_assert(osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_HUP); + + osync_message_unref(message); + + if (osync_queue_disconnect(server_queue, &error) != TRUE || error != NULL) + exit(1); + osync_queue_unref(server_queue); + + osync_assert(osync_queue_disconnect(client_queue, &error)); + osync_assert(error == NULL); + + osync_thread_stop(thread); + osync_thread_unref(thread); + + osync_queue_unref(client_queue); + + g_free(testbed); + + exit(0); + } else { + GMainContext *context = g_main_context_new(); + OSyncThread *thread = osync_thread_new(context, &error); + + 
osync_queue_set_message_handler(server_queue, server_handler4, GINT_TO_POINTER(1)); + + osync_queue_setup_with_gmainloop(server_queue, context); + + osync_thread_start(thread); + + fail_unless(osync_queue_connect(client_queue, OSYNC_QUEUE_SENDER, &error), NULL); + fail_unless(error == NULL, NULL); + + fail_unless(osync_queue_connect(server_queue, OSYNC_QUEUE_RECEIVER, &error), NULL); + fail_unless(error == NULL, NULL); + + message = osync_message_new(OSYNC_MESSAGE_INITIALIZE, 0, &error); + fail_unless(message != NULL, NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + osync_message_set_handler(message, _message_handler, NULL); + + osync_message_write_int(message, 4000000); + osync_message_write_string(message, "this is a test string"); + osync_message_write_long_long_int(message, 400000000); + osync_message_write_data(message, data5, strlen(data5) + 1); + + // Send with timeout of one second + fail_unless(osync_queue_send_message_with_timeout(client_queue, server_queue, message, 1, &error), NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + osync_message_unref(message); + + g_usleep(4*G_USEC_PER_SEC); + + /* Check if the timeout handler replied with an error. 
+ Note: it is important we check **before** we start disconnecting + otherwise we are not testing the right thing */ + fail_unless(num_callback_timeout == 1, NULL); + fail_unless(num_callback == 0, NULL); + + osync_queue_disconnect(client_queue, &error); + fail_unless(error == NULL, NULL); + + osync_queue_disconnect(server_queue, &error); + fail_unless(error == NULL, NULL); + + int status = 0; + wait(&status); + fail_unless(WEXITSTATUS(status) == 0, NULL); + } + + fail_unless(osync_testing_file_exists("/tmp/testpipe-client") == TRUE, NULL); + + fail_unless(osync_queue_remove(client_queue, &error), NULL); + fail_unless(osync_queue_remove(server_queue, &error), NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + fail_unless(osync_testing_file_exists("/tmp/testpipe-client") == FALSE, NULL); + + osync_queue_unref(client_queue); + osync_queue_unref(server_queue); + + destroy_testbed(testbed); +} +END_TEST + OSYNC_TESTCASE_START("ipc") OSYNC_TESTCASE_ADD(ipc_new) OSYNC_TESTCASE_ADD(ipc_ref) @@ -3015,5 +3150,6 @@ OSYNC_TESTCASE_ADD(ipc_loop_with_timeout) OSYNC_TESTCASE_ADD(ipc_loop_timeout_with_idle) OSYNC_TESTCASE_ADD(ipc_timeout_noreplyq) +OSYNC_TESTCASE_ADD(ipc_timeout_noreceiver) OSYNC_TESTCASE_END |