From: <dg...@su...> - 2009-01-24 00:31:08
|
Author: Graham Cobb Date: Sat Jan 24 01:29:49 2009 New Revision: 5227 URL: http://www.opensync.org/changeset/5227 Log: Completed: 2) Limit the number of pending entries so the timeout is not dependent on the number of outstanding requests And added unit test. That leaves (at least): 3) Add references for cross-linked pointers between queues and remove linkages (and unref) when ??? (queue disconnected?) 4) Handle timeout before reply queue set up by generating an error message on the command queue 6) Implement a mechanism to protect the queue itself: i.e. is the receiver still reading the queue? Modified: branches/timeout/ChangeLog branches/timeout/opensync/client/opensync_client.c branches/timeout/opensync/ipc/opensync_queue.c branches/timeout/opensync/ipc/opensync_queue_internals.h branches/timeout/opensync/ipc/opensync_queue_private.h branches/timeout/tests/CMakeLists.txt branches/timeout/tests/ipc-tests/check_ipc.c Modified: branches/timeout/ChangeLog ============================================================================== --- branches/timeout/ChangeLog Fri Jan 23 15:07:37 2009 (r5226) +++ branches/timeout/ChangeLog Sat Jan 24 01:29:49 2009 (r5227) @@ -1,3 +1,26 @@ +2009-01-23 Graham Cobb <g+...@co...> + + * opensync/client/opensync_client.c (osync_client_set_incoming_queue): Call + osync_queue_set_pending_limit + + * tests/ipc-tests/check_ipc.c: Call osync_queue_set_pending_limit in timeout tests. + + * opensync/ipc/opensync_queue_internals.h: Add osync_queue_set_pending_limit + and OSYNC_QUEUE_PENDING_LIMIT + + * opensync/ipc/opensync_queue.c (_incoming_check): Check pendingLimit not exceeded. + Add increments/decrements of pendingCount whenever pendingReplies is manipulated. + (osync_queue_set_pending_limit): Add osync_queue_set_pending_limit + + * opensync/ipc/opensync_queue_private.h: Add pendingCount, pendingLimit to OSyncQueue + + * tests/CMakeLists.txt: Add ipc_loop_timeout_with_idle. + + * tests/ipc-tests/check_ipc.c: Rename callback_handler as callback_handler_check_reply. + Rename server_handler3 as server_handler_abort. + Rename client_handler6 as client_handler_sleep. + Add ipc_loop_timeout_with_idle. + 2009-01-22 Graham Cobb <g+...@co...> * tests/CMakeLists.txt: Add ipc_loop_with_timeout Modified: branches/timeout/opensync/client/opensync_client.c ============================================================================== --- branches/timeout/opensync/client/opensync_client.c Fri Jan 23 15:07:37 2009 (r5226) +++ branches/timeout/opensync/client/opensync_client.c Sat Jan 24 01:29:49 2009 (r5227) @@ -1611,6 +1611,7 @@ osync_queue_set_message_handler(incoming, _osync_client_message_handler, client); osync_queue_setup_with_gmainloop(incoming, client->context); client->incoming = osync_queue_ref(incoming); + osync_queue_set_pending_limit(incoming, OSYNC_QUEUE_PENDING_LIMIT); } void osync_client_set_outgoing_queue(OSyncClient *client, OSyncQueue *outgoing) Modified: branches/timeout/opensync/ipc/opensync_queue.c ============================================================================== --- branches/timeout/opensync/ipc/opensync_queue.c Fri Jan 23 15:07:37 2009 (r5226) +++ branches/timeout/opensync/ipc/opensync_queue.c Sat Jan 24 01:29:49 2009 (r5227) @@ -128,6 +128,7 @@ gets called twice! */ queue->pendingReplies = osync_list_remove(queue->pendingReplies, pending); + queue->pendingCount--; /* Unlock the pending lock since the messages might be sent during the callback */ g_mutex_unlock(queue->pendingLock); @@ -163,7 +164,18 @@ static gboolean _incoming_check(GSource *source) { OSyncQueue *queue = *((OSyncQueue **)(source + 1)); - if (g_async_queue_length(queue->incoming) > 0) + + /* As well as checking there is something on the incoming queue, we check + that we are not blocked because we have too many pending commands. + This check is only done if the pendingLimit has been set. + + Note, to avoid taking out the pending lock in order to count + the length of the pending queue we use the pendingCount counter. + This better not get out of line! */ + + if (g_async_queue_length(queue->incoming) > 0 + && (queue->pendingLimit == 0 + || queue->pendingCount < queue->pendingLimit) ) return TRUE; return FALSE; @@ -214,6 +226,7 @@ gets called twice! */ queue->pendingReplies = osync_list_remove(queue->pendingReplies, pending); + queue->pendingCount--; /* Unlock the pending lock since the messages might be sent during the callback */ g_mutex_unlock(queue->pendingLock); @@ -247,8 +260,11 @@ OSyncError *error = NULL; osync_trace(TRACE_ENTRY, "%s(%p)", __func__, user_data); + osync_trace(TRACE_INTERNAL, "queue->pendingCount = %d, queue->pendingLimit = %d", queue->pendingCount, queue->pendingLimit); + + while ( (queue->pendingLimit == 0 || queue->pendingCount < queue->pendingLimit) + && (message = g_async_queue_try_pop(queue->incoming)) ) { - while ((message = g_async_queue_try_pop(queue->incoming))) { /* We check if the message is a reply to something */ osync_trace(TRACE_INTERNAL, "Dispatching %p:%i(%s), timeout=%d, id=%lli", message, osync_message_get_cmd(message), osync_message_get_commandstr(message), @@ -290,6 +306,7 @@ g_mutex_lock(queue->pendingLock); queue->pendingReplies = osync_list_append(queue->pendingReplies, pending); + queue->pendingCount++; g_mutex_unlock(queue->pendingLock); } @@ -807,6 +824,7 @@ pending = queue->pendingReplies->data; queue->pendingReplies = osync_list_remove(queue->pendingReplies, pending); + queue->pendingCount--; /** @todo Refcounting for OSyncPendingMessage */ if (pending->timeout_info) @@ -815,6 +833,8 @@ osync_free(pending); } + osync_assert(queue->pendingCount == 0); + if (queue->name) osync_free(queue->name); @@ -1053,6 +1073,20 @@ osync_trace(TRACE_EXIT, "%s", __func__); } +void osync_queue_set_pending_limit(OSyncQueue *queue, unsigned int limit) +{ + /* The pending limit is used on queues which receive commands and run timeouts. + It is used to limit the number of pending transactions so timeouts don't occur + just because there are a lot of commands waiting to complete. + It is not necessary on other queues and, in fact, MUST NOT be set on + queues which receive command replies as it could cause deadlocks. */ + osync_trace(TRACE_ENTRY, "%s(%p, %u)", __func__, queue, limit); + + queue->pendingLimit = limit; + + osync_trace(TRACE_EXIT, "%s", __func__); +} + void osync_queue_setup_with_gmainloop(OSyncQueue *queue, GMainContext *context) { OSyncQueue **queueptr = NULL; @@ -1189,6 +1223,7 @@ g_mutex_lock(replyqueue->pendingLock); replyqueue->pendingReplies = osync_list_append(replyqueue->pendingReplies, pending); + replyqueue->pendingCount++; g_mutex_unlock(replyqueue->pendingLock); } Modified: branches/timeout/opensync/ipc/opensync_queue_internals.h ============================================================================== --- branches/timeout/opensync/ipc/opensync_queue_internals.h Fri Jan 23 15:07:37 2009 (r5226) +++ branches/timeout/opensync/ipc/opensync_queue_internals.h Sat Jan 24 01:29:49 2009 (r5227) @@ -99,6 +99,26 @@ OSYNC_TEST_EXPORT void osync_queue_cross_link(OSyncQueue *cmd_queue, OSyncQueue *reply_queue); /** + * @brief Set pending limit on queue + * + * This should be used on queues used to receive incoming commands to + * limit the number of outstanding commands waiting for replies. + * This avoids timing out commands which are just waiting for previous + * commands to finish. + * + * Note: the pending limit should not be set on other queues and + * MUST NOT be set on command reply queues as it could cause deadlocks. + * + * The recommended value to use is OSYNC_QUEUE_PENDING_LIMIT. + * + * @param queue The command queue used to receive incoming commands + * @param limit The maximum number of waiting commands + * + */ +OSYNC_TEST_EXPORT void osync_queue_set_pending_limit(OSyncQueue *queue, unsigned int limit); +#define OSYNC_QUEUE_PENDING_LIMIT 5 + +/** * @brief Sends a Message to a Queue * @param queue Pointer to the queue * @param replyqueue Modified: branches/timeout/opensync/ipc/opensync_queue_private.h ============================================================================== --- branches/timeout/opensync/ipc/opensync_queue_private.h Fri Jan 23 15:07:37 2009 (r5226) +++ branches/timeout/opensync/ipc/opensync_queue_private.h Sat Jan 24 01:29:49 2009 (r5227) @@ -71,6 +71,7 @@ /** List of pending replies */ OSyncList *pendingReplies; GMutex *pendingLock; + unsigned int pendingCount, pendingLimit; GSourceFuncs *write_functions; GSource *write_source; Modified: branches/timeout/tests/CMakeLists.txt ============================================================================== --- branches/timeout/tests/CMakeLists.txt Fri Jan 23 15:07:37 2009 (r5226) +++ branches/timeout/tests/CMakeLists.txt Sat Jan 24 01:29:49 2009 (r5227) @@ -248,6 +248,7 @@ OSYNC_TESTCASE(ipc ipc_timeout) OSYNC_TESTCASE(ipc ipc_loop_with_timeout) OSYNC_TESTCASE(ipc ipc_late_reply) +OSYNC_TESTCASE(ipc ipc_loop_timeout_with_idle) ENDIF (NOT WIN32) BUILD_CHECK_TEST( mapping mapping-tests/check_mapping.c ${TEST_TARGET_LIBRARIES} ) Modified: branches/timeout/tests/ipc-tests/check_ipc.c ============================================================================== --- branches/timeout/tests/ipc-tests/check_ipc.c Fri Jan 23 15:07:37 2009 (r5226) +++ branches/timeout/tests/ipc-tests/check_ipc.c Sat Jan 24 01:29:49 2009 (r5227) @@ -1362,7 +1362,7 @@ } END_TEST -void callback_handler(OSyncMessage *message, void *user_data) +void callback_handler_check_reply(OSyncMessage *message, void *user_data) { osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, user_data); OSyncError *error = NULL; @@ -1380,7 +1380,7 @@ osync_trace(TRACE_EXIT, "%s", __func__); } -void server_handler3(OSyncMessage *message, void *user_data) +void server_handler_abort(OSyncMessage *message, void *user_data) { abort(); } @@ -1484,7 +1484,7 @@ GMainContext *context = g_main_context_new(); OSyncThread *thread = osync_thread_new(context, &error); - osync_queue_set_message_handler(server_queue, server_handler3, GINT_TO_POINTER(1)); + osync_queue_set_message_handler(server_queue, server_handler_abort, GINT_TO_POINTER(1)); osync_queue_setup_with_gmainloop(server_queue, context); @@ -1507,7 +1507,7 @@ osync_message_write_long_long_int(message, 400000000); osync_message_write_data(message, data, strlen(data) + 1); - osync_message_set_handler(message, callback_handler, GINT_TO_POINTER(1)); + osync_message_set_handler(message, callback_handler_check_reply, GINT_TO_POINTER(1)); fail_unless(osync_queue_send_message(client_queue, server_queue, message, &error), NULL); fail_unless(!osync_error_is_set(&error), NULL); @@ -2164,6 +2164,7 @@ OSyncThread *thread = osync_thread_new(context, &error); osync_queue_set_message_handler(client_queue, client_handler5, GINT_TO_POINTER(1)); + osync_queue_set_pending_limit(client_queue, OSYNC_QUEUE_PENDING_LIMIT); osync_queue_setup_with_gmainloop(client_queue, context); @@ -2275,7 +2276,8 @@ } END_TEST -void client_handler6(OSyncMessage *message, void *user_data) +int ch_sleep_time = 3; // Seconds +void client_handler_sleep(OSyncMessage *message, void *user_data) { osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, user_data); OSyncError *error = NULL; @@ -2300,7 +2302,7 @@ osync_assert(!strcmp(databuf, data5)); // Do some time consuming processing - g_usleep(3*G_USEC_PER_SEC); + g_usleep(ch_sleep_time*G_USEC_PER_SEC); OSyncMessage *reply = osync_message_new_reply(message, &error); @@ -2330,7 +2332,8 @@ num_callback_timeout = 0; num_callback = 0; - + ch_sleep_time = 3; + OSyncError *error = NULL; server_queue = osync_queue_new("/tmp/testpipe-server", &error); client_queue = osync_queue_new("/tmp/testpipe-client", &error); @@ -2348,7 +2351,8 @@ GMainContext *context = g_main_context_new(); OSyncThread *thread = osync_thread_new(context, &error); - osync_queue_set_message_handler(client_queue, client_handler6, GINT_TO_POINTER(1)); + osync_queue_set_message_handler(client_queue, client_handler_sleep, GINT_TO_POINTER(1)); + osync_queue_set_pending_limit(client_queue, OSYNC_QUEUE_PENDING_LIMIT); osync_queue_setup_with_gmainloop(client_queue, context); @@ -2460,72 +2464,211 @@ } END_TEST -void callback_handler7(OSyncMessage *message, void *user_data) +START_TEST (ipc_loop_with_timeout) { - osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, user_data); + + /* Even though each action takes 1 second, none of these messages should time out + as they are being sent with a timeout of 3 seconds */ + + num_msgs = 0; + req_msgs = 20; + ch_sleep_time = 1; // Second + + char *testbed = setup_testbed(NULL); + osync_testing_file_remove("/tmp/testpipe-server"); + osync_testing_file_remove("/tmp/testpipe-client"); + OSyncError *error = NULL; + server_queue = osync_queue_new("/tmp/testpipe-server", &error); + client_queue = osync_queue_new("/tmp/testpipe-client", &error); + OSyncMessage *message = NULL; - osync_assert(GPOINTER_TO_INT(user_data) == 1); + osync_queue_create(server_queue, &error); + fail_unless(error == NULL, NULL); - num_msgs++; - osync_assert(osync_message_get_command(message) == OSYNC_MESSAGE_REPLY); + osync_queue_create(client_queue, &error); + fail_unless(error == NULL, NULL); + char *data = "this is another test string"; - if (num_msgs >= req_msgs) { - osync_queue_disconnect(server_queue, &error); + pid_t cpid = fork(); + if (cpid == 0) { //Child + + GMainContext *context = g_main_context_new(); + OSyncThread *thread = osync_thread_new(context, &error); + + osync_queue_set_message_handler(client_queue, client_handler_sleep, GINT_TO_POINTER(1)); + osync_queue_set_pending_limit(client_queue, OSYNC_QUEUE_PENDING_LIMIT); + + osync_queue_setup_with_gmainloop(client_queue, context); + + osync_thread_start(thread); + + osync_assert(osync_queue_connect(client_queue, OSYNC_QUEUE_RECEIVER, &error)); osync_assert(error == NULL); + + osync_assert(osync_queue_connect(server_queue, OSYNC_QUEUE_SENDER, &error)); + osync_assert(error == NULL); + + osync_queue_cross_link(client_queue, server_queue); + + message = osync_queue_get_message(server_queue); + + if (osync_message_get_command(message) != OSYNC_MESSAGE_QUEUE_HUP) { + exit (1); + } + + osync_message_unref(message); + + if (osync_queue_disconnect(server_queue, &error) != TRUE || error != NULL) + exit(1); + osync_queue_unref(server_queue); + + osync_assert(osync_queue_disconnect(client_queue, &error)); + osync_assert(error == NULL); + + osync_thread_stop(thread); + osync_thread_unref(thread); + + osync_queue_unref(client_queue); + + g_free(testbed); + + exit(0); + } else { + GMainContext *context = g_main_context_new(); + OSyncThread *thread = osync_thread_new(context, &error); + + osync_queue_set_message_handler(server_queue, server_handler_abort, GINT_TO_POINTER(1)); + + osync_queue_setup_with_gmainloop(server_queue, context); + + osync_thread_start(thread); + + fail_unless(osync_queue_connect(client_queue, OSYNC_QUEUE_SENDER, &error), NULL); + fail_unless(error == NULL, NULL); + + fail_unless(osync_queue_connect(server_queue, OSYNC_QUEUE_RECEIVER, &error), NULL); + fail_unless(error == NULL, NULL); + + int i = 0; + for (i = 0; i < req_msgs; i++) { + message = osync_message_new(OSYNC_MESSAGE_INITIALIZE, 0, &error); + fail_unless(message != NULL, NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + osync_message_write_int(message, 4000000); + osync_message_write_string(message, "this is a test string"); + osync_message_write_long_long_int(message, 400000000); + osync_message_write_data(message, data, strlen(data) + 1); + + osync_message_set_handler(message, callback_handler_check_reply, GINT_TO_POINTER(1)); + + fail_unless(osync_queue_send_message_with_timeout(client_queue, server_queue, message, 3, &error), NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + osync_message_unref(message); + } + + message = osync_queue_get_message(client_queue); + + fail_unless(osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_HUP); + + osync_message_unref(message); + + osync_queue_disconnect(client_queue, &error); + fail_unless(error == NULL, NULL); + + osync_thread_stop(thread); + osync_thread_unref(thread); + + int status = 0; + wait(&status); + fail_unless(WEXITSTATUS(status) == 0, NULL); } - osync_trace(TRACE_EXIT, "%s", __func__); -} + fail_unless(osync_testing_file_exists("/tmp/testpipe-client") == TRUE, NULL); + + fail_unless(osync_queue_remove(client_queue, &error), NULL); + fail_unless(osync_queue_remove(server_queue, &error), NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + fail_unless(osync_testing_file_exists("/tmp/testpipe-client") == FALSE, NULL); -void server_handler7(OSyncMessage *message, void *user_data) -{ - abort(); + osync_queue_unref(client_queue); + osync_queue_unref(server_queue); + + destroy_testbed(testbed); } +END_TEST -void client_handler7(OSyncMessage *message, void *user_data) +GSList *ch_pending = NULL; +void client_handler_first_part(OSyncMessage *message, void *user_data) { osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, user_data); OSyncError *error = NULL; osync_assert(GPOINTER_TO_INT(user_data) ==1); osync_assert(osync_message_get_command(message) == OSYNC_MESSAGE_INITIALIZE); - + int int1; long long int longint1; char *string; - void *databuf; + char databuf[strlen(data5) + 1]; + osync_message_read_int(message, &int1); - osync_message_read_const_string(message, &string); + osync_message_read_string(message, &string); osync_message_read_long_long_int(message, &longint1); - osync_message_read_const_data(message, &databuf, strlen("this is another test string") + 1); + osync_message_read_data(message, databuf, strlen(data5) + 1); osync_assert(int1 == 4000000); osync_assert(!strcmp(string, "this is a test string")); osync_assert(longint1 == 400000000); - osync_assert(!strcmp(databuf, "this is another test string")); + osync_assert(!strcmp(databuf, data5)); - // Do some time consuming processing - g_usleep(1*G_USEC_PER_SEC); - - OSyncMessage *reply = osync_message_new_reply(message, &error); - - osync_queue_send_message(server_queue, NULL, reply, &error); + // Put message on pending queue and return + osync_message_ref(message); + ch_pending = g_slist_append(ch_pending, message); + + osync_trace(TRACE_EXIT, "%s", __func__); +} +gboolean client_handler_second_part(gpointer userdata) +{ + OSyncError *error = NULL; + + osync_trace(TRACE_ENTRY, "%s(%p)", __func__, userdata); + + if (ch_pending) { + + OSyncMessage *message = ch_pending->data; + + OSyncMessage *reply = osync_message_new_reply(message, &error); - osync_message_unref(reply); + osync_queue_send_message(server_queue, NULL, reply, &error); - osync_trace(TRACE_EXIT, "%s", __func__); + osync_message_unref(reply); + + ch_pending = g_slist_remove(ch_pending, message); + osync_message_unref(message); + + osync_trace(TRACE_EXIT, "%s", __func__); + return TRUE; + } + + osync_trace(TRACE_EXIT, "%s: no more entries", __func__); + return FALSE; } -START_TEST (ipc_loop_with_timeout) +START_TEST (ipc_loop_timeout_with_idle) { - /* Even though each action takes 1 second, none of these messages should time out + /* Same as ipc_loop_with_timeout except that the client handler doesn't sleep, + so the queue dispatchers can run while the operation is waiting. + Even though each action takes 1 second, none of these messages should time out as they are being sent with a timeout of 3 seconds */ num_msgs = 0; - req_msgs = 20; + req_msgs = 30; char *testbed = setup_testbed(NULL); osync_testing_file_remove("/tmp/testpipe-server"); @@ -2549,7 +2692,8 @@ GMainContext *context = g_main_context_new(); OSyncThread *thread = osync_thread_new(context, &error); - osync_queue_set_message_handler(client_queue, client_handler7, GINT_TO_POINTER(1)); + osync_queue_set_message_handler(client_queue, client_handler_first_part, GINT_TO_POINTER(1)); + osync_queue_set_pending_limit(client_queue, OSYNC_QUEUE_PENDING_LIMIT); osync_queue_setup_with_gmainloop(client_queue, context); @@ -2562,6 +2706,12 @@ osync_assert(error == NULL); osync_queue_cross_link(client_queue, server_queue); + + GSource *tsource = g_timeout_source_new(1000); + osync_assert(tsource); + g_source_set_callback(tsource, client_handler_second_part, NULL, NULL); + osync_assert(g_source_attach(tsource, context)); + g_source_unref(tsource); message = osync_queue_get_message(server_queue); @@ -2590,7 +2740,7 @@ GMainContext *context = g_main_context_new(); OSyncThread *thread = osync_thread_new(context, &error); - osync_queue_set_message_handler(server_queue, server_handler7, GINT_TO_POINTER(1)); + osync_queue_set_message_handler(server_queue, server_handler_abort, GINT_TO_POINTER(1)); osync_queue_setup_with_gmainloop(server_queue, context); @@ -2613,9 +2763,9 @@ osync_message_write_long_long_int(message, 400000000); osync_message_write_data(message, data, strlen(data) + 1); - osync_message_set_handler(message, callback_handler7, GINT_TO_POINTER(1)); + osync_message_set_handler(message, callback_handler_check_reply, GINT_TO_POINTER(1)); - fail_unless(osync_queue_send_message_with_timeout(client_queue, server_queue, message, 3, &error), NULL); + fail_unless(osync_queue_send_message_with_timeout(client_queue, server_queue, message, 10, &error), NULL); fail_unless(!osync_error_is_set(&error), NULL); osync_message_unref(message); @@ -2681,5 +2831,6 @@ OSYNC_TESTCASE_ADD(ipc_timeout) OSYNC_TESTCASE_ADD(ipc_late_reply) OSYNC_TESTCASE_ADD(ipc_loop_with_timeout) +OSYNC_TESTCASE_ADD(ipc_loop_timeout_with_idle) OSYNC_TESTCASE_END |