From: <dg...@su...> - 2009-01-22 20:52:21
|
Author: Graham Cobb Date: Thu Jan 22 21:51:07 2009 New Revision: 5216 URL: http://www.opensync.org/changeset/5216 Log: Initial work for new timeout handling. OSyncQueue handling is reworked so that timeouts are not run on command sending queues but are run on command receiving queues. This initial work seems to work for simple cases both with and without timeouts. However, the following tasks (at least) are still required: 1) Create unit tests 2) Limit the number of pending entries so the timeout is not dependent on the number of outstanding requests 3) Add references for cross-linked pointers between queues and remove linkages (and unref) when ??? (queue disconnected?) 4) Handle timeout before reply queue set up by generating an error message on the command queue 5) Decide whether to handle timeouts that occur while a plugin thread is sleeping. 6) Implement a mechanism to protect the queue itself: i.e. is the receiver still reading the queue? Modified: branches/timeout/ (props changed) branches/timeout/CMakeLists.txt branches/timeout/ChangeLog branches/timeout/opensync/client/opensync_client.c branches/timeout/opensync/client/opensync_client_proxy.c branches/timeout/opensync/ipc/opensync_message.c branches/timeout/opensync/ipc/opensync_message_internals.h branches/timeout/opensync/ipc/opensync_message_private.h branches/timeout/opensync/ipc/opensync_queue.c branches/timeout/opensync/ipc/opensync_queue_internals.h branches/timeout/opensync/ipc/opensync_queue_private.h Modified: branches/timeout/CMakeLists.txt ============================================================================== --- branches/timeout/CMakeLists.txt Thu Jan 22 19:12:55 2009 (r5215) +++ branches/timeout/CMakeLists.txt Thu Jan 22 21:51:07 2009 (r5216) @@ -69,6 +69,7 @@ FIND_PACKAGE( SWIG ) FIND_PACKAGE( PythonLibs ) FIND_PACKAGE( Check ) +SET( CHECK_LIBRARIES "-lcheck_pic" ) # test configuration Modified: branches/timeout/ChangeLog 
============================================================================== --- branches/timeout/ChangeLog Thu Jan 22 19:12:55 2009 (r5215) +++ branches/timeout/ChangeLog Thu Jan 22 21:51:07 2009 (r5216) @@ -1,3 +1,45 @@ +2009-01-22 Graham Cobb <g+...@co...> + + * opensync/ipc/opensync_queue.c (_timeout_dispatch): Restart timeout search at start of + pending list as list may have been modified while unlocked. + (_timeout_dispatch): Set message id in error message + +2009-01-21 Graham Cobb <g+...@co...> + + * opensync/ipc/opensync_queue.c (osync_queue_send_message_with_timeout): If this is + a reply, remove pending command + + * opensync/client/opensync_client_proxy.c (_osync_client_proxy_hup_handler): treat + a queue error as a disconnect + (osync_client_proxy_spawn): cross-link command and reply queues + + * opensync/client/opensync_client.c (_osync_client_handle_initialize): cross-link + command and reply queues + (_osync_client_hup_handler): treat a queue error as a disconnect + +2009-01-20 Graham Cobb <g+...@co...> + + * opensync/ipc/opensync_queue_internals.h: Add osync_queue_cross_link + + * opensync/ipc/opensync_queue.c (osync_queue_cross_link): Cross-link command and reply queues + + * opensync/ipc/opensync_queue.c (_osync_send_timeout_response): + Callback function for timeouts: send the timeout error response + (_incoming_dispatch): For incoming messages which have timeouts, + create a pending list entry + + * opensync/ipc/opensync_queue_private.h: Add reply_queue for pointer to + queue used for replies to incoming messages, if any. + Add cmd_queue for pointer to queue used to store pending commands.
+ +2009-01-19 Graham Cobb <g+...@co...> + + * opensync/ipc/opensync_message.c: Add osync_message_set/get_timeout + + * opensync/ipc/opensync_message_internals.h: Add osync_message_set/get_timeout + + * opensync/ipc/opensync_message_private.h: Add timeout to message + 2008-05-04 Graham Cobb <g+...@co...> * opensync/helper/opensync_hashtable.c (osync_hashtable_get_deleted): Modified: branches/timeout/opensync/client/opensync_client.c ============================================================================== --- branches/timeout/opensync/client/opensync_client.c Thu Jan 22 19:12:55 2009 (r5215) +++ branches/timeout/opensync/client/opensync_client.c Thu Jan 22 21:51:07 2009 (r5216) @@ -604,6 +604,7 @@ } osync_client_set_outgoing_queue(client, outgoing); + osync_queue_cross_link(client->incoming, client->outgoing); osync_queue_unref(outgoing); osync_trace(TRACE_INTERNAL, "done connecting to engine"); } @@ -1519,9 +1520,8 @@ osync_trace(TRACE_INTERNAL, "plugin received command %i on sending queue", osync_message_get_command(message)); - if (osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_ERROR) { - /* Houston, we have a problem */ - } else if (osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_HUP) { + if ( (osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_ERROR) + || (osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_HUP) ) { /* The remote side disconnected.
So we can now disconnect as well and then * shutdown */ if (!osync_queue_disconnect(client->outgoing, &error)) Modified: branches/timeout/opensync/client/opensync_client_proxy.c ============================================================================== --- branches/timeout/opensync/client/opensync_client_proxy.c Thu Jan 22 19:12:55 2009 (r5215) +++ branches/timeout/opensync/client/opensync_client_proxy.c Thu Jan 22 21:51:07 2009 (r5216) @@ -227,9 +227,8 @@ osync_trace(TRACE_INTERNAL, "client received command %i on sending queue", osync_message_get_command(message)); - if (osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_ERROR) { - /* Houston, we have a problem */ - } else if (osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_HUP) { + if ( (osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_ERROR) /* Treat an error as a disconnect */ + || (osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_HUP) ) { /* The remote side disconnected. So we can now disconnect as well and then * shutdown */ if (!osync_queue_disconnect(proxy->outgoing, &error)) @@ -916,6 +915,7 @@ osync_client_set_incoming_queue(proxy->client, read1); osync_client_set_outgoing_queue(proxy->client, write2); + osync_queue_cross_link(read1, write2); if (!osync_client_run(proxy->client, error)) goto error_free_pipe2; Modified: branches/timeout/opensync/ipc/opensync_message.c ============================================================================== --- branches/timeout/opensync/ipc/opensync_message.c Thu Jan 22 19:12:55 2009 (r5215) +++ branches/timeout/opensync/ipc/opensync_message.c Thu Jan 22 21:51:07 2009 (r5216) @@ -74,6 +74,18 @@ return message->cmd; } +void osync_message_set_timeout(OSyncMessage *message, unsigned int timeout) +{ + osync_assert(message); + message->timeout = timeout; +} + +unsigned int osync_message_get_timeout(OSyncMessage *message) +{ + osync_assert(message); + return message->timeout; +} + void osync_message_set_id(OSyncMessage *message, long 
long int id) { osync_assert(message); Modified: branches/timeout/opensync/ipc/opensync_message_internals.h ============================================================================== --- branches/timeout/opensync/ipc/opensync_message_internals.h Thu Jan 22 19:12:55 2009 (r5215) +++ branches/timeout/opensync/ipc/opensync_message_internals.h Thu Jan 22 21:51:07 2009 (r5216) @@ -193,6 +193,21 @@ */ OSYNC_TEST_EXPORT void osync_message_get_buffer(OSyncMessage *message, char **data, unsigned int *size); +/** @brief Set timeout (in seconds) for the message object + * + * @param message The message to modify + * @param timeout The new message timeout + * + */ +OSYNC_TEST_EXPORT void osync_message_set_timeout(OSyncMessage *message, unsigned int timeout); + +/** @brief Get timeout (in seconds) for the message object + * + * @param message The message + * + */ +OSYNC_TEST_EXPORT unsigned int osync_message_get_timeout(OSyncMessage *message); + /** @brief Sets the handler that will receive the reply * * @param message The message to work on Modified: branches/timeout/opensync/ipc/opensync_message_private.h ============================================================================== --- branches/timeout/opensync/ipc/opensync_message_private.h Thu Jan 22 19:12:55 2009 (r5215) +++ branches/timeout/opensync/ipc/opensync_message_private.h Thu Jan 22 21:51:07 2009 (r5216) @@ -45,7 +45,7 @@ /** The user data */ gpointer user_data; /** The timeout associated with this message */ - //timeout_info *to_info; + unsigned int timeout; /** If this message has already been answered */ osync_bool is_answered; /** The internal OSyncMarshal object **/ Modified: branches/timeout/opensync/ipc/opensync_queue.c ============================================================================== --- branches/timeout/opensync/ipc/opensync_queue.c Thu Jan 22 19:12:55 2009 (r5215) +++ branches/timeout/opensync/ipc/opensync_queue.c Thu Jan 22 21:51:07 2009 (r5216) @@ -2,6 +2,7 @@ * libosengine - A 
synchronization engine for the opensync framework * Copyright (C) 2004-2005 Armin Bauer <arm...@op...> * Copyright (C) 2007 Daniel Friedrich <dan...@op...> + * Copyright (C) 2009 Graham R. Cobb <g+o...@co...> * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -110,11 +111,15 @@ OSyncError *timeouterr = NULL; OSyncMessage *errormsg = NULL; + osync_trace(TRACE_INTERNAL, "Timeout for message with id %lli", pending->id); + + /* Call the callback of the pending message */ osync_assert(pending->callback); osync_error_set(&timeouterr, OSYNC_ERROR_IO_ERROR, "Timeout."); errormsg = osync_message_new_errorreply(NULL, timeouterr, &error); osync_error_unref(&timeouterr); + osync_message_set_id(errormsg, pending->id); /* Remove first the pending message! To avoid that _incoming_dispatch catchs this message @@ -137,7 +142,8 @@ /* Lock again, to keep the iteration of the pendingReplies list atomic. */ g_mutex_lock(queue->pendingLock); - break; + /* Restart search as list may have been modified while it was unlocked */ + p = queue->pendingReplies; } } @@ -161,67 +167,146 @@ return FALSE; } +static void _osync_send_timeout_response(OSyncMessage *errormsg, void *user_data) +{ + OSyncQueue *queue = user_data; + + osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, errormsg, queue); + + if (queue->reply_queue) { + osync_queue_send_message_with_timeout(queue->reply_queue, NULL, errormsg, 0, NULL); + } else { + osync_message_unref(errormsg); + /* TODO: as we can't send the timeout response, disconnect the queue to signal to + the sender that they aren't going to get a response. Note: we can't just + call osync_queue_disconnect as that tries to kill the thread and hits a deadlock! 
*/ + osync_trace(TRACE_EXIT_ERROR, "%s: cannot find reply queue to send timeout error", __func__); + return; + } + osync_trace(TRACE_EXIT, "%s", __func__); +} + +/* Find and remove a pending message that this message is a reply to */ +static void _osync_queue_remove_pending_reply(OSyncQueue *queue, OSyncMessage *reply, gboolean callback) +{ + OSyncPendingMessage *pending = NULL; + OSyncList *p = NULL; + + osync_trace(TRACE_ENTRY, "%s(%p, %p, %d)", __func__, queue, reply, callback); + osync_trace(TRACE_INTERNAL, "Searching for pending message id=%lli", osync_message_get_id(reply)); + + /* Search for the pending reply. We have to lock the + * list since another thread might be duing the updates */ + g_mutex_lock(queue->pendingLock); + + for (p = queue->pendingReplies; p; p = p->next) { + pending = p->data; + + if (pending->id == osync_message_get_id(reply)) { + + osync_trace(TRACE_INTERNAL, "Found pending message id=%lli: %p", osync_message_get_id(reply), pending); + /* Remove first the pending message! + To avoid that _timeout_dispatch catchs this message + when we're releasing the lock. If _timeout_dispatch + would catch this message, the pending callback + gets called twice! */ + + queue->pendingReplies = osync_list_remove(queue->pendingReplies, pending); + + /* Unlock the pending lock since the messages might be sent during the callback */ + g_mutex_unlock(queue->pendingLock); + + if (callback) { + /* Call the callback of the pending message */ + osync_assert(pending->callback); + pending->callback(reply, pending->user_data); + } + + // TODO: Refcounting for OSyncPendingMessage + if (pending->timeout_info) + g_free(pending->timeout_info); + osync_free(pending); + + osync_trace(TRACE_EXIT, "%s", __func__); + return; + } + } + + g_mutex_unlock(queue->pendingLock); + osync_trace(TRACE_EXIT, "%s", __func__); +} + /* This function is called from the master thread. 
The function dispatched incoming data from * the remote end */ static gboolean _incoming_dispatch(GSource *source, GSourceFunc callback, gpointer user_data) { - OSyncPendingMessage *pending = NULL; OSyncQueue *queue = user_data; - OSyncList *p = NULL; OSyncMessage *message = NULL; + OSyncError *error = NULL; osync_trace(TRACE_ENTRY, "%s(%p)", __func__, user_data); while ((message = g_async_queue_try_pop(queue->incoming))) { /* We check if the message is a reply to something */ - osync_trace(TRACE_INTERNAL, "Dispatching %p:%i(%s)", message, osync_message_get_cmd(message), osync_message_get_commandstr(message)); + osync_trace(TRACE_INTERNAL, "Dispatching %p:%i(%s), timeout=%d, id=%lli", message, + osync_message_get_cmd(message), osync_message_get_commandstr(message), + osync_message_get_timeout(message), osync_message_get_id(message)); if (osync_message_get_cmd(message) == OSYNC_MESSAGE_REPLY || osync_message_get_cmd(message) == OSYNC_MESSAGE_ERRORREPLY) { - - /* Search for the pending reply. We have to lock the - * list since another thread might be duing the updates */ - g_mutex_lock(queue->pendingLock); - - for (p = queue->pendingReplies; p; p = p->next) { - pending = p->data; - - if (pending->id == osync_message_get_id(message)) { + /* Remove pending reply and call callback */ + _osync_queue_remove_pending_reply(queue, message, TRUE); + } else { + unsigned int timeout = osync_message_get_timeout(message); + if (timeout) { + /* This message has a timeout. Put message on pending list and run timeout */ + /* NOTE: We really want to put it on the pending list for the corresponding + outgoing queue but there may not be one at this time (if this is an Initialise, + for example). So we put it on the pending list of the incoming queue and deal with + finding the outgoing queue if the timeout fires. This also complicates removing + pending items when sending the responses. Oh well. 
*/ + + OSyncPendingMessage *pending = osync_try_malloc0(sizeof(OSyncPendingMessage), &error); + if (!pending) + goto error; - /* Remove first the pending message! - To avoid that _timeout_dispatch catchs this message - when we're releasing the lock. If _timeout_dispatch - would catch this message, the pending callback - gets called twice! */ + pending->id = osync_message_get_id(message); - queue->pendingReplies = osync_list_remove(queue->pendingReplies, pending); + OSyncTimeoutInfo *toinfo = osync_try_malloc0(sizeof(OSyncTimeoutInfo), &error); + if (!toinfo) + goto error; - /* Unlock the pending lock since the messages might be sent during the callback */ - g_mutex_unlock(queue->pendingLock); - - /* Call the callback of the pending message */ - osync_assert(pending->callback); - pending->callback(message, pending->user_data); - - // TODO: Refcounting for OSyncPendingMessage - if (pending->timeout_info) - g_free(pending->timeout_info); - osync_free(pending); - - /* Lock again, to keep the iteration of the pendingReplies list atomic. */ - g_mutex_lock(queue->pendingLock); - break; - } + GTimeVal current_time; + g_source_get_current_time(queue->timeout_source, &current_time); + + toinfo->expiration = current_time; + toinfo->expiration.tv_sec += timeout; + + pending->timeout_info = toinfo; + + pending->callback = _osync_send_timeout_response; + pending->user_data = queue; + + g_mutex_lock(queue->pendingLock); + queue->pendingReplies = osync_list_append(queue->pendingReplies, pending); + g_mutex_unlock(queue->pendingLock); } - - g_mutex_unlock(queue->pendingLock); - } else + queue->message_handler(message, queue->user_data); + } osync_message_unref(message); } osync_trace(TRACE_EXIT, "%s: Done dispatching", __func__); return TRUE; + + error: + /* TODO: as we can't send the timeout response, disconnect the queue to signal to + the sender that they aren't going to get a response.
Note: we can't just + call osync_queue_disconnect as that tries to kill the thread and hits a deadlock! */ + + osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(&error)); + return TRUE; } static void _osync_queue_stop_incoming(OSyncQueue *queue) @@ -328,6 +413,10 @@ if (!_osync_queue_write_long_long_int(queue, osync_message_get_id(message), &error)) goto error; + /* The timeout (integer) */ + if (!_osync_queue_write_int(queue, (int) osync_message_get_timeout(message), &error)) + goto error; + osync_message_get_buffer(message, &data, &length); if (length) { @@ -498,7 +587,7 @@ do { int size = 0; - int cmd = 0; + int cmd = 0, timeout = 0; long long int id = 0; char *buffer = NULL; @@ -514,11 +603,16 @@ if (!_osync_queue_read_long_long_int(queue, &id, &error)) goto error; + /* The timeout */ + if (!_osync_queue_read_int(queue, &timeout, &error)) + goto error; + message = osync_message_new(cmd, size, &error); if (!message) goto error; osync_message_set_id(message, id); + osync_message_set_timeout(message, (unsigned int) timeout); /* We now get the buffer from the message which will already * have the correct size for the read */ @@ -933,7 +1027,7 @@ void osync_queue_set_message_handler(OSyncQueue *queue, OSyncMessageHandler handler, gpointer user_data) { - osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, queue, handler, user_data); + osync_trace(TRACE_ENTRY, "%s(%p, %p, %p, %p)", __func__, queue, handler, user_data); queue->message_handler = handler; queue->user_data = user_data; @@ -941,6 +1035,22 @@ osync_trace(TRACE_EXIT, "%s", __func__); } +void osync_queue_cross_link(OSyncQueue *cmd_queue, OSyncQueue *reply_queue) +{ + /* Cross-linking is needed to make timeouts work when commands are + being received on one queue and replies sent on another. + Note that cross-linking is not needed when commands are being sent. 
*/ + osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, cmd_queue, reply_queue); + + osync_assert(cmd_queue->type == OSYNC_QUEUE_RECEIVER); + osync_assert(reply_queue->type == OSYNC_QUEUE_SENDER); + + cmd_queue->reply_queue = reply_queue; + reply_queue->cmd_queue = cmd_queue; + + osync_trace(TRACE_EXIT, "%s", __func__); +} + void osync_queue_setup_with_gmainloop(OSyncQueue *queue, GMainContext *context) { OSyncQueue **queueptr = NULL; @@ -1035,6 +1145,19 @@ { osync_trace(TRACE_ENTRY, "%s(%p, %p, %p, %u, %p)", __func__, queue, replyqueue, message, timeout, error); + if (queue->cmd_queue) { + /* This queue is a reply queue for some command receiving queue */ + + /* If this is actually a reply message, we check to see if there is + message to remove from the command queue's pending list */ + + if (osync_message_get_cmd(message) == OSYNC_MESSAGE_REPLY || osync_message_get_cmd(message) == OSYNC_MESSAGE_ERRORREPLY) { + /* Remove pending reply but do not call callback + (real callback will be called by the receiver of this reply later) */ + _osync_queue_remove_pending_reply(queue->cmd_queue, message, FALSE); + } + } + if (osync_message_get_handler(message)) { OSyncPendingMessage *pending = NULL; GTimeVal current_time; @@ -1053,14 +1176,8 @@ osync_trace(TRACE_INTERNAL, "Setting id %lli for pending reply", id); if (timeout) { - OSyncTimeoutInfo *toinfo = osync_try_malloc0(sizeof(OSyncTimeoutInfo), error); - if (!toinfo) - goto error; - - toinfo->expiration = current_time; - toinfo->expiration.tv_sec += timeout; - - pending->timeout_info = toinfo; + /* Send timeout info to other end to handle */ + osync_message_set_timeout(message, timeout); } else { osync_trace(TRACE_INTERNAL, "handler message got sent without timeout!: %s", osync_message_get_commandstr(message)); } @@ -1118,6 +1235,7 @@ { osync_assert(queue); return queue->name; + } int osync_queue_get_fd(OSyncQueue *queue) Modified: branches/timeout/opensync/ipc/opensync_queue_internals.h 
============================================================================== --- branches/timeout/opensync/ipc/opensync_queue_internals.h Thu Jan 22 19:12:55 2009 (r5215) +++ branches/timeout/opensync/ipc/opensync_queue_internals.h Thu Jan 22 21:51:07 2009 (r5216) @@ -85,6 +85,20 @@ OSYNC_TEST_EXPORT void osync_queue_set_message_handler(OSyncQueue *queue, OSyncMessageHandler handler, gpointer user_data); /** + * @brief Cross links command queue and reply queue + * + * Stores the queue used for replies in the command queue object so + * that timeout responses can be sent if necessary. + * And stores the command queue in the reply queue object so that + * replies can remove pending messages before they time out. + * + * @param cmd_queue The command queue used to receive incoming commands + * @param reply_queue The queue used to send replies + * + */ +OSYNC_TEST_EXPORT void osync_queue_cross_link(OSyncQueue *cmd_queue, OSyncQueue *reply_queue); + +/** * @brief Sends a Message to a Queue * @param queue Pointer to the queue * @param replyqueue Modified: branches/timeout/opensync/ipc/opensync_queue_private.h ============================================================================== --- branches/timeout/opensync/ipc/opensync_queue_private.h Thu Jan 22 19:12:55 2009 (r5215) +++ branches/timeout/opensync/ipc/opensync_queue_private.h Thu Jan 22 21:51:07 2009 (r5216) @@ -89,6 +89,12 @@ /** Reference count **/ int ref_count; + + /* Queue for replies to incoming messages */ + OSyncQueue *reply_queue; + + /* Queue for receiving commands we are replying to */ + OSyncQueue *cmd_queue; }; |