From: <svn...@op...> - 2009-02-25 13:58:34
|
Author: friedrich.beckmann Date: Wed Feb 25 14:58:27 2009 New Revision: 5273 URL: http://www.opensync.org/changeset/5273 Log: Added thread-based communication for the queues. This avoids the usage of the file pipe mechanism. Messages are simply forwarded directly between g_async_queues. This is required for the Windows port and is also useful on Unix, as the communication is faster (no file I/O). It is automatically enabled when the start type of the plugin is THREAD. Modified: trunk/opensync/client/opensync_client_proxy.c trunk/opensync/engine/opensync_engine.c trunk/opensync/ipc/opensync_queue.c trunk/opensync/ipc/opensync_queue_internals.h trunk/opensync/ipc/opensync_queue_private.h trunk/tests/CMakeLists.txt trunk/tests/client-tests/check_client.c Modified: trunk/opensync/client/opensync_client_proxy.c ============================================================================== --- trunk/opensync/client/opensync_client_proxy.c Wed Feb 25 14:31:43 2009 (r5272) +++ trunk/opensync/client/opensync_client_proxy.c Wed Feb 25 14:58:27 2009 (r5273) @@ -887,19 +887,17 @@ proxy->type = type; if (type != OSYNC_START_TYPE_EXTERNAL) { - // First, create the pipe from the engine to the client - if (!osync_queue_new_pipes(&read1, &write1, error)) - goto error; - - // Then the pipe from the client to the engine - if (!osync_queue_new_pipes(&read2, &write2, error)) - goto error_free_pipe1; - - proxy->outgoing = osync_queue_ref(write1); - proxy->incoming = osync_queue_ref(read2); - /* Now we either spawn a new process, or we create a new thread */ if (type == OSYNC_START_TYPE_THREAD) { + // First, create the pipe from the engine to the client + if (!osync_queue_new_threadcom(&read1, &write1, error)) + goto error; + // Then the pipe from the client to the engine + if (!osync_queue_new_threadcom(&read2, &write2, error)) + goto error_free_pipe1; + proxy->outgoing = osync_queue_ref(write1); + proxy->incoming = osync_queue_ref(read2); + proxy->client = osync_client_new(error); if 
(!proxy->client) goto error_free_pipe2; @@ -920,6 +918,13 @@ if (!osync_client_run(proxy->client, error)) goto error_free_pipe2; } else { + if (!osync_queue_new_pipes(&read1, &write1, error)) + goto error; + if (!osync_queue_new_pipes(&read2, &write2, error)) + goto error_free_pipe1; + proxy->outgoing = osync_queue_ref(write1); + proxy->incoming = osync_queue_ref(read2); + /* First lets see if the old plugin exists, and kill it if it does */ //if (!_osync_client_kill_old_osplugin(proxy, error)) // goto error; Modified: trunk/opensync/engine/opensync_engine.c ============================================================================== --- trunk/opensync/engine/opensync_engine.c Wed Feb 25 14:31:43 2009 (r5272) +++ trunk/opensync/engine/opensync_engine.c Wed Feb 25 14:58:27 2009 (r5273) @@ -690,7 +690,7 @@ static osync_bool _osync_engine_finalize_member(OSyncEngine *engine, OSyncClientProxy *proxy, OSyncError **error) { - unsigned int i = 2000; + unsigned int i = 1000; osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, engine, proxy, error); engine->busy = TRUE; @@ -699,7 +699,13 @@ goto error; //FIXME - while (engine->busy && i > 0) { g_usleep(1000); g_main_context_iteration(engine->context, FALSE); i--; } + osync_trace(TRACE_INTERNAL, "Start waiting for engine"); +// while (engine->busy && i > 0) { g_usleep(1000); g_main_context_iteration(engine->context, FALSE); i--; } + while (engine->busy && i > 0) { g_usleep(1000); i--; } + + if (engine->busy) + osync_trace(TRACE_INTERNAL, "Engine still busy - I give up waiting"); + osync_trace(TRACE_INTERNAL, "Done waiting"); if (!osync_client_proxy_shutdown(proxy, error)) Modified: trunk/opensync/ipc/opensync_queue.c ============================================================================== --- trunk/opensync/ipc/opensync_queue.c Wed Feb 25 14:31:43 2009 (r5272) +++ trunk/opensync/ipc/opensync_queue.c Wed Feb 25 14:58:27 2009 (r5273) @@ -497,6 +497,13 @@ osync_error_set(&error, OSYNC_ERROR_GENERIC, "Trying to send to 
a queue thats not connected"); goto error; } + + /* When using threaded communication, the message is directly passed to the */ + /* incoming async queue of the connected queue */ + if (queue->usethreadcom){ + g_async_queue_push(queue->connected_queue->incoming, message); + continue; + } /* The size of the message */ if (!_osync_queue_write_int(queue, osync_message_get_message_size(message), &error)) @@ -640,6 +647,12 @@ return FALSE; } + + if (queue->usethreadcom){ + if (queue->connection_closing) + _osync_queue_generate_error(queue, OSYNC_MESSAGE_QUEUE_HUP, &error); + return FALSE; + } switch (osync_queue_poll(queue)) { case OSYNC_QUEUE_EVENT_NONE: @@ -794,6 +807,10 @@ queue->ref_count = 1; + queue->usethreadcom = FALSE; + queue->connected_queue = NULL; + queue->connection_closing = FALSE; + osync_trace(TRACE_EXIT, "%s: %p", __func__, queue); return queue; @@ -821,6 +838,32 @@ return NULL; } +osync_bool osync_queue_new_threadcom(OSyncQueue **read_queue, OSyncQueue **write_queue, OSyncError **error){ + + osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, read_queue, write_queue, error); + *read_queue = osync_queue_new(NULL, error); + if (!*read_queue) + goto error; + *write_queue = osync_queue_new(NULL, error); + if (!*write_queue) + goto error_free_read_queue; + + (*read_queue)->usethreadcom = TRUE; + (*write_queue)->usethreadcom = TRUE; + (*read_queue)->connected_queue = *write_queue; + (*write_queue)->connected_queue = *read_queue; + + osync_trace(TRACE_EXIT, "%s", __func__); + return TRUE; + + error_free_read_queue: + osync_queue_unref(*read_queue); + error: + osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); + return FALSE; + +} + osync_bool osync_queue_new_pipes(OSyncQueue **read_queue, OSyncQueue **write_queue, OSyncError **error) { #ifdef _WIN32 @@ -905,6 +948,9 @@ if (queue->name) osync_free(queue->name); + + if (queue->connected_queue) + queue->connected_queue = NULL; osync_free(queue); queue = NULL; @@ -963,38 +1009,44 @@ 
osync_bool osync_queue_connect(OSyncQueue *queue, OSyncQueueType type, OSyncError **error) { -#ifdef _WIN32 - return FALSE; -#else //_WIN32 OSyncQueue **queueptr = NULL; osync_trace(TRACE_ENTRY, "%s(%p, %i, %p)", __func__, queue, type, error); osync_assert(queue); osync_assert(queue->connected == FALSE); queue->type = type; - - if (queue->fd == -1) { - /* First, open the queue with the flags provided by the user */ - int fd = open(queue->name, type == OSYNC_QUEUE_SENDER ? O_WRONLY : O_RDONLY); - if (fd == -1) { - osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to open fifo"); - goto error; + + if (!queue->usethreadcom){ +#ifdef _WIN32 + osync_error_set(error, OSYNC_ERROR_GENERIC, "Windows does not support pipes"); + goto error; +#else + if (queue->fd == -1) { + /* First, open the queue with the flags provided by the user */ + int fd = open(queue->name, type == OSYNC_QUEUE_SENDER ? O_WRONLY : O_RDONLY); + if (fd == -1) { + osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to open fifo"); + goto error; + } + queue->fd = fd; } - queue->fd = fd; - } - int oldflags = fcntl(queue->fd, F_GETFD); - if (oldflags == -1) { - osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to get fifo flags"); - goto error_close; - } - if (fcntl(queue->fd, F_SETFD, oldflags|FD_CLOEXEC) == -1) { - osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to set fifo flags"); - goto error_close; + int oldflags = fcntl(queue->fd, F_GETFD); + if (oldflags == -1) { + osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to get fifo flags"); + goto error_close; + } + if (fcntl(queue->fd, F_SETFD, oldflags|FD_CLOEXEC) == -1) { + osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to set fifo flags"); + goto error_close; + } +#endif /*_WIN32*/ } - queue->connected = TRUE; + queue->connection_closing = FALSE; +#ifndef _WIN32 signal(SIGPIPE, SIG_IGN); +#endif /* now we start a thread which handles reading/writing of the queue */ queue->thread = osync_thread_new(queue->context, error); @@ -1056,11 
+1108,13 @@ return TRUE; error_close: - close(queue->fd); +#ifndef _WIN32 + if (!queue->usethreadcom) + close(queue->fd); +#endif error: osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); return FALSE; -#endif //_WIN32 } osync_bool osync_queue_disconnect(OSyncQueue *queue, OSyncError **error) @@ -1142,11 +1196,15 @@ /* We have to empty the incoming queue if we disconnect the queue. Otherwise, the * consumer threads might try to pick up messages even after we are done. */ _osync_queue_flush_messages(queue->incoming); - - if (queue->fd != -1 && close(queue->fd) != 0) { - osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to close queue"); - osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); - return FALSE; + + if (queue->usethreadcom){ + queue->connected_queue->connection_closing = TRUE; + }else{ + if (queue->fd != -1 && close(queue->fd) != 0) { + osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to close queue"); + osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); + return FALSE; + } } queue->fd = -1; Modified: trunk/opensync/ipc/opensync_queue_internals.h ============================================================================== --- trunk/opensync/ipc/opensync_queue_internals.h Wed Feb 25 14:31:43 2009 (r5272) +++ trunk/opensync/ipc/opensync_queue_internals.h Wed Feb 25 14:58:27 2009 (r5273) @@ -51,6 +51,11 @@ OSYNC_TEST_EXPORT osync_bool osync_queue_new_pipes(OSyncQueue **read_queue, OSyncQueue **write_queue, OSyncError **error); /** + * @brief Creates two queues which are connected via thread communication + */ +OSYNC_TEST_EXPORT osync_bool osync_queue_new_threadcom(OSyncQueue **read_queue, OSyncQueue **write_queue, OSyncError **error); + +/** * @brief Removes a Queue * @param queue OpenSync Queue that should be removed * @param error Modified: trunk/opensync/ipc/opensync_queue_private.h ============================================================================== --- 
trunk/opensync/ipc/opensync_queue_private.h Wed Feb 25 14:31:43 2009 (r5272) +++ trunk/opensync/ipc/opensync_queue_private.h Wed Feb 25 14:58:27 2009 (r5273) @@ -105,6 +105,14 @@ /* Expiration time of pending queue timeout */ GTimeVal pending_timeout; + + /* Threaded communication */ + gboolean usethreadcom; + /* Connected queue */ + OSyncQueue *connected_queue; + + /* Indicate that the connection is closing for threaded com */ + gboolean connection_closing; }; /** @brief Pending queue timeout addition for ipc delay Modified: trunk/tests/CMakeLists.txt ============================================================================== --- trunk/tests/CMakeLists.txt Wed Feb 25 14:31:43 2009 (r5272) +++ trunk/tests/CMakeLists.txt Wed Feb 25 14:58:27 2009 (r5273) @@ -52,8 +52,13 @@ BUILD_CHECK_TEST(client client-tests/check_client.c ${TEST_TARGET_LIBRARIES} ) OSYNC_TESTCASE(client client_new) +IF (NOT WIN32) OSYNC_TESTCASE(client client_pipes) OSYNC_TESTCASE(client client_run) +ENDIF (NOT WIN32) +OSYNC_TESTCASE(client client_threadcom) +OSYNC_TESTCASE(client client_run_threadcom) + BUILD_CHECK_TEST( filter sync-tests/check_filter.c ${TEST_TARGET_LIBRARIES} ) OSYNC_TESTCASE(filter filter_setup) Modified: trunk/tests/client-tests/check_client.c ============================================================================== --- trunk/tests/client-tests/check_client.c Wed Feb 25 14:31:43 2009 (r5272) +++ trunk/tests/client-tests/check_client.c Wed Feb 25 14:58:27 2009 (r5273) @@ -100,6 +100,83 @@ } END_TEST +START_TEST (client_threadcom) +{ + char *testbed = setup_testbed(NULL); + + OSyncError *error = NULL; + OSyncQueue *read1 = NULL; + OSyncQueue *write1 = NULL; + OSyncQueue *read2 = NULL; + OSyncQueue *write2 = NULL; + OSyncClient *client = osync_client_new(&error); + fail_unless(client != NULL, NULL); + fail_unless(error == NULL, NULL); + + osync_assert(osync_queue_new_threadcom(&read1, &write1, &error)); + osync_assert(error == NULL); + + 
fail_unless(osync_queue_connect(read1, OSYNC_QUEUE_RECEIVER, &error), NULL); + fail_unless(error == NULL, NULL); + + fail_unless(osync_queue_connect(write1, OSYNC_QUEUE_SENDER, &error), NULL); + fail_unless(error == NULL, NULL); + + osync_assert(osync_queue_new_threadcom(&read2, &write2, &error)); + osync_assert(error == NULL); + + fail_unless(osync_queue_connect(read2, OSYNC_QUEUE_RECEIVER, &error), NULL); + fail_unless(error == NULL, NULL); + + fail_unless(osync_queue_connect(write2, OSYNC_QUEUE_SENDER, &error), NULL); + fail_unless(error == NULL, NULL); + + osync_client_set_incoming_queue(client, read1); + osync_client_set_outgoing_queue(client, write2); + + OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_NOOP, 0, &error); + fail_unless(message != NULL, NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + fail_unless(osync_queue_send_message(write1, NULL, message, &error), NULL); + fail_unless(!osync_error_is_set(&error), NULL); + osync_message_unref(message); + + osync_assert(osync_queue_disconnect(read1, &error)); + osync_assert(error == NULL); + + message = osync_queue_get_message(write1); + osync_assert(osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_HUP); + osync_message_unref(message); + + osync_assert(osync_queue_disconnect(write1, &error)); + osync_assert(error == NULL); + + + + + osync_assert(osync_queue_disconnect(read2, &error)); + osync_assert(error == NULL); + + message = osync_queue_get_message(write2); + osync_assert(osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_HUP); + osync_message_unref(message); + + osync_assert(osync_queue_disconnect(write2, &error)); + osync_assert(error == NULL); + + osync_queue_unref(read2); + osync_queue_unref(write1); + osync_queue_unref(read1); + osync_queue_unref(write2); + + osync_client_unref(client); + + destroy_testbed(testbed); +} +END_TEST + + START_TEST (client_run) { char *testbed = setup_testbed(NULL); @@ -187,9 +264,99 @@ } END_TEST +START_TEST (client_run_threadcom) +{ + 
char *testbed = setup_testbed(NULL); + + OSyncError *error = NULL; + OSyncQueue *read1 = NULL; + OSyncQueue *write1 = NULL; + OSyncQueue *read2 = NULL; + OSyncQueue *write2 = NULL; + OSyncClient *client = osync_client_new(&error); + fail_unless(client != NULL, NULL); + fail_unless(error == NULL, NULL); + + osync_assert(osync_queue_new_threadcom(&read1, &write1, &error)); + osync_assert(error == NULL); + + fail_unless(osync_queue_connect(read1, OSYNC_QUEUE_RECEIVER, &error), NULL); + fail_unless(error == NULL, NULL); + + fail_unless(osync_queue_connect(write1, OSYNC_QUEUE_SENDER, &error), NULL); + fail_unless(error == NULL, NULL); + + osync_assert(osync_queue_new_threadcom(&read2, &write2, &error)); + osync_assert(error == NULL); + + fail_unless(osync_queue_connect(read2, OSYNC_QUEUE_RECEIVER, &error), NULL); + fail_unless(error == NULL, NULL); + + fail_unless(osync_queue_connect(write2, OSYNC_QUEUE_SENDER, &error), NULL); + fail_unless(error == NULL, NULL); + + osync_client_set_incoming_queue(client, read1); + osync_client_set_outgoing_queue(client, write2); + + OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_NOOP, 0, &error); + fail_unless(message != NULL, NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + fail_unless(osync_queue_send_message(write1, NULL, message, &error), NULL); + fail_unless(!osync_error_is_set(&error), NULL); + osync_message_unref(message); + + OSyncError *locerror = NULL; + osync_error_set(&locerror, OSYNC_ERROR_GENERIC, "test"); + fail_unless(osync_client_run(client, &error), NULL); + fail_unless(!osync_error_is_set(&error), NULL); + osync_client_error_shutdown(client, locerror); + osync_error_unref(&locerror); + + message = osync_queue_get_message(read2); + osync_assert(osync_message_get_command(message) == OSYNC_MESSAGE_ERROR); + osync_message_unref(message); + + osync_assert(osync_queue_disconnect(read1, &error)); + osync_assert(error == NULL); + + message = osync_queue_get_message(write1); + 
osync_assert(osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_HUP); + osync_message_unref(message); + + osync_assert(osync_queue_disconnect(write1, &error)); + osync_assert(error == NULL); + + + + + osync_assert(osync_queue_disconnect(read2, &error)); + osync_assert(error == NULL); + + message = osync_queue_get_message(write2); + osync_assert(osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_HUP); + osync_message_unref(message); + + osync_assert(osync_queue_disconnect(write2, &error)); + osync_assert(error == NULL); + + osync_queue_unref(read2); + osync_queue_unref(write1); + osync_queue_unref(read1); + osync_queue_unref(write2); + + osync_client_unref(client); + + destroy_testbed(testbed); +} +END_TEST + + OSYNC_TESTCASE_START("client") OSYNC_TESTCASE_ADD(client_new) OSYNC_TESTCASE_ADD(client_pipes) +OSYNC_TESTCASE_ADD(client_threadcom) OSYNC_TESTCASE_ADD(client_run) +OSYNC_TESTCASE_ADD(client_run_threadcom) OSYNC_TESTCASE_END |