From: <ga...@us...> - 2011-03-31 19:28:08
|
Revision: 5149 http://jaffa.svn.sourceforge.net/jaffa/?rev=5149&view=rev Author: gautamj Date: 2011-03-31 19:28:02 +0000 (Thu, 31 Mar 2011) Log Message: ----------- Implemented the remaining todos Modified Paths: -------------- trunk/JaffaSOA/source/java/org/jaffa/modules/messaging/services/JmsQueueAdmin.java Modified: trunk/JaffaSOA/source/java/org/jaffa/modules/messaging/services/JmsQueueAdmin.java =================================================================== --- trunk/JaffaSOA/source/java/org/jaffa/modules/messaging/services/JmsQueueAdmin.java 2011-03-31 18:47:58 UTC (rev 5148) +++ trunk/JaffaSOA/source/java/org/jaffa/modules/messaging/services/JmsQueueAdmin.java 2011-03-31 19:28:02 UTC (rev 5149) @@ -253,75 +253,184 @@ return output; } + /** + * NOTE: Each graph is expected to contain type. It may also contain the optional queueMetaData.queueSystemId, which may help optimize the process. + */ public QueueAdminResponse[] toggleQueueStatus(QueueGraph[] graphs) { if (log.isDebugEnabled()) log.debug("Input to toggleQueueStatus: " + Arrays.toString(graphs)); - Collection<QueueAdminResponse> output = new LinkedList<QueueAdminResponse>(); - if (graphs != null) { - String[] queueNames = null; - for (QueueGraph graph : graphs) { - if (graph.getQueueMetaData() == null || graph.getQueueMetaData().getQueueSystemId() == null || graph.getQueueMetaData().getQueueSystemId().equals(QUEUE_SYSTEM_ID)) { - if (queueNames == null) { - queueNames = JmsBrowser.getAccessibleQueueNames(); - Arrays.sort(queueNames); - } - String queueName = graph.getType(); - if (Arrays.binarySearch(queueNames, queueName) >= 0 && JmsBrowser.hasAdminMessageAccess(queueName)) { - try { - Long consumerCount = new Long(JmsBrowser.getActiveConsumersCount(queueName)); - if (ConfigurationService.getInstance().getQueueInfo(queueName).getConsumerPolicy() == ConsumerPolicy.NONE) { - if (log.isDebugEnabled()) - log.debug("ConsumerPolicy of queue " + queueName + " is NONE. It's status cannnot be toggled"); - } else if (consumerCount > 0) { - if (log.isDebugEnabled()) - log.debug("Status of queue " + queueName + " will be set to INACTIVE"); - JmsBrowser.stopMessageDelivery(queueName); - } else { - if (log.isDebugEnabled()) - log.debug("Status of queue " + queueName + " will be set to ACTIVE"); - JmsBrowser.startMessageDelivery(queueName); - } - } catch (Exception e) { - log.error("Exception thrown while toggling status of queue " + queueName, e); - QueueAdminResponse response = new QueueAdminResponse(); - response.setSource(graph); + Collection<QueueAdminResponse> output = null; + String[] queueNames = null; + for (QueueGraph graph : graphs) { + if (graph.getQueueMetaData() == null || graph.getQueueMetaData().getQueueSystemId() == null || graph.getQueueMetaData().getQueueSystemId().equals(QUEUE_SYSTEM_ID)) { + // Initialize and sort the queueNames array; if not already done + if (queueNames == null) { + queueNames = ConfigurationService.getInstance().getQueueNames(); + Arrays.sort(queueNames); + } + if (Arrays.binarySearch(queueNames, graph.getType()) >= 0) { + try { + if (!JmsBrowser.hasAdminMessageAccess(graph.getType())) + throw new JaffaMessagingApplicationException(JaffaMessagingApplicationException.NO_ADMIN_MESSAGE_ACESSS, new Object[]{graph.getType()}); + Long consumerCount = new Long(JmsBrowser.getActiveConsumersCount(graph.getType())); + if (ConfigurationService.getInstance().getQueueInfo(graph.getType()).getConsumerPolicy() == ConsumerPolicy.NONE) { + if (log.isDebugEnabled()) + log.debug("ConsumerPolicy of queue " + graph.getType() + " is NONE. It's status cannnot be toggled"); + } else if (consumerCount > 0) { + if (log.isDebugEnabled()) + log.debug("Status of queue " + graph.getType() + " will be set to INACTIVE"); + JmsBrowser.stopMessageDelivery(graph.getType()); + } else { + if (log.isDebugEnabled()) + log.debug("Status of queue " + graph.getType() + " will be set to ACTIVE"); + JmsBrowser.startMessageDelivery(graph.getType()); + } + } catch (Exception e) { + QueueAdminResponse response = new QueueAdminResponse(); + response.setSource(graph); + ApplicationExceptions appExps = ExceptionHelper.extractApplicationExceptions(e); + if (appExps != null) { + if (log.isDebugEnabled()) + log.debug("Error while toggling status of queue " + graph.getType(), appExps); + response.setErrors(ServiceError.generate(appExps)); + } else { + log.error("Internal Error while toggling status of queue " + graph.getType(), e); response.setErrors(ServiceError.generate(e)); - output.add(response); } - } else { - if (log.isDebugEnabled()) - log.debug("Status of queue " + queueName + " will not be toggled by this implementation, since the queue does not belong to this implementation or the user may not have access to it"); + if (output == null) + output = new LinkedList<QueueAdminResponse>(); + output.add(response); } + } else { + if (log.isDebugEnabled()) + log.debug("Status of queue " + graph.getType() + " will not be toggled by this implementation, since the queue does not belong to this system"); } + } else { + if (log.isDebugEnabled()) + log.debug("Status of queue " + graph.getType() + " will not be toggled by this implementation, since the input queueSystemId does not match this system"); } } if (log.isDebugEnabled()) log.debug("Output from toggleQueueStatus: " + output); - return output.size() > 0 ? output.toArray(new QueueAdminResponse[output.size()]) : null; + return output != null && output.size() > 0 ? output.toArray(new QueueAdminResponse[output.size()]) : null; } + /** + * NOTE: Each graph is expected to contain type, messageId and status. It may also contain the optional queueMetaData.queueSystemId, which may help optimize the process. + */ public MessageAdminResponse[] deleteMessage(MessageGraph[] graphs) { if (log.isDebugEnabled()) log.debug("Input to deleteMessage: " + Arrays.toString(graphs)); - Collection<MessageAdminResponse> output = new LinkedList<MessageAdminResponse>(); - - //@todo - + Collection<MessageAdminResponse> output = null; + String[] queueNames = null; + for (MessageGraph graph : graphs) { + if (graph.getQueueMetaData() == null || graph.getQueueMetaData().getQueueSystemId() == null || graph.getQueueMetaData().getQueueSystemId().equals(QUEUE_SYSTEM_ID)) { + // Initialize and sort the queueNames array; if not already done + if (queueNames == null) { + queueNames = ConfigurationService.getInstance().getQueueNames(); + Arrays.sort(queueNames); + } + if (Arrays.binarySearch(queueNames, graph.getType()) >= 0) { + try { + if (graph.getStatus() == MessageGraph.Status.OPEN) { + if (log.isDebugEnabled()) + log.debug("Deleting OPEN message " + graph.getMessageId()); + JmsBrowser.deleteMessage(graph.getType(), graph.getMessageId()); + } else if (graph.getStatus() == MessageGraph.Status.ERROR) { + if (log.isDebugEnabled()) + log.debug("Deleting ERROR message " + graph.getMessageId()); + JmsBrowser.deleteMessage(JmsBrowser.getErrorQueueName(), graph.getMessageId()); + } else if (graph.getStatus() == MessageGraph.Status.IN_PROCESS) { + if (log.isDebugEnabled()) + log.debug("Deleting IN_PROCESS message " + graph.getMessageId()); + JmsBrowser.killInProcessMessage(graph.getType(), graph.getMessageId()); + } else { + if (log.isDebugEnabled()) + log.debug("Message cannnot be deleted since unsupported status " + graph.getStatus() + " has been passed"); + } + } catch (Exception e) { + MessageAdminResponse response = new MessageAdminResponse(); + response.setSource(graph); + ApplicationExceptions appExps = ExceptionHelper.extractApplicationExceptions(e); + if (appExps != null) { + if (log.isDebugEnabled()) + log.debug("Error while deleting Message " + graph, appExps); + response.setErrors(ServiceError.generate(appExps)); + } else { + log.error("Internal Error while deleting Message " + graph, e); + response.setErrors(ServiceError.generate(e)); + } + if (output == null) + output = new LinkedList<MessageAdminResponse>(); + output.add(response); + } + } else { + if (log.isDebugEnabled()) + log.debug("Message " + graph + " will not be deleted by this implementation, since the queue does not belong to this implementation"); + } + } else { + if (log.isDebugEnabled()) + log.debug("Message " + graph + " will not be deleted by this implementation, since the input queueSystemId does not match this system"); + } + } if (log.isDebugEnabled()) log.debug("Output from deleteMessage: " + output); - return output.size() > 0 ? output.toArray(new MessageAdminResponse[output.size()]) : null; + return output != null && output.size() > 0 ? output.toArray(new MessageAdminResponse[output.size()]) : null; } + /** + * NOTE: Each graph is expected to contain type, messageId and status. It may also contain the optional queueMetaData.queueSystemId, which may help optimize the process. + */ public MessageAdminResponse[] resubmitMessage(MessageGraph[] graphs) { if (log.isDebugEnabled()) log.debug("Input to resubmitMessage: " + Arrays.toString(graphs)); - Collection<MessageAdminResponse> output = new LinkedList<MessageAdminResponse>(); - - //@todo - + Collection<MessageAdminResponse> output = null; + String[] queueNames = null; + for (MessageGraph graph : graphs) { + if (graph.getQueueMetaData() == null || graph.getQueueMetaData().getQueueSystemId() == null || graph.getQueueMetaData().getQueueSystemId().equals(QUEUE_SYSTEM_ID)) { + // Initialize and sort the queueNames array; if not already done + if (queueNames == null) { + queueNames = ConfigurationService.getInstance().getQueueNames(); + Arrays.sort(queueNames); + } + if (Arrays.binarySearch(queueNames, graph.getType()) >= 0) { + try { + if (graph.getStatus() == MessageGraph.Status.ERROR) { + if (log.isDebugEnabled()) + log.debug("Resubmitting ERROR message " + graph.getMessageId()); + JmsBrowser.resubmitMessage(JmsBrowser.getErrorQueueName(), graph.getMessageId()); + } else { + if (log.isDebugEnabled()) + log.debug("Message cannnot be resubmitted since unsupported status " + graph.getStatus() + " has been passed"); + } + } catch (Exception e) { + MessageAdminResponse response = new MessageAdminResponse(); + response.setSource(graph); + ApplicationExceptions appExps = ExceptionHelper.extractApplicationExceptions(e); + if (appExps != null) { + if (log.isDebugEnabled()) + log.debug("Error while resubmitting Message " + graph, appExps); + response.setErrors(ServiceError.generate(appExps)); + } else { + log.error("Internal Error while resubmitting Message " + graph, e); + response.setErrors(ServiceError.generate(e)); + } + if (output == null) + output = new LinkedList<MessageAdminResponse>(); + output.add(response); + } + } else { + if (log.isDebugEnabled()) + log.debug("Message " + graph + " will not be resubmitted by this implementation, since the queue does not belong to this implementation"); + } + } else { + if (log.isDebugEnabled()) + log.debug("Message " + graph + " will not be resubmitted by this implementation, since the input queueSystemId does not match this system"); + } + } if (log.isDebugEnabled()) log.debug("Output from resubmitMessage: " + output); - return output.size() > 0 ? output.toArray(new MessageAdminResponse[output.size()]) : null; + return output != null && output.size() > 0 ? output.toArray(new MessageAdminResponse[output.size()]) : null; } /** Creates MetaData for the input Queue, based on the available fields in the PropertyFilter. */ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |