From: <dg...@su...> - 2009-01-22 23:52:30
|
Author: Graham Cobb Date: Fri Jan 23 00:51:13 2009 New Revision: 5221 URL: http://www.opensync.org/changeset/5221 Log: Unit tests for the new timeout code. Tasks completed: 1) Created unit tests. All tests pass. 5) Timeouts that occur while a plugin thread is sleeping work fine, but thread is not interrupted. Nothing more planned. This leaves (at least) the following tasks: 2) Limit the number of pending entries so the timeout is not dependent on the number of outstanding requests 3) Add references for cross-linked pointers between queues and remove linkages (and unref) when ??? (queue disconnected?) 4) Handle timeout before reply queue set up by generating an error message on the command queue 6) Implement a mechanism to protect the queue itself: i.e. is the receiver still reading the queue? Modified: branches/timeout/ChangeLog branches/timeout/tests/CMakeLists.txt branches/timeout/tests/ipc-tests/check_ipc.c Modified: branches/timeout/ChangeLog ============================================================================== --- branches/timeout/ChangeLog Thu Jan 22 23:01:49 2009 (r5220) +++ branches/timeout/ChangeLog Fri Jan 23 00:51:13 2009 (r5221) @@ -1,5 +1,16 @@ 2009-01-22 Graham Cobb <g+...@co...> + * tests/CMakeLists.txt: Add ipc_loop_with_timeout + Add ipc_late_reply + + * tests/ipc-tests/check_ipc.c: Call osync_queue_cross_link every time + client mainloop is setup. + (ipc_timeout): Rewrite client to use a mainloop and a callback so new + timeout processing can run + Rewrite server to use a mainloop so message callback gets called + (ipc_loop_with_timeout): Add ipc_loop_with_timeout + (ipc_late_reply): Add ipc_late_reply + * opensync/ipc/opensync_queue.c (_timeout_dispatch): Restart timeout search at start of pending list as list may have been modified while unlocked. 
(_timeout_dispatch): Set message id in error message Modified: branches/timeout/tests/CMakeLists.txt ============================================================================== --- branches/timeout/tests/CMakeLists.txt Thu Jan 22 23:01:49 2009 (r5220) +++ branches/timeout/tests/CMakeLists.txt Fri Jan 23 00:51:13 2009 (r5221) @@ -246,6 +246,8 @@ OSYNC_TESTCASE(ipc ipc_pipes_stress) OSYNC_TESTCASE_DISABLED(ipc ipc_callback_break_pipes "1039") OSYNC_TESTCASE(ipc ipc_timeout) +OSYNC_TESTCASE(ipc ipc_loop_with_timeout) +OSYNC_TESTCASE(ipc ipc_late_reply) ENDIF (NOT WIN32) BUILD_CHECK_TEST( mapping mapping-tests/check_mapping.c ${TEST_TARGET_LIBRARIES} ) Modified: branches/timeout/tests/ipc-tests/check_ipc.c ============================================================================== --- branches/timeout/tests/ipc-tests/check_ipc.c Thu Jan 22 23:01:49 2009 (r5220) +++ branches/timeout/tests/ipc-tests/check_ipc.c Fri Jan 23 00:51:13 2009 (r5221) @@ -1087,6 +1087,8 @@ osync_assert(error == NULL); osync_assert(osync_queue_connect(server_queue, OSYNC_QUEUE_SENDER, &error)); osync_assert(error == NULL); + + osync_queue_cross_link(client_queue, server_queue); message = osync_queue_get_message(server_queue); @@ -1272,6 +1274,8 @@ osync_assert(error == NULL); osync_assert(osync_queue_connect(server_queue, OSYNC_QUEUE_SENDER, &error)); osync_assert(error == NULL); + + osync_queue_cross_link(client_queue, server_queue); message = osync_queue_get_message(server_queue); @@ -1450,6 +1454,8 @@ osync_assert(osync_queue_connect(server_queue, OSYNC_QUEUE_SENDER, &error)); osync_assert(error == NULL); + + osync_queue_cross_link(client_queue, server_queue); message = osync_queue_get_message(server_queue); @@ -1644,6 +1650,8 @@ osync_assert(osync_queue_connect(server_queue, OSYNC_QUEUE_SENDER, &error)); osync_assert(error == NULL); + + osync_queue_cross_link(client_queue, server_queue); while (osync_queue_is_connected(client_queue)) { g_usleep(100); } @@ -1849,6 +1857,8 @@ 
osync_assert(osync_queue_connect(server_queue, OSYNC_QUEUE_SENDER, &error)); osync_assert(error == NULL); + + osync_queue_cross_link(client_queue, server_queue); message = osync_queue_get_message(server_queue); if (osync_message_get_command(message) != OSYNC_MESSAGE_QUEUE_HUP) { @@ -1984,6 +1994,8 @@ osync_assert(osync_queue_connect(server_queue, OSYNC_QUEUE_SENDER, &error)); osync_assert(error == NULL); + + osync_queue_cross_link(client_queue, server_queue); while (osync_queue_is_connected(client_queue)) { g_usleep(100); } @@ -2081,11 +2093,39 @@ num_callback_timeout++; else num_callback++; +} +char *data5 = "this is another test string"; +void client_handler5(OSyncMessage *message, void *user_data) +{ + osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, user_data); OSyncError *error = NULL; - OSyncMessage *reply = osync_message_new_reply(message, &error); - osync_queue_send_message(client_queue, NULL, reply, &error); - osync_message_unref(reply); + + osync_assert(GPOINTER_TO_INT(user_data) ==1); + osync_assert(osync_message_get_command(message) == OSYNC_MESSAGE_INITIALIZE); + + int int1; + long long int longint1; + char *string; + char databuf[strlen(data5) + 1]; + + + osync_message_read_int(message, &int1); + osync_message_read_string(message, &string); + osync_message_read_long_long_int(message, &longint1); + osync_message_read_data(message, databuf, strlen(data5) + 1); + + osync_assert(int1 == 4000000); + osync_assert(!strcmp(string, "this is a test string")); + osync_assert(longint1 == 400000000); + osync_assert(!strcmp(databuf, data5)); + + /* TIMEOUT TIMEOUT TIMEOUT (no reply...) 
*/ + + /* Proper code would reply to this message, but for testing + purposes we don't reply and simulate a "timeout" situation */ + + osync_trace(TRACE_EXIT, "%s", __func__); } START_TEST (ipc_timeout) @@ -2116,78 +2156,245 @@ osync_queue_create(client_queue, &error); fail_unless(error == NULL, NULL); - char *data = "this is another test string"; pid_t cpid = fork(); if (cpid == 0) { //Child - g_usleep(1*G_USEC_PER_SEC); + + GMainContext *context = g_main_context_new(); + OSyncThread *thread = osync_thread_new(context, &error); + + osync_queue_set_message_handler(client_queue, client_handler5, GINT_TO_POINTER(1)); + + osync_queue_setup_with_gmainloop(client_queue, context); + + osync_thread_start(thread); + osync_assert(osync_queue_connect(client_queue, OSYNC_QUEUE_RECEIVER, &error)); osync_assert(error == NULL); osync_assert(osync_queue_connect(server_queue, OSYNC_QUEUE_SENDER, &error)); osync_assert(error == NULL); + + osync_queue_cross_link(client_queue, server_queue); - while (!(message = osync_queue_get_message(client_queue))) { - g_usleep(10000); - } + message = osync_queue_get_message(server_queue); - if (osync_message_get_command(message) != OSYNC_MESSAGE_INITIALIZE) { + if (osync_message_get_command(message) != OSYNC_MESSAGE_QUEUE_HUP) { exit (1); } - int int1; - long long int longint1; - char *string; - char databuf[strlen(data) + 1]; - - osync_message_read_int(message, &int1); - osync_message_read_string(message, &string); - osync_message_read_long_long_int(message, &longint1); - osync_message_read_data(message, databuf, strlen(data) + 1); + osync_message_unref(message); + + if (osync_queue_disconnect(server_queue, &error) != TRUE || error != NULL) + exit(1); + osync_queue_unref(server_queue); - osync_assert(int1 == 4000000); - osync_assert(!strcmp(string, "this is a test string")); + osync_assert(osync_queue_disconnect(client_queue, &error)); + osync_assert(error == NULL); + + osync_thread_stop(thread); + osync_thread_unref(thread); + + 
osync_queue_unref(client_queue); + + g_free(testbed); + + exit(0); + } else { + GMainContext *context = g_main_context_new(); + OSyncThread *thread = osync_thread_new(context, &error); + + osync_queue_set_message_handler(server_queue, server_handler4, GINT_TO_POINTER(1)); + + osync_queue_setup_with_gmainloop(server_queue, context); + + osync_thread_start(thread); - osync_assert(longint1 == 400000000); - osync_assert(!strcmp(databuf, "this is another test string")); + fail_unless(osync_queue_connect(client_queue, OSYNC_QUEUE_SENDER, &error), NULL); + fail_unless(error == NULL, NULL); + + fail_unless(osync_queue_connect(server_queue, OSYNC_QUEUE_RECEIVER, &error), NULL); + fail_unless(error == NULL, NULL); - /* TIMEOUT TIMEOUT TIMEOUT (no reply...) */ + message = osync_message_new(OSYNC_MESSAGE_INITIALIZE, 0, &error); + fail_unless(message != NULL, NULL); + fail_unless(!osync_error_is_set(&error), NULL); - /* Proper code would reply to this message, but for testing - purposes we don't reply and simulate a "timeout" situation */ - + osync_message_set_handler(message, _message_handler, NULL); + + osync_message_write_int(message, 4000000); + osync_message_write_string(message, "this is a test string"); + osync_message_write_long_long_int(message, 400000000); + osync_message_write_data(message, data5, strlen(data5) + 1); + + // Send with timeout of one second + fail_unless(osync_queue_send_message_with_timeout(client_queue, server_queue, message, 1, &error), NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + osync_message_unref(message); + + // Block + g_usleep(5*G_USEC_PER_SEC); + + osync_queue_disconnect(server_queue, &error); + fail_unless(error == NULL, NULL); + while (!(message = osync_queue_get_message(client_queue))) { g_usleep(10000); } + + if (osync_message_get_command(message) != OSYNC_MESSAGE_QUEUE_HUP) { + exit (1); + } + osync_message_unref(message); + + osync_queue_disconnect(client_queue, &error); + fail_unless(error == NULL, NULL); + + int 
status = 0; + wait(&status); + fail_unless(WEXITSTATUS(status) == 0, NULL); + } + + fail_unless(osync_testing_file_exists("/tmp/testpipe-client") == TRUE, NULL); + + fail_unless(osync_queue_remove(client_queue, &error), NULL); + fail_unless(osync_queue_remove(server_queue, &error), NULL); + fail_unless(!osync_error_is_set(&error), NULL); - OSyncMessage *reply = osync_message_new_reply(message, &error); + /* Check if the timeout handler replied with an error */ + fail_unless(num_callback_timeout == 1, NULL); + fail_unless(num_callback == 0, NULL); - osync_queue_send_message(server_queue, NULL, reply, &error); + fail_unless(osync_testing_file_exists("/tmp/testpipe-client") == FALSE, NULL); + + osync_queue_unref(client_queue); + osync_queue_unref(server_queue); + + destroy_testbed(testbed); +} +END_TEST +void client_handler6(OSyncMessage *message, void *user_data) +{ + osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, user_data); + OSyncError *error = NULL; + + osync_assert(GPOINTER_TO_INT(user_data) ==1); + osync_assert(osync_message_get_command(message) == OSYNC_MESSAGE_INITIALIZE); - if (osync_queue_disconnect(client_queue, &error) != TRUE || error != NULL) - exit(1); - osync_queue_unref(client_queue); + int int1; + long long int longint1; + char *string; + char databuf[strlen(data5) + 1]; - while (!(message = osync_queue_get_message(server_queue))) { - g_usleep(10000); - } + + osync_message_read_int(message, &int1); + osync_message_read_string(message, &string); + osync_message_read_long_long_int(message, &longint1); + osync_message_read_data(message, databuf, strlen(data5) + 1); + + osync_assert(int1 == 4000000); + osync_assert(!strcmp(string, "this is a test string")); + osync_assert(longint1 == 400000000); + osync_assert(!strcmp(databuf, data5)); + + // Do some time consuming processing + g_usleep(3*G_USEC_PER_SEC); + + OSyncMessage *reply = osync_message_new_reply(message, &error); + + osync_queue_send_message(server_queue, NULL, reply, &error); + + 
osync_message_unref(reply); + + osync_trace(TRACE_EXIT, "%s", __func__); +} + +START_TEST (ipc_late_reply) +{ + /* This testcase is intended to test osync_queue_send_message_with_timeout(). + Client got forked and listens for messages from Server and replies. + + To simulate a "timeout" situation the Client delays 3 seconds and then replies + + The timeout handler will call the _message_handler() with an error and the late reply + will be discarded. + + JFYI, every timed out message calls the callback/message_handler with a (timeout) error. + */ + + char *testbed = setup_testbed(NULL); + osync_testing_file_remove("/tmp/testpipe-server"); + osync_testing_file_remove("/tmp/testpipe-client"); + + num_callback_timeout = 0; + num_callback = 0; + + OSyncError *error = NULL; + server_queue = osync_queue_new("/tmp/testpipe-server", &error); + client_queue = osync_queue_new("/tmp/testpipe-client", &error); + OSyncMessage *message = NULL; + + osync_queue_create(server_queue, &error); + fail_unless(error == NULL, NULL); + + osync_queue_create(client_queue, &error); + fail_unless(error == NULL, NULL); + + pid_t cpid = fork(); + if (cpid == 0) { //Child + + GMainContext *context = g_main_context_new(); + OSyncThread *thread = osync_thread_new(context, &error); + + osync_queue_set_message_handler(client_queue, client_handler6, GINT_TO_POINTER(1)); + + osync_queue_setup_with_gmainloop(client_queue, context); + + osync_thread_start(thread); + + osync_assert(osync_queue_connect(client_queue, OSYNC_QUEUE_RECEIVER, &error)); + osync_assert(error == NULL); + + osync_assert(osync_queue_connect(server_queue, OSYNC_QUEUE_SENDER, &error)); + osync_assert(error == NULL); + + osync_queue_cross_link(client_queue, server_queue); + + message = osync_queue_get_message(server_queue); if (osync_message_get_command(message) != OSYNC_MESSAGE_QUEUE_HUP) { exit (1); } - - osync_message_unref(message); - g_usleep(1*G_USEC_PER_SEC); + osync_message_unref(message); + if
(osync_queue_disconnect(server_queue, &error) != TRUE || error != NULL) exit(1); osync_queue_unref(server_queue); + osync_assert(osync_queue_disconnect(client_queue, &error)); + osync_assert(error == NULL); + + osync_thread_stop(thread); + osync_thread_unref(thread); + + osync_queue_unref(client_queue); + g_free(testbed); exit(0); } else { + GMainContext *context = g_main_context_new(); + OSyncThread *thread = osync_thread_new(context, &error); + + osync_queue_set_message_handler(server_queue, server_handler4, GINT_TO_POINTER(1)); + + osync_queue_setup_with_gmainloop(server_queue, context); + + osync_thread_start(thread); + fail_unless(osync_queue_connect(client_queue, OSYNC_QUEUE_SENDER, &error), NULL); fail_unless(error == NULL, NULL); @@ -2203,7 +2410,7 @@ osync_message_write_int(message, 4000000); osync_message_write_string(message, "this is a test string"); osync_message_write_long_long_int(message, 400000000); - osync_message_write_data(message, data, strlen(data) + 1); + osync_message_write_data(message, data5, strlen(data5) + 1); // Send with timeout of one second fail_unless(osync_queue_send_message_with_timeout(client_queue, server_queue, message, 1, &error), NULL); @@ -2212,13 +2419,7 @@ osync_message_unref(message); // Block - while (!(message = osync_queue_get_message(server_queue))) { - g_usleep(100000); - } - - fail_unless(osync_message_get_command(message) == OSYNC_MESSAGE_REPLY); - - osync_message_unref(message); + g_usleep(5*G_USEC_PER_SEC); osync_queue_disconnect(server_queue, &error); fail_unless(error == NULL, NULL); @@ -2259,6 +2460,199 @@ } END_TEST +void callback_handler7(OSyncMessage *message, void *user_data) +{ + osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, user_data); + OSyncError *error = NULL; + + osync_assert(GPOINTER_TO_INT(user_data) == 1); + + num_msgs++; + osync_assert(osync_message_get_command(message) == OSYNC_MESSAGE_REPLY); + + if (num_msgs >= req_msgs) { + osync_queue_disconnect(server_queue, &error); + 
osync_assert(error == NULL); + } + + osync_trace(TRACE_EXIT, "%s", __func__); +} + +void server_handler7(OSyncMessage *message, void *user_data) +{ + abort(); +} + +void client_handler7(OSyncMessage *message, void *user_data) +{ + osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, user_data); + OSyncError *error = NULL; + + osync_assert(GPOINTER_TO_INT(user_data) ==1); + osync_assert(osync_message_get_command(message) == OSYNC_MESSAGE_INITIALIZE); + + int int1; + long long int longint1; + char *string; + void *databuf; + + osync_message_read_int(message, &int1); + osync_message_read_const_string(message, &string); + osync_message_read_long_long_int(message, &longint1); + osync_message_read_const_data(message, &databuf, strlen("this is another test string") + 1); + + osync_assert(int1 == 4000000); + osync_assert(!strcmp(string, "this is a test string")); + osync_assert(longint1 == 400000000); + osync_assert(!strcmp(databuf, "this is another test string")); + + // Do some time consuming processing + g_usleep(1*G_USEC_PER_SEC); + + OSyncMessage *reply = osync_message_new_reply(message, &error); + + osync_queue_send_message(server_queue, NULL, reply, &error); + + osync_message_unref(reply); + + osync_trace(TRACE_EXIT, "%s", __func__); +} + +START_TEST (ipc_loop_with_timeout) +{ + + /* Even though each action takes 1 second, none of these messages should time out + as they are being sent with a timeout of 3 seconds */ + + num_msgs = 0; + req_msgs = 20; + + char *testbed = setup_testbed(NULL); + osync_testing_file_remove("/tmp/testpipe-server"); + osync_testing_file_remove("/tmp/testpipe-client"); + + OSyncError *error = NULL; + server_queue = osync_queue_new("/tmp/testpipe-server", &error); + client_queue = osync_queue_new("/tmp/testpipe-client", &error); + OSyncMessage *message = NULL; + + osync_queue_create(server_queue, &error); + fail_unless(error == NULL, NULL); + + osync_queue_create(client_queue, &error); + fail_unless(error == NULL, NULL); + char *data = 
"this is another test string"; + + pid_t cpid = fork(); + if (cpid == 0) { //Child + + GMainContext *context = g_main_context_new(); + OSyncThread *thread = osync_thread_new(context, &error); + + osync_queue_set_message_handler(client_queue, client_handler7, GINT_TO_POINTER(1)); + + osync_queue_setup_with_gmainloop(client_queue, context); + + osync_thread_start(thread); + + osync_assert(osync_queue_connect(client_queue, OSYNC_QUEUE_RECEIVER, &error)); + osync_assert(error == NULL); + + osync_assert(osync_queue_connect(server_queue, OSYNC_QUEUE_SENDER, &error)); + osync_assert(error == NULL); + + osync_queue_cross_link(client_queue, server_queue); + + message = osync_queue_get_message(server_queue); + + if (osync_message_get_command(message) != OSYNC_MESSAGE_QUEUE_HUP) { + exit (1); + } + + osync_message_unref(message); + + if (osync_queue_disconnect(server_queue, &error) != TRUE || error != NULL) + exit(1); + osync_queue_unref(server_queue); + + osync_assert(osync_queue_disconnect(client_queue, &error)); + osync_assert(error == NULL); + + osync_thread_stop(thread); + osync_thread_unref(thread); + + osync_queue_unref(client_queue); + + g_free(testbed); + + exit(0); + } else { + GMainContext *context = g_main_context_new(); + OSyncThread *thread = osync_thread_new(context, &error); + + osync_queue_set_message_handler(server_queue, server_handler7, GINT_TO_POINTER(1)); + + osync_queue_setup_with_gmainloop(server_queue, context); + + osync_thread_start(thread); + + fail_unless(osync_queue_connect(client_queue, OSYNC_QUEUE_SENDER, &error), NULL); + fail_unless(error == NULL, NULL); + + fail_unless(osync_queue_connect(server_queue, OSYNC_QUEUE_RECEIVER, &error), NULL); + fail_unless(error == NULL, NULL); + + int i = 0; + for (i = 0; i < req_msgs; i++) { + message = osync_message_new(OSYNC_MESSAGE_INITIALIZE, 0, &error); + fail_unless(message != NULL, NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + osync_message_write_int(message, 4000000); + 
osync_message_write_string(message, "this is a test string"); + osync_message_write_long_long_int(message, 400000000); + osync_message_write_data(message, data, strlen(data) + 1); + + osync_message_set_handler(message, callback_handler7, GINT_TO_POINTER(1)); + + fail_unless(osync_queue_send_message_with_timeout(client_queue, server_queue, message, 3, &error), NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + osync_message_unref(message); + } + + message = osync_queue_get_message(client_queue); + + fail_unless(osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_HUP); + + osync_message_unref(message); + + osync_queue_disconnect(client_queue, &error); + fail_unless(error == NULL, NULL); + + osync_thread_stop(thread); + osync_thread_unref(thread); + + int status = 0; + wait(&status); + fail_unless(WEXITSTATUS(status) == 0, NULL); + } + + fail_unless(osync_testing_file_exists("/tmp/testpipe-client") == TRUE, NULL); + + fail_unless(osync_queue_remove(client_queue, &error), NULL); + fail_unless(osync_queue_remove(server_queue, &error), NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + fail_unless(osync_testing_file_exists("/tmp/testpipe-client") == FALSE, NULL); + + osync_queue_unref(client_queue); + osync_queue_unref(server_queue); + + destroy_testbed(testbed); +} +END_TEST + OSYNC_TESTCASE_START("ipc") OSYNC_TESTCASE_ADD(ipc_new) OSYNC_TESTCASE_ADD(ipc_ref) @@ -2285,5 +2679,7 @@ OSYNC_TESTCASE_ADD(ipc_callback_break_pipes) OSYNC_TESTCASE_ADD(ipc_timeout) +OSYNC_TESTCASE_ADD(ipc_late_reply) +OSYNC_TESTCASE_ADD(ipc_loop_with_timeout) OSYNC_TESTCASE_END |