From: <dg...@su...> - 2009-02-03 19:08:54
|
Author: Graham Cobb Date: Tue Feb 3 20:07:09 2009 New Revision: 5254 URL: http://www.opensync.org/changeset/5254 Log: Merge of basic timeout changes Modified: trunk/ChangeLog trunk/opensync/client/opensync_client.c trunk/opensync/client/opensync_client_proxy.c trunk/opensync/ipc/opensync_message.c trunk/opensync/ipc/opensync_message_internals.h trunk/opensync/ipc/opensync_message_private.h trunk/opensync/ipc/opensync_queue.c trunk/opensync/ipc/opensync_queue_internals.h trunk/opensync/ipc/opensync_queue_private.h trunk/tests/CMakeLists.txt trunk/tests/ipc-tests/check_ipc.c Modified: trunk/ChangeLog ============================================================================== --- trunk/ChangeLog Tue Feb 3 19:17:50 2009 (r5253) +++ trunk/ChangeLog Tue Feb 3 20:07:09 2009 (r5254) @@ -1,3 +1,57 @@ +2009-01-22 Graham Cobb <g+...@co...> + + * tests/CMakeLists.txt: Add ipc_loop_with_timeout + Add ipc_late_reply + + * tests/ipc-tests/check_ipc.c: Call osync_queue_cross_link every time + client mainloop is setup. + (ipc_timeout): Rewrite client to use a mainloop and a callback so new + timeout processing can run + Rewrite server to use a mainloop so message callback gets called + (ipc_loop_with_timeout): Add ipc_loop_with_timeout + (ipc_late_reply): Add ipc_late_reply + + * opensync/ipc/opensync_queue.c (_timeout_dispatch): Restart timeout search at start of + pending list as list may have been modified while unlocked. + (_timeout_dispatch): Set message id in error message + (_timeout_dispatch): Undo change to restart timeout search (above) + +2009-01-21 Graham Cobb <g+...@co...> + + * opensync/ipc/opensync_queue.c (osync_queue_send_message_with_timeout): If this is + a reply, remove pending command + + * opensync/client/opensync_client_proxy.c (_osync_client_proxy_hup_handler): treat + a queue error as a disconnect + (osync_client_proxy_spawn): cross-link command and reply queues + + * opensync/client/opensync_client.c (_osync_client_handle_initialize): cross-link + command and reply queues + (_osync_client_hup_handler): treat a queue error as a disconnect + +2009-01-20 Graham Cobb <g+...@co...> + + * opensync/ipc/opensync_queue_internals.h: Add osync_queue_cross_link + + * opensync/ipc/opensync_queue.c (osync_queue_cross_link): Cross-link command and reply queues + + * opensync/ipc/opensync_queue.c (_osync_send_timeout_response): + Callback function for timeouts: send the timeout error reponse + (_incoming_dispatch): For incoming messages which have timeouts, + create a pending list entry + + * opensync/ipc/opensync_queue_private.h: Add reply_queue for pointer to + queue used for replies to incoming messages, if any. + Add cmd_queue for pointer to queue used to store pending commands. + +2009-01-19 Graham Cobb <g+...@co...> + + * opensync/ipc/opensync_message.c: Add opensync_message_set/get_timeout + + * opensync/ipc/opensync_message_internals.h: Add opensync_message_set/get_timeout + + * opensync/ipc/opensync_message_private.h: Add timeout to message + 2008-05-04 Graham Cobb <g+...@co...> * opensync/helper/opensync_hashtable.c (osync_hashtable_get_deleted): Modified: trunk/opensync/client/opensync_client.c ============================================================================== --- trunk/opensync/client/opensync_client.c Tue Feb 3 19:17:50 2009 (r5253) +++ trunk/opensync/client/opensync_client.c Tue Feb 3 20:07:09 2009 (r5254) @@ -604,6 +604,7 @@ } osync_client_set_outgoing_queue(client, outgoing); + osync_queue_cross_link(client->incoming, client->outgoing); osync_queue_unref(outgoing); osync_trace(TRACE_INTERNAL, "done connecting to engine"); } @@ -1519,9 +1520,8 @@ osync_trace(TRACE_INTERNAL, "plugin received command %i on sending queue", osync_message_get_command(message)); - if (osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_ERROR) { - /* Houston, we have a problem */ - } else if (osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_HUP) { + if ( (osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_ERROR) + || (osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_HUP) ) { /* The remote side disconnected. So we can now disconnect as well and then * shutdown */ if (!osync_queue_disconnect(client->outgoing, &error)) Modified: trunk/opensync/client/opensync_client_proxy.c ============================================================================== --- trunk/opensync/client/opensync_client_proxy.c Tue Feb 3 19:17:50 2009 (r5253) +++ trunk/opensync/client/opensync_client_proxy.c Tue Feb 3 20:07:09 2009 (r5254) @@ -227,9 +227,8 @@ osync_trace(TRACE_INTERNAL, "client received command %i on sending queue", osync_message_get_command(message)); - if (osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_ERROR) { - /* Houston, we have a problem */ - } else if (osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_HUP) { + if ( (osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_ERROR) /* Treat an error as a disconnect */ + || (osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_HUP) ) { /* The remote side disconnected. So we can now disconnect as well and then * shutdown */ if (!osync_queue_disconnect(proxy->outgoing, &error)) @@ -916,6 +915,7 @@ osync_client_set_incoming_queue(proxy->client, read1); osync_client_set_outgoing_queue(proxy->client, write2); + osync_queue_cross_link(read1, write2); if (!osync_client_run(proxy->client, error)) goto error_free_pipe2; Modified: trunk/opensync/ipc/opensync_message.c ============================================================================== --- trunk/opensync/ipc/opensync_message.c Tue Feb 3 19:17:50 2009 (r5253) +++ trunk/opensync/ipc/opensync_message.c Tue Feb 3 20:07:09 2009 (r5254) @@ -74,6 +74,18 @@ return message->cmd; } +void osync_message_set_timeout(OSyncMessage *message, unsigned int timeout) +{ + osync_assert(message); + message->timeout = timeout; +} + +unsigned int osync_message_get_timeout(OSyncMessage *message) +{ + osync_assert(message); + return message->timeout; +} + void osync_message_set_id(OSyncMessage *message, long long int id) { osync_assert(message); Modified: trunk/opensync/ipc/opensync_message_internals.h ============================================================================== --- trunk/opensync/ipc/opensync_message_internals.h Tue Feb 3 19:17:50 2009 (r5253) +++ trunk/opensync/ipc/opensync_message_internals.h Tue Feb 3 20:07:09 2009 (r5254) @@ -193,6 +193,21 @@ */ OSYNC_TEST_EXPORT void osync_message_get_buffer(OSyncMessage *message, char **data, unsigned int *size); +/** @brief Set timeout (in seconds) for the message object + * + * @param message The message to modify + * @param timeout The new message timeout + * + */ +OSYNC_TEST_EXPORT void osync_message_set_timeout(OSyncMessage *message, unsigned int timeout); + +/** @brief Get timeout (in seconds) for the message object + * + * @param message The message + * + */ +OSYNC_TEST_EXPORT unsigned int osync_message_get_timeout(OSyncMessage *message); + /** @brief Sets the handler that will receive the reply * * @param message The message to work on Modified: trunk/opensync/ipc/opensync_message_private.h ============================================================================== --- trunk/opensync/ipc/opensync_message_private.h Tue Feb 3 19:17:50 2009 (r5253) +++ trunk/opensync/ipc/opensync_message_private.h Tue Feb 3 20:07:09 2009 (r5254) @@ -45,7 +45,7 @@ /** The user data */ gpointer user_data; /** The timeout associated with this message */ - //timeout_info *to_info; + unsigned int timeout; /** If this message has already been answered */ osync_bool is_answered; /** The internal OSyncMarshal object **/ Modified: trunk/opensync/ipc/opensync_queue.c ============================================================================== --- trunk/opensync/ipc/opensync_queue.c Tue Feb 3 19:17:50 2009 (r5253) +++ trunk/opensync/ipc/opensync_queue.c Tue Feb 3 20:07:09 2009 (r5254) @@ -2,6 +2,7 @@ * libosengine - A synchronization engine for the opensync framework * Copyright (C) 2004-2005 Armin Bauer <arm...@op...> * Copyright (C) 2007 Daniel Friedrich <dan...@op...> + * Copyright (C) 2009 Graham R. Cobb <g+o...@co...> * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -110,11 +111,15 @@ OSyncError *timeouterr = NULL; OSyncMessage *errormsg = NULL; + osync_trace(TRACE_INTERNAL, "Timeout for message with id %lli", pending->id); + + /* Call the callback of the pending message */ osync_assert(pending->callback); osync_error_set(&timeouterr, OSYNC_ERROR_IO_ERROR, "Timeout."); errormsg = osync_message_new_errorreply(NULL, timeouterr, &error); osync_error_unref(&timeouterr); + osync_message_set_id(errormsg, pending->id); /* Remove first the pending message! To avoid that _incoming_dispatch catchs this message @@ -137,6 +142,9 @@ /* Lock again, to keep the iteration of the pendingReplies list atomic. */ g_mutex_lock(queue->pendingLock); + /* The queue may have been modified while it was unlocked so don't go + looking for any more entries. If there are more, they will be found + on the next call */ break; } } @@ -161,67 +169,146 @@ return FALSE; } +static void _osync_send_timeout_response(OSyncMessage *errormsg, void *user_data) +{ + OSyncQueue *queue = user_data; + + osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, errormsg, queue); + + if (queue->reply_queue) { + osync_queue_send_message_with_timeout(queue->reply_queue, NULL, errormsg, 0, NULL); + } else { + osync_message_unref(errormsg); + /* TODO: as we can't send the timeout response, disconnect the queue to signal to + the sender that they aren't going to get a response. Note: we can't just + call osync_queue_disconnect as that tries to kill the thread and hits a deadlock! */ + osync_trace(TRACE_EXIT_ERROR, "%s: cannot find reply queue to send timeout error", __func__); + return; + } + osync_trace(TRACE_EXIT, "%s", __func__); +} + +/* Find and remove a pending message that this message is a reply to */ +static void _osync_queue_remove_pending_reply(OSyncQueue *queue, OSyncMessage *reply, gboolean callback) +{ + OSyncPendingMessage *pending = NULL; + OSyncList *p = NULL; + + osync_trace(TRACE_ENTRY, "%s(%p, %p, %d)", __func__, queue, reply, callback); + osync_trace(TRACE_INTERNAL, "Searching for pending message id=%lli", osync_message_get_id(reply)); + + /* Search for the pending reply. We have to lock the + * list since another thread might be duing the updates */ + g_mutex_lock(queue->pendingLock); + + for (p = queue->pendingReplies; p; p = p->next) { + pending = p->data; + + if (pending->id == osync_message_get_id(reply)) { + + osync_trace(TRACE_INTERNAL, "Found pending message id=%lli: %p", osync_message_get_id(reply), pending); + /* Remove first the pending message! + To avoid that _timeout_dispatch catchs this message + when we're releasing the lock. If _timeout_dispatch + would catch this message, the pending callback + gets called twice! */ + + queue->pendingReplies = osync_list_remove(queue->pendingReplies, pending); + + /* Unlock the pending lock since the messages might be sent during the callback */ + g_mutex_unlock(queue->pendingLock); + + if (callback) { + /* Call the callback of the pending message */ + osync_assert(pending->callback); + pending->callback(reply, pending->user_data); + } + + // TODO: Refcounting for OSyncPendingMessage + if (pending->timeout_info) + g_free(pending->timeout_info); + osync_free(pending); + + osync_trace(TRACE_EXIT, "%s", __func__); + return; + } + } + + g_mutex_unlock(queue->pendingLock); + osync_trace(TRACE_EXIT, "%s", __func__); +} + /* This function is called from the master thread. The function dispatched incoming data from * the remote end */ static gboolean _incoming_dispatch(GSource *source, GSourceFunc callback, gpointer user_data) { - OSyncPendingMessage *pending = NULL; OSyncQueue *queue = user_data; - OSyncList *p = NULL; OSyncMessage *message = NULL; + OSyncError *error = NULL; osync_trace(TRACE_ENTRY, "%s(%p)", __func__, user_data); while ((message = g_async_queue_try_pop(queue->incoming))) { /* We check if the message is a reply to something */ - osync_trace(TRACE_INTERNAL, "Dispatching %p:%i(%s)", message, osync_message_get_cmd(message), osync_message_get_commandstr(message)); + osync_trace(TRACE_INTERNAL, "Dispatching %p:%i(%s), timeout=%d, id=%lli", message, + osync_message_get_cmd(message), osync_message_get_commandstr(message), + osync_message_get_timeout(message), osync_message_get_id(message)); if (osync_message_get_cmd(message) == OSYNC_MESSAGE_REPLY || osync_message_get_cmd(message) == OSYNC_MESSAGE_ERRORREPLY) { - - /* Search for the pending reply. We have to lock the - * list since another thread might be duing the updates */ - g_mutex_lock(queue->pendingLock); - - for (p = queue->pendingReplies; p; p = p->next) { - pending = p->data; - - if (pending->id == osync_message_get_id(message)) { + /* Remove pending reply and call callback */ + _osync_queue_remove_pending_reply(queue, message, TRUE); + } else { + unsigned int timeout = osync_message_get_timeout(message); + if (timeout) { + /* This message has a timeout. Put message on pending list and run timeout */ + /* NOTE: We really want to put it on the pending list for the corresponding + outgoing queue but there may not be one at this time (if this is an Initialise, + for example). So we put it on the pending list of the incoming queue and deal with + finding the outgoing queue if the timeout fires. This also complicates removing + pending items when sending the responses. Oh well. */ + + OSyncPendingMessage *pending = osync_try_malloc0(sizeof(OSyncPendingMessage), &error); + if (!pending) + goto error; - /* Remove first the pending message! - To avoid that _timeout_dispatch catchs this message - when we're releasing the lock. If _timeout_dispatch - would catch this message, the pending callback - gets called twice! */ + pending->id = osync_message_get_id(message); - queue->pendingReplies = osync_list_remove(queue->pendingReplies, pending); + OSyncTimeoutInfo *toinfo = osync_try_malloc0(sizeof(OSyncTimeoutInfo), &error); + if (!toinfo) + goto error; - /* Unlock the pending lock since the messages might be sent during the callback */ - g_mutex_unlock(queue->pendingLock); - - /* Call the callback of the pending message */ - osync_assert(pending->callback); - pending->callback(message, pending->user_data); - - // TODO: Refcounting for OSyncPendingMessage - if (pending->timeout_info) - g_free(pending->timeout_info); - osync_free(pending); - - /* Lock again, to keep the iteration of the pendingReplies list atomic. */ - g_mutex_lock(queue->pendingLock); - break; - } + GTimeVal current_time; + g_source_get_current_time(queue->timeout_source, ¤t_time); + + toinfo->expiration = current_time; + toinfo->expiration.tv_sec += timeout; + + pending->timeout_info = toinfo; + + pending->callback = _osync_send_timeout_response; + pending->user_data = queue; + + g_mutex_lock(queue->pendingLock); + queue->pendingReplies = osync_list_append(queue->pendingReplies, pending); + g_mutex_unlock(queue->pendingLock); } - - g_mutex_unlock(queue->pendingLock); - } else + queue->message_handler(message, queue->user_data); + } osync_message_unref(message); } osync_trace(TRACE_EXIT, "%s: Done dispatching", __func__); return TRUE; + + error: + /* TODO: as we can't send the timeout response, disconnect the queue to signal to + the sender that they aren't going to get a response. Note: we can't just + call osync_queue_disconnect as that tries to kill the thread and hits a deadlock! */ + + osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(&error)); + return TRUE; } static void _osync_queue_stop_incoming(OSyncQueue *queue) @@ -328,6 +415,10 @@ if (!_osync_queue_write_long_long_int(queue, osync_message_get_id(message), &error)) goto error; + /* The timeout (integer) */ + if (!_osync_queue_write_int(queue, (int) osync_message_get_timeout(message), &error)) + goto error; + osync_message_get_buffer(message, &data, &length); if (length) { @@ -498,7 +589,7 @@ do { int size = 0; - int cmd = 0; + int cmd = 0, timeout = 0; long long int id = 0; char *buffer = NULL; @@ -514,11 +605,16 @@ if (!_osync_queue_read_long_long_int(queue, &id, &error)) goto error; + /* The timeout */ + if (!_osync_queue_read_int(queue, &timeout, &error)) + goto error; + message = osync_message_new(cmd, size, &error); if (!message) goto error; osync_message_set_id(message, id); + osync_message_set_timeout(message, (unsigned int) timeout); /* We now get the buffer from the message which will already * have the correct size for the read */ @@ -933,7 +1029,7 @@ void osync_queue_set_message_handler(OSyncQueue *queue, OSyncMessageHandler handler, gpointer user_data) { - osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, queue, handler, user_data); + osync_trace(TRACE_ENTRY, "%s(%p, %p, %p, %p)", __func__, queue, handler, user_data); queue->message_handler = handler; queue->user_data = user_data; @@ -941,6 +1037,22 @@ osync_trace(TRACE_EXIT, "%s", __func__); } +void osync_queue_cross_link(OSyncQueue *cmd_queue, OSyncQueue *reply_queue) +{ + /* Cross-linking is needed to make timeouts work when commands are + being received on one queue and replies sent on another. + Note that cross-linking is not needed when commands are being sent. */ + osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, cmd_queue, reply_queue); + + osync_assert(cmd_queue->type == OSYNC_QUEUE_RECEIVER); + osync_assert(reply_queue->type == OSYNC_QUEUE_SENDER); + + cmd_queue->reply_queue = reply_queue; + reply_queue->cmd_queue = cmd_queue; + + osync_trace(TRACE_EXIT, "%s", __func__); +} + void osync_queue_setup_with_gmainloop(OSyncQueue *queue, GMainContext *context) { OSyncQueue **queueptr = NULL; @@ -1035,6 +1147,19 @@ { osync_trace(TRACE_ENTRY, "%s(%p, %p, %p, %u, %p)", __func__, queue, replyqueue, message, timeout, error); + if (queue->cmd_queue) { + /* This queue is a reply queue for some command receiving queue */ + + /* If this is actually a reply message, we check to see if there is + message to remove from the command queue's pending list */ + + if (osync_message_get_cmd(message) == OSYNC_MESSAGE_REPLY || osync_message_get_cmd(message) == OSYNC_MESSAGE_ERRORREPLY) { + /* Remove pending reply but do not call callback + (real callback will be called by the receiver of this reply later) */ + _osync_queue_remove_pending_reply(queue->cmd_queue, message, FALSE); + } + } + if (osync_message_get_handler(message)) { OSyncPendingMessage *pending = NULL; GTimeVal current_time; @@ -1053,14 +1178,8 @@ osync_trace(TRACE_INTERNAL, "Setting id %lli for pending reply", id); if (timeout) { - OSyncTimeoutInfo *toinfo = osync_try_malloc0(sizeof(OSyncTimeoutInfo), error); - if (!toinfo) - goto error; - - toinfo->expiration = current_time; - toinfo->expiration.tv_sec += timeout; - - pending->timeout_info = toinfo; + /* Send timeout info to other end to handle */ + osync_message_set_timeout(message, timeout); } else { osync_trace(TRACE_INTERNAL, "handler message got sent without timeout!: %s", osync_message_get_commandstr(message)); } @@ -1118,6 +1237,7 @@ { osync_assert(queue); return queue->name; + } int osync_queue_get_fd(OSyncQueue *queue) Modified: trunk/opensync/ipc/opensync_queue_internals.h ============================================================================== --- trunk/opensync/ipc/opensync_queue_internals.h Tue Feb 3 19:17:50 2009 (r5253) +++ trunk/opensync/ipc/opensync_queue_internals.h Tue Feb 3 20:07:09 2009 (r5254) @@ -85,6 +85,20 @@ OSYNC_TEST_EXPORT void osync_queue_set_message_handler(OSyncQueue *queue, OSyncMessageHandler handler, gpointer user_data); /** + * @brief Cross links command queue and reply queue + * + * Stores the queue used for replies in the command queue object so + * that timeout responses can be sent if necessary. + * And stores the command queue in the reply queue object so that + * replies can remove pending messages before they time out. + * + * @param cmd_queue The command queue used to receive incoming commands + * @param reply_queue The queue used to send replies + * + */ +OSYNC_TEST_EXPORT void osync_queue_cross_link(OSyncQueue *cmd_queue, OSyncQueue *reply_queue); + +/** * @brief Sends a Message to a Queue * @param queue Pointer to the queue * @param replyqueue Modified: trunk/opensync/ipc/opensync_queue_private.h ============================================================================== --- trunk/opensync/ipc/opensync_queue_private.h Tue Feb 3 19:17:50 2009 (r5253) +++ trunk/opensync/ipc/opensync_queue_private.h Tue Feb 3 20:07:09 2009 (r5254) @@ -89,6 +89,12 @@ /** Reference count **/ int ref_count; + + /* Queue for replies to incoming messages */ + OSyncQueue *reply_queue; + + /* Queue for receiving commands we are replying to */ + OSyncQueue *cmd_queue; }; Modified: trunk/tests/CMakeLists.txt ============================================================================== --- trunk/tests/CMakeLists.txt Tue Feb 3 19:17:50 2009 (r5253) +++ trunk/tests/CMakeLists.txt Tue Feb 3 20:07:09 2009 (r5254) @@ -246,6 +246,8 @@ OSYNC_TESTCASE(ipc ipc_pipes_stress) OSYNC_TESTCASE_DISABLED(ipc ipc_callback_break_pipes "1039") OSYNC_TESTCASE(ipc ipc_timeout) +OSYNC_TESTCASE(ipc ipc_loop_with_timeout) +OSYNC_TESTCASE(ipc ipc_late_reply) ENDIF (NOT WIN32) BUILD_CHECK_TEST( mapping mapping-tests/check_mapping.c ${TEST_TARGET_LIBRARIES} ) Modified: trunk/tests/ipc-tests/check_ipc.c ============================================================================== --- trunk/tests/ipc-tests/check_ipc.c Tue Feb 3 19:17:50 2009 (r5253) +++ trunk/tests/ipc-tests/check_ipc.c Tue Feb 3 20:07:09 2009 (r5254) @@ -1087,6 +1087,8 @@ osync_assert(error == NULL); osync_assert(osync_queue_connect(server_queue, OSYNC_QUEUE_SENDER, &error)); osync_assert(error == NULL); + + osync_queue_cross_link(client_queue, server_queue); message = osync_queue_get_message(server_queue); @@ -1272,6 +1274,8 @@ osync_assert(error == NULL); osync_assert(osync_queue_connect(server_queue, OSYNC_QUEUE_SENDER, &error)); osync_assert(error == NULL); + + osync_queue_cross_link(client_queue, server_queue); message = osync_queue_get_message(server_queue); @@ -1450,6 +1454,8 @@ osync_assert(osync_queue_connect(server_queue, OSYNC_QUEUE_SENDER, &error)); osync_assert(error == NULL); + + osync_queue_cross_link(client_queue, server_queue); message = osync_queue_get_message(server_queue); @@ -1644,6 +1650,8 @@ osync_assert(osync_queue_connect(server_queue, OSYNC_QUEUE_SENDER, &error)); osync_assert(error == NULL); + + osync_queue_cross_link(client_queue, server_queue); while (osync_queue_is_connected(client_queue)) { g_usleep(100); } @@ -1849,6 +1857,8 @@ osync_assert(osync_queue_connect(server_queue, OSYNC_QUEUE_SENDER, &error)); osync_assert(error == NULL); + + osync_queue_cross_link(client_queue, server_queue); message = osync_queue_get_message(server_queue); if (osync_message_get_command(message) != OSYNC_MESSAGE_QUEUE_HUP) { @@ -1984,6 +1994,8 @@ osync_assert(osync_queue_connect(server_queue, OSYNC_QUEUE_SENDER, &error)); osync_assert(error == NULL); + + osync_queue_cross_link(client_queue, server_queue); while (osync_queue_is_connected(client_queue)) { g_usleep(100); } @@ -2081,11 +2093,39 @@ num_callback_timeout++; else num_callback++; +} +char *data5 = "this is another test string"; +void client_handler5(OSyncMessage *message, void *user_data) +{ + osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, user_data); OSyncError *error = NULL; - OSyncMessage *reply = osync_message_new_reply(message, &error); - osync_queue_send_message(client_queue, NULL, reply, &error); - osync_message_unref(reply); + + osync_assert(GPOINTER_TO_INT(user_data) ==1); + osync_assert(osync_message_get_command(message) == OSYNC_MESSAGE_INITIALIZE); + + int int1; + long long int longint1; + char *string; + char databuf[strlen(data5) + 1]; + + + osync_message_read_int(message, &int1); + osync_message_read_string(message, &string); + osync_message_read_long_long_int(message, &longint1); + osync_message_read_data(message, databuf, strlen(data5) + 1); + + osync_assert(int1 == 4000000); + osync_assert(!strcmp(string, "this is a test string")); + osync_assert(longint1 == 400000000); + osync_assert(!strcmp(databuf, data5)); + + /* TIMEOUT TIMEOUT TIMEOUT (no reply...) */ + + /* Proper code would reply to this message, but for testing + purposes we don't reply and simulate a "timeout" situation */ + + osync_trace(TRACE_EXIT, "%s", __func__); } START_TEST (ipc_timeout) @@ -2116,78 +2156,245 @@ osync_queue_create(client_queue, &error); fail_unless(error == NULL, NULL); - char *data = "this is another test string"; pid_t cpid = fork(); if (cpid == 0) { //Child - g_usleep(1*G_USEC_PER_SEC); + + GMainContext *context = g_main_context_new(); + OSyncThread *thread = osync_thread_new(context, &error); + + osync_queue_set_message_handler(client_queue, client_handler5, GINT_TO_POINTER(1)); + + osync_queue_setup_with_gmainloop(client_queue, context); + + osync_thread_start(thread); + osync_assert(osync_queue_connect(client_queue, OSYNC_QUEUE_RECEIVER, &error)); osync_assert(error == NULL); osync_assert(osync_queue_connect(server_queue, OSYNC_QUEUE_SENDER, &error)); osync_assert(error == NULL); + + osync_queue_cross_link(client_queue, server_queue); - while (!(message = osync_queue_get_message(client_queue))) { - g_usleep(10000); - } + message = osync_queue_get_message(server_queue); - if (osync_message_get_command(message) != OSYNC_MESSAGE_INITIALIZE) { + if (osync_message_get_command(message) != OSYNC_MESSAGE_QUEUE_HUP) { exit (1); } - int int1; - long long int longint1; - char *string; - char databuf[strlen(data) + 1]; - - osync_message_read_int(message, &int1); - osync_message_read_string(message, &string); - osync_message_read_long_long_int(message, &longint1); - osync_message_read_data(message, databuf, strlen(data) + 1); + osync_message_unref(message); + + if (osync_queue_disconnect(server_queue, &error) != TRUE || error != NULL) + exit(1); + osync_queue_unref(server_queue); - osync_assert(int1 == 4000000); - osync_assert(!strcmp(string, "this is a test string")); + osync_assert(osync_queue_disconnect(client_queue, &error)); + osync_assert(error == NULL); + + osync_thread_stop(thread); + osync_thread_unref(thread); + + osync_queue_unref(client_queue); + + g_free(testbed); + + exit(0); + } else { + GMainContext *context = g_main_context_new(); + OSyncThread *thread = osync_thread_new(context, &error); + + osync_queue_set_message_handler(server_queue, server_handler4, GINT_TO_POINTER(1)); + + osync_queue_setup_with_gmainloop(server_queue, context); + + osync_thread_start(thread); - osync_assert(longint1 == 400000000); - osync_assert(!strcmp(databuf, "this is another test string")); + fail_unless(osync_queue_connect(client_queue, OSYNC_QUEUE_SENDER, &error), NULL); + fail_unless(error == NULL, NULL); + + fail_unless(osync_queue_connect(server_queue, OSYNC_QUEUE_RECEIVER, &error), NULL); + fail_unless(error == NULL, NULL); - /* TIMEOUT TIMEOUT TIMEOUT (no reply...) */ + message = osync_message_new(OSYNC_MESSAGE_INITIALIZE, 0, &error); + fail_unless(message != NULL, NULL); + fail_unless(!osync_error_is_set(&error), NULL); - /* Proper code would reply to this message, but for testing - purposes we don't reply and simulate a "timeout" situation */ - + osync_message_set_handler(message, _message_handler, NULL); + + osync_message_write_int(message, 4000000); + osync_message_write_string(message, "this is a test string"); + osync_message_write_long_long_int(message, 400000000); + osync_message_write_data(message, data5, strlen(data5) + 1); + + // Send with timeout of one second + fail_unless(osync_queue_send_message_with_timeout(client_queue, server_queue, message, 1, &error), NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + osync_message_unref(message); + + // Block + g_usleep(5*G_USEC_PER_SEC); + + osync_queue_disconnect(server_queue, &error); + fail_unless(error == NULL, NULL); + while (!(message = osync_queue_get_message(client_queue))) { g_usleep(10000); } + + if (osync_message_get_command(message) != OSYNC_MESSAGE_QUEUE_HUP) { + exit (1); + } + osync_message_unref(message); + + osync_queue_disconnect(client_queue, &error); + fail_unless(error == NULL, NULL); + + int status = 0; + wait(&status); + fail_unless(WEXITSTATUS(status) == 0, NULL); + } + + fail_unless(osync_testing_file_exists("/tmp/testpipe-client") == TRUE, NULL); + + fail_unless(osync_queue_remove(client_queue, &error), NULL); + fail_unless(osync_queue_remove(server_queue, &error), NULL); + fail_unless(!osync_error_is_set(&error), NULL); - OSyncMessage *reply = osync_message_new_reply(message, &error); + /* Check if the timeout handler replied with an error */ + fail_unless(num_callback_timeout == 1, NULL); + fail_unless(num_callback == 0, NULL); - osync_queue_send_message(server_queue, NULL, reply, &error); + fail_unless(osync_testing_file_exists("/tmp/testpipe-client") == FALSE, NULL); + + osync_queue_unref(client_queue); + osync_queue_unref(server_queue); + + destroy_testbed(testbed); +} +END_TEST +void client_handler6(OSyncMessage *message, void *user_data) +{ + osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, user_data); + OSyncError *error = NULL; + + osync_assert(GPOINTER_TO_INT(user_data) ==1); + osync_assert(osync_message_get_command(message) == OSYNC_MESSAGE_INITIALIZE); - if (osync_queue_disconnect(client_queue, &error) != TRUE || error != NULL) - exit(1); - osync_queue_unref(client_queue); + int int1; + long long int longint1; + char *string; + char databuf[strlen(data5) + 1]; - while (!(message = osync_queue_get_message(server_queue))) { - g_usleep(10000); - } + + osync_message_read_int(message, &int1); + osync_message_read_string(message, &string); + osync_message_read_long_long_int(message, &longint1); + osync_message_read_data(message, databuf, strlen(data5) + 1); + + osync_assert(int1 == 4000000); + osync_assert(!strcmp(string, "this is a test string")); + osync_assert(longint1 == 400000000); + osync_assert(!strcmp(databuf, data5)); + + // Do some time consuming processing + g_usleep(3*G_USEC_PER_SEC); + + OSyncMessage *reply = osync_message_new_reply(message, &error); + + osync_queue_send_message(server_queue, NULL, reply, &error); + + osync_message_unref(reply); + + osync_trace(TRACE_EXIT, "%s", __func__); +} + +START_TEST (ipc_late_reply) +{ + /* This testcase is inteded to test osync_queue_send_message_with_timeout(). + Client got forked and listens for messages from Server and replies. + + To simulate a "timeout" situation the Client delays 3 seconds and then replies + + The timeout handler will call the _message_handler() with an error and the late reply + will be discarded. + + JFYI, every timed out message calls the callback/message_handler with an (timeout) error. + */ + + char *testbed = setup_testbed(NULL); + osync_testing_file_remove("/tmp/testpipe-server"); + osync_testing_file_remove("/tmp/testpipe-client"); + + num_callback_timeout = 0; + num_callback = 0; + + OSyncError *error = NULL; + server_queue = osync_queue_new("/tmp/testpipe-server", &error); + client_queue = osync_queue_new("/tmp/testpipe-client", &error); + OSyncMessage *message = NULL; + + osync_queue_create(server_queue, &error); + fail_unless(error == NULL, NULL); + + osync_queue_create(client_queue, &error); + fail_unless(error == NULL, NULL); + + pid_t cpid = fork(); + if (cpid == 0) { //Child + + GMainContext *context = g_main_context_new(); + OSyncThread *thread = osync_thread_new(context, &error); + + osync_queue_set_message_handler(client_queue, client_handler6, GINT_TO_POINTER(1)); + + osync_queue_setup_with_gmainloop(client_queue, context); + + osync_thread_start(thread); + + osync_assert(osync_queue_connect(client_queue, OSYNC_QUEUE_RECEIVER, &error)); + osync_assert(error == NULL); + + osync_assert(osync_queue_connect(server_queue, OSYNC_QUEUE_SENDER, &error)); + osync_assert(error == NULL); + + osync_queue_cross_link(client_queue, server_queue); + + message = osync_queue_get_message(server_queue); if (osync_message_get_command(message) != OSYNC_MESSAGE_QUEUE_HUP) { exit (1); } - - osync_message_unref(message); - g_usleep(1*G_USEC_PER_SEC); + osync_message_unref(message); + if (osync_queue_disconnect(server_queue, &error) != TRUE || error != NULL) exit(1); osync_queue_unref(server_queue); + osync_assert(osync_queue_disconnect(client_queue, &error)); + osync_assert(error == NULL); + + osync_thread_stop(thread); + osync_thread_unref(thread); + + osync_queue_unref(client_queue); + g_free(testbed); exit(0); } else { + GMainContext *context = g_main_context_new(); + OSyncThread *thread = osync_thread_new(context, &error); + + osync_queue_set_message_handler(server_queue, server_handler4, GINT_TO_POINTER(1)); + + osync_queue_setup_with_gmainloop(server_queue, context); + + osync_thread_start(thread); + fail_unless(osync_queue_connect(client_queue, OSYNC_QUEUE_SENDER, &error), NULL); fail_unless(error == NULL, NULL); @@ -2203,7 +2410,7 @@ osync_message_write_int(message, 4000000); osync_message_write_string(message, "this is a test string"); osync_message_write_long_long_int(message, 400000000); - osync_message_write_data(message, data, strlen(data) + 1); + osync_message_write_data(message, data5, strlen(data5) + 1); // Send with timeout of one second fail_unless(osync_queue_send_message_with_timeout(client_queue, server_queue, message, 1, &error), NULL); @@ -2212,13 +2419,7 @@ osync_message_unref(message); // Block - while (!(message = osync_queue_get_message(server_queue))) { - g_usleep(100000); - } - - fail_unless(osync_message_get_command(message) == OSYNC_MESSAGE_REPLY); - - osync_message_unref(message); + g_usleep(5*G_USEC_PER_SEC); osync_queue_disconnect(server_queue, &error); fail_unless(error == NULL, NULL); @@ -2259,6 +2460,199 @@ } END_TEST +void callback_handler7(OSyncMessage *message, void *user_data) +{ + osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, user_data); + OSyncError *error = NULL; + + osync_assert(GPOINTER_TO_INT(user_data) == 1); + + num_msgs++; + osync_assert(osync_message_get_command(message) == OSYNC_MESSAGE_REPLY); + + if (num_msgs >= req_msgs) { + osync_queue_disconnect(server_queue, &error); + osync_assert(error == NULL); + } + + osync_trace(TRACE_EXIT, "%s", __func__); +} + +void server_handler7(OSyncMessage *message, void *user_data) +{ + abort(); +} + +void client_handler7(OSyncMessage *message, void *user_data) +{ + osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, user_data); + OSyncError *error = NULL; + + osync_assert(GPOINTER_TO_INT(user_data) ==1); + osync_assert(osync_message_get_command(message) == OSYNC_MESSAGE_INITIALIZE); + + int int1; + long long int longint1; + char *string; + void *databuf; + + osync_message_read_int(message, &int1); + osync_message_read_const_string(message, &string); + osync_message_read_long_long_int(message, &longint1); + osync_message_read_const_data(message, &databuf, strlen("this is another test string") + 1); + + osync_assert(int1 == 4000000); + osync_assert(!strcmp(string, "this is a test string")); + osync_assert(longint1 == 400000000); + osync_assert(!strcmp(databuf, "this is another test string")); + + // Do some time consuming processing + g_usleep(1*G_USEC_PER_SEC); + + OSyncMessage *reply = osync_message_new_reply(message, &error); + + osync_queue_send_message(server_queue, NULL, reply, &error); + + osync_message_unref(reply); + + osync_trace(TRACE_EXIT, "%s", __func__); +} + +START_TEST (ipc_loop_with_timeout) +{ + + /* Even though each action takes 1 second, none of these messages should time out + as they are being sent with a timeout of 3 seconds */ + + num_msgs = 0; + req_msgs = 20; + + char *testbed = setup_testbed(NULL); + osync_testing_file_remove("/tmp/testpipe-server"); + osync_testing_file_remove("/tmp/testpipe-client"); + + OSyncError *error = NULL; + server_queue = osync_queue_new("/tmp/testpipe-server", &error); + client_queue = osync_queue_new("/tmp/testpipe-client", &error); + OSyncMessage *message = NULL; + + osync_queue_create(server_queue, &error); + fail_unless(error == NULL, NULL); + + osync_queue_create(client_queue, &error); + fail_unless(error == NULL, NULL); + char *data = "this is another test string"; + + pid_t cpid = fork(); + if (cpid == 0) { //Child + + GMainContext *context = g_main_context_new(); + OSyncThread *thread = osync_thread_new(context, &error); + + osync_queue_set_message_handler(client_queue, client_handler7, GINT_TO_POINTER(1)); + + osync_queue_setup_with_gmainloop(client_queue, context); + + osync_thread_start(thread); + + osync_assert(osync_queue_connect(client_queue, OSYNC_QUEUE_RECEIVER, &error)); + osync_assert(error == NULL); + + osync_assert(osync_queue_connect(server_queue, OSYNC_QUEUE_SENDER, &error)); + osync_assert(error == NULL); + + osync_queue_cross_link(client_queue, server_queue); + + message = osync_queue_get_message(server_queue); + + if (osync_message_get_command(message) != OSYNC_MESSAGE_QUEUE_HUP) { + exit (1); + } + + osync_message_unref(message); + + if (osync_queue_disconnect(server_queue, &error) != TRUE || error != NULL) + exit(1); + osync_queue_unref(server_queue); + + osync_assert(osync_queue_disconnect(client_queue, &error)); + osync_assert(error == NULL); + + osync_thread_stop(thread); + osync_thread_unref(thread); + + osync_queue_unref(client_queue); + + g_free(testbed); + + exit(0); + } else { + GMainContext *context = g_main_context_new(); + OSyncThread *thread = osync_thread_new(context, &error); + + osync_queue_set_message_handler(server_queue, server_handler7, GINT_TO_POINTER(1)); + + osync_queue_setup_with_gmainloop(server_queue, context); + + osync_thread_start(thread); + + fail_unless(osync_queue_connect(client_queue, OSYNC_QUEUE_SENDER, &error), NULL); + fail_unless(error == NULL, NULL); + + fail_unless(osync_queue_connect(server_queue, OSYNC_QUEUE_RECEIVER, &error), NULL); + fail_unless(error == NULL, NULL); + + int i = 0; + for (i = 0; i < req_msgs; i++) { + message = osync_message_new(OSYNC_MESSAGE_INITIALIZE, 0, &error); + fail_unless(message != NULL, NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + osync_message_write_int(message, 4000000); + osync_message_write_string(message, "this is a test string"); + osync_message_write_long_long_int(message, 400000000); + osync_message_write_data(message, data, strlen(data) + 1); + + osync_message_set_handler(message, callback_handler7, GINT_TO_POINTER(1)); + + fail_unless(osync_queue_send_message_with_timeout(client_queue, server_queue, message, 3, &error), NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + osync_message_unref(message); + } + + message = osync_queue_get_message(client_queue); + + fail_unless(osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_HUP); + + osync_message_unref(message); + + osync_queue_disconnect(client_queue, &error); + fail_unless(error == NULL, NULL); + + osync_thread_stop(thread); + osync_thread_unref(thread); + + int status = 0; + wait(&status); + fail_unless(WEXITSTATUS(status) == 0, NULL); + } + + fail_unless(osync_testing_file_exists("/tmp/testpipe-client") == TRUE, NULL); + + fail_unless(osync_queue_remove(client_queue, &error), NULL); + fail_unless(osync_queue_remove(server_queue, &error), NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + fail_unless(osync_testing_file_exists("/tmp/testpipe-client") == FALSE, NULL); + + osync_queue_unref(client_queue); + osync_queue_unref(server_queue); + + destroy_testbed(testbed); +} +END_TEST + OSYNC_TESTCASE_START("ipc") OSYNC_TESTCASE_ADD(ipc_new) OSYNC_TESTCASE_ADD(ipc_ref) @@ -2285,5 +2679,7 @@ OSYNC_TESTCASE_ADD(ipc_callback_break_pipes) OSYNC_TESTCASE_ADD(ipc_timeout) +OSYNC_TESTCASE_ADD(ipc_late_reply) +OSYNC_TESTCASE_ADD(ipc_loop_with_timeout) OSYNC_TESTCASE_END |