|
From: Gait B. <gai...@us...> - 2004-07-12 09:25:06
|
Update of /cvsroot/ebxmlms/ebxmlms/src/hk/hku/cecid/phoenix/message/handler In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv10552 Modified Files: MessageServiceHandler.java MessageServer.java Log Message: Allow correct processing of business messages that also carry signals. Errors and Acknowledgments can be sent along with a normal business response. Until now, such messages were lost because processing ended after the signal was handled. The code has been changed so that, for messages combining signals and data, both are processed correctly. Index: MessageServer.java =================================================================== RCS file: /cvsroot/ebxmlms/ebxmlms/src/hk/hku/cecid/phoenix/message/handler/MessageServer.java,v retrieving revision 1.151 retrieving revision 1.152 diff -C2 -d -r1.151 -r1.152 *** MessageServer.java 8 Apr 2004 04:37:44 -0000 1.151 --- MessageServer.java 12 Jul 2004 09:24:43 -0000 1.152 *************** *** 1097,1102 **** exception = null; try { ! store(ebxmlMessage, appContext, STATE_RECEIVED, ! FIRST_MESSAGE_DELIVERED, tx); update = DbTableManager.DBTABLE_MESSAGE_STORE.getUpdateQuery (tx.getConnection(), --- 1097,1106 ---- exception = null; try { ! /* Only store the message if it's really just an ack */ ! if( ebxmlMessage.getService().equals(Constants.SERVICE) && ebxmlMessage.getAction().equals(Constants.ACTION_ACKNOWLEDGMENT)) { ! store(ebxmlMessage, appContext, STATE_RECEIVED, ! FIRST_MESSAGE_DELIVERED, tx); ! } ! 
update = DbTableManager.DBTABLE_MESSAGE_STORE.getUpdateQuery (tx.getConnection(), Index: MessageServiceHandler.java =================================================================== RCS file: /cvsroot/ebxmlms/ebxmlms/src/hk/hku/cecid/phoenix/message/handler/MessageServiceHandler.java,v retrieving revision 1.188 retrieving revision 1.189 diff -C2 -d -r1.188 -r1.189 *** MessageServiceHandler.java 21 Apr 2004 12:41:07 -0000 1.188 --- MessageServiceHandler.java 12 Jul 2004 09:24:40 -0000 1.189 *************** *** 4675,4684 **** final boolean isPong = service.equals(Constants.SERVICE) && action.equals(Constants.ACTION_PONG); ! final boolean isError = (ebxmlMessage.getErrorList() != null); ApplicationContext appContext; MessageServiceHandlerConnection mshConnection = null; if (statusRequest == null && statusResponse == null && ! acknowledgment == null && !isPing && !isPong && !isError) { appContext = getApplicationContext(ebxmlMessage.getCpaId(), ebxmlMessage.getConversationId(), service, action); --- 4675,4689 ---- final boolean isPong = service.equals(Constants.SERVICE) && action.equals(Constants.ACTION_PONG); ! final boolean isError = service.equals(Constants.SERVICE) && ! action.equals(Constants.ACTION_MESSAGE_ERROR); ! final boolean isAcknowledgment = service.equals(Constants.SERVICE) && ! action.equals(Constants.ACTION_ACKNOWLEDGMENT); ! ! final boolean reportsErrors = (ebxmlMessage.getErrorList() != null); ApplicationContext appContext; MessageServiceHandlerConnection mshConnection = null; if (statusRequest == null && statusResponse == null && ! !isAcknowledgment && !isPing && !isPong && !isError) { appContext = getApplicationContext(ebxmlMessage.getCpaId(), ebxmlMessage.getConversationId(), service, action); *************** *** 4718,4722 **** String status = "Received"; // Check if it is an incoming error message ! if (isError) { logger.debug("Error message received"); final String refToMessageId = ebxmlMessage.getMessageHeader(). 
--- 4723,4727 ---- String status = "Received"; // Check if it is an incoming error message ! if (reportsErrors) { logger.debug("Error message received"); final String refToMessageId = ebxmlMessage.getMessageHeader(). *************** *** 4756,4760 **** } } ! else { // Message is validated only if it is not an error message in // order to avoid infinite loop --- 4761,4766 ---- } } ! ! if(!isError) { // Message is validated only if it is not an error message in // order to avoid infinite loop *************** *** 4834,4838 **** // the registration or not if (statusRequest == null && statusResponse == null && ! acknowledgment == null && !isPing && !isPong && !isError) { if (mshConnection != null) { int ackRequestedInReg = mshConnection --- 4840,4844 ---- // the registration or not if (statusRequest == null && statusResponse == null && ! !isAcknowledgment && !isPing && !isPong && !isError) { if (mshConnection != null) { int ackRequestedInReg = mshConnection *************** *** 4969,5177 **** } } ! else if (ackRequested != null) { ! logger.debug("message has an AckReq"); ! tx.lock(messageId); ! if (messageServer.hasReceived(ebxmlMessage, appContext, tx)) { ! logger.debug("message has been received previously"); ! status = "Received before - "; ! if (!ebxmlMessage.getDuplicateElimination()) { ! messageServer.setDeliveryStatus ! (ebxmlMessage.getMessageId(), false, tx); ! Delivery delivery = new Delivery(this, appContext, ! mshConnection.getMessageServiceHandlerConfig(). ! getMessageListener(), ebxmlMessage); ! tx.addThread(delivery); ! status = status + "Deliver to Application - "; ! } ! if (suppressedAck) { ! logger.debug("Ack sending suppressed"); ! status = status + "Acknowledgment suppressed"; ! } ! else { ! EbxmlMessage refToMessage = messageServer. ! getRefToMessage(messageId, tx); ! boolean shouldSend = true; ! int syncReplyMode = mshConnection. ! getMessageServiceHandlerConfig().getSyncReply(); ! if (refToMessage == null) { ! logger.debug("old Ack not found"); ! 
refToMessage = generateAcknowledgment(ebxmlMessage, ! messageId); ! if (syncReplyMode == Constants. ! SYNC_REPLY_MODE_NONE) { ! messageServer.store(refToMessage, appContext, ! MessageServer.STATE_SENT_STARTED, true, tx); ! } ! else { ! messageServer.store(refToMessage, appContext, ! MessageServer.STATE_SENT, true, tx); ! } ! logger.debug("old Acknowledgment missing"); ! status = status + "Old Acknowledgment missing"; ! } ! else { ! String oldAckMsgId = refToMessage.getMessageId(); ! if (sendThreadMap.get(oldAckMsgId) == null) { ! if (syncReplyMode == Constants. ! SYNC_REPLY_MODE_NONE) { ! messageServer.resend(oldAckMsgId, tx); ! logger.debug("old Acknowledgment resent"); ! status += "Old Acknowledgment resent"; ! } ! else { ! logger.debug("Old Acknowledgment replied " ! + "synchronously"); ! status += "Old Acknowledgment replied " ! + "synchronously"; ! } ! } ! else { ! logger.debug("Old Acknowledgment is being " ! + "resent; skipping."); ! status = status + "Old Acknowledgement is " ! + "being resent; skipping."; ! shouldSend = false; ! } ! } ! if (shouldSend) { ! if (syncReplyMode == Constants. ! SYNC_REPLY_MODE_NONE) { ! final MessageProcessor messageProcessor = new ! MessageProcessor(refToMessage, ! mshConnection. ! getMessageServiceHandlerConfig(), this); ! addSendThread(refToMessage.getMessageId(), ! messageProcessor); ! messageProcessor.start(); ! } ! else { ! response = refToMessage; ! } ! } ! } ! } ! else { ! logger.debug("message has not been received previously"); ! status = "Received firstly - "; ! messageServer.store(ebxmlMessage, appContext, ! MessageServer.STATE_RECEIVED, false, tx); ! Delivery delivery = new Delivery(this, appContext, ! mshConnection.getMessageServiceHandlerConfig(). ! getMessageListener(), ebxmlMessage); ! tx.addThread(delivery); ! final EbxmlMessage ackMessage = ! generateAcknowledgment(ebxmlMessage, messageId); ! if (suppressedAck) { ! logger.debug("Ack sending suppressed"); ! status = status + "Acknowledgment suppressed"; ! } ! 
else if (mshConnection.getMessageServiceHandlerConfig(). ! getSyncReply() == Constants.SYNC_REPLY_MODE_NONE) { ! mshConnection.send(ackMessage, tx); ! logger.debug("Ack sent"); ! status = status + "Acknowledgment sent"; ! } ! else { ! messageServer.store(ackMessage, appContext, ! MessageServer.STATE_SENT, true, tx); ! logger.debug("Ack replied synchronously"); ! status += "Acknowledgment replied synchronously"; ! response = ackMessage; ! } ! } ! } ! else if (statusRequest != null) { ! logger.debug("Status Request message is received"); ! final String refToMessageId = statusRequest.getRefToMessageId(); ! final String [] result = messageServer. ! getMessageStatus(refToMessageId, tx); ! final String messageStatus = result[0]; ! final String timestamp = result[1]; ! if (!messageStatus.equals(Constants.STATUS_NOT_RECOGNIZED) && ! !messageStatus.equals(Constants.STATUS_NOT_RECOGNIZED)) { ! appContext = new ApplicationContext ! (result[2], result[3], result[4], result[5]); ! messageServer.store(ebxmlMessage, appContext, ! MessageServer.STATE_RECEIVED, true, tx); ! } ! response = generateStatusResponseMessage(ebxmlMessage, ! messageStatus, timestamp); ! if (!messageStatus.equals(Constants.STATUS_NOT_RECOGNIZED) && ! !messageStatus.equals(Constants.STATUS_NOT_RECOGNIZED)) { ! messageServer.store(response, appContext, ! MessageServer.STATE_SENT, true, tx); ! } ! logger.debug("Status Response message is sent back"); ! } ! else if (statusResponse != null) { ! String statusRequestMessageId = ebxmlMessage. ! getMessageHeader().getRefToMessageId(); ! appContext = messageServer. ! getApplicationContext(statusRequestMessageId, tx); ! mshConnection = (MessageServiceHandlerConnection) ! mshConnectionTable.get(appContext); ! messageServer.store(ebxmlMessage, appContext, ! MessageServer.STATE_RECEIVED, false, tx); ! Delivery delivery = new Delivery(this, appContext, ! mshConnection.getMessageServiceHandlerConfig(). ! getMessageListener(), ebxmlMessage); ! tx.addThread(delivery); ! 
logger.debug("Status Response message is received"); ! } ! else if (isPing) { ! logger.debug("Ping message is received"); ! response = generatePongMessage(ebxmlMessage); ! messageServer.store(response, appContext, ! MessageServer.STATE_SENT, true, tx); ! logger.debug("Pong message is sent back"); ! } ! else if (isPong) { ! String pingMessageId = ebxmlMessage.getMessageHeader(). ! getRefToMessageId(); ! appContext = messageServer.getApplicationContext ! (pingMessageId, tx); ! mshConnection = (MessageServiceHandlerConnection) ! mshConnectionTable.get(appContext); ! messageServer.store(ebxmlMessage, appContext, ! MessageServer.STATE_RECEIVED, false, tx); ! Delivery delivery = new Delivery(this, appContext, ! mshConnection.getMessageServiceHandlerConfig(). ! getMessageListener(), ebxmlMessage); ! tx.addThread(delivery); ! logger.info("Pong message is received"); ! } ! else { ! tx.lock(messageId); ! final boolean hasReceived = messageServer. ! hasReceived(ebxmlMessage, appContext, tx); ! if (!ebxmlMessage.getDuplicateElimination() || !hasReceived) { ! logger.debug("message doesn't have AckReq"); ! if (!hasReceived) { ! logger.debug( ! "message has not been received previously"); ! messageServer.store(ebxmlMessage, appContext, ! MessageServer.STATE_RECEIVED, false, tx); ! status = "Received firstly - Deliver to Application"; ! } ! else { ! status = "Received before - Deliver to Application"; ! messageServer.setDeliveryStatus ! (ebxmlMessage.getMessageId(), false, tx); ! } ! Delivery delivery = new Delivery(this, appContext, ! mshConnection.getMessageServiceHandlerConfig(). ! getMessageListener(), ebxmlMessage); ! tx.addThread(delivery); ! } ! else { ! logger.debug("will not process message"); ! status = "Received before - Not deliver to Application"; ! } ! } try { --- 4975,5186 ---- } } ! ! if( !isAcknowledgment ) { ! if (ackRequested != null) { ! logger.debug("message has an AckReq"); ! tx.lock(messageId); ! 
if (messageServer.hasReceived(ebxmlMessage, appContext, tx)) { ! logger.debug("message has been received previously"); ! status = "Received before - "; ! if (!ebxmlMessage.getDuplicateElimination()) { ! messageServer.setDeliveryStatus ! (ebxmlMessage.getMessageId(), false, tx); ! Delivery delivery = new Delivery(this, appContext, ! mshConnection.getMessageServiceHandlerConfig(). ! getMessageListener(), ebxmlMessage); ! tx.addThread(delivery); ! status = status + "Deliver to Application - "; ! } ! if (suppressedAck) { ! logger.debug("Ack sending suppressed"); ! status = status + "Acknowledgment suppressed"; ! } ! else { ! EbxmlMessage refToMessage = messageServer. ! getRefToMessage(messageId, tx); ! boolean shouldSend = true; ! int syncReplyMode = mshConnection. ! getMessageServiceHandlerConfig().getSyncReply(); ! if (refToMessage == null) { ! logger.debug("old Ack not found"); ! refToMessage = generateAcknowledgment(ebxmlMessage, ! messageId); ! if (syncReplyMode == Constants. ! SYNC_REPLY_MODE_NONE) { ! messageServer.store(refToMessage, appContext, ! MessageServer.STATE_SENT_STARTED, true, tx); ! } ! else { ! messageServer.store(refToMessage, appContext, ! MessageServer.STATE_SENT, true, tx); ! } ! logger.debug("old Acknowledgment missing"); ! status = status + "Old Acknowledgment missing"; ! } ! else { ! String oldAckMsgId = refToMessage.getMessageId(); ! if (sendThreadMap.get(oldAckMsgId) == null) { ! if (syncReplyMode == Constants. ! SYNC_REPLY_MODE_NONE) { ! messageServer.resend(oldAckMsgId, tx); ! logger.debug("old Acknowledgment resent"); ! status += "Old Acknowledgment resent"; ! } ! else { ! logger.debug("Old Acknowledgment replied " ! + "synchronously"); ! status += "Old Acknowledgment replied " ! + "synchronously"; ! } ! } ! else { ! logger.debug("Old Acknowledgment is being " ! + "resent; skipping."); ! status = status + "Old Acknowledgement is " ! + "being resent; skipping."; ! shouldSend = false; ! } ! } ! if (shouldSend) { ! 
if (syncReplyMode == Constants. ! SYNC_REPLY_MODE_NONE) { ! final MessageProcessor messageProcessor = new ! MessageProcessor(refToMessage, ! mshConnection. ! getMessageServiceHandlerConfig(), this); ! addSendThread(refToMessage.getMessageId(), ! messageProcessor); ! messageProcessor.start(); ! } ! else { ! response = refToMessage; ! } ! } ! } ! } ! else { ! logger.debug("message has not been received previously"); ! status = "Received firstly - "; ! messageServer.store(ebxmlMessage, appContext, ! MessageServer.STATE_RECEIVED, false, tx); ! Delivery delivery = new Delivery(this, appContext, ! mshConnection.getMessageServiceHandlerConfig(). ! getMessageListener(), ebxmlMessage); ! tx.addThread(delivery); ! final EbxmlMessage ackMessage = ! generateAcknowledgment(ebxmlMessage, messageId); ! if (suppressedAck) { ! logger.debug("Ack sending suppressed"); ! status = status + "Acknowledgment suppressed"; ! } ! else if (mshConnection.getMessageServiceHandlerConfig(). ! getSyncReply() == Constants.SYNC_REPLY_MODE_NONE) { ! mshConnection.send(ackMessage, tx); ! logger.debug("Ack sent"); ! status = status + "Acknowledgment sent"; ! } ! else { ! messageServer.store(ackMessage, appContext, ! MessageServer.STATE_SENT, true, tx); ! logger.debug("Ack replied synchronously"); ! status += "Acknowledgment replied synchronously"; ! response = ackMessage; ! } ! } ! } ! else if (statusRequest != null) { ! logger.debug("Status Request message is received"); ! final String refToMessageId = statusRequest.getRefToMessageId(); ! final String [] result = messageServer. ! getMessageStatus(refToMessageId, tx); ! final String messageStatus = result[0]; ! final String timestamp = result[1]; ! if (!messageStatus.equals(Constants.STATUS_NOT_RECOGNIZED) && ! !messageStatus.equals(Constants.STATUS_NOT_RECOGNIZED)) { ! appContext = new ApplicationContext ! (result[2], result[3], result[4], result[5]); ! messageServer.store(ebxmlMessage, appContext, ! MessageServer.STATE_RECEIVED, true, tx); ! } ! 
response = generateStatusResponseMessage(ebxmlMessage, ! messageStatus, timestamp); ! if (!messageStatus.equals(Constants.STATUS_NOT_RECOGNIZED) && ! !messageStatus.equals(Constants.STATUS_NOT_RECOGNIZED)) { ! messageServer.store(response, appContext, ! MessageServer.STATE_SENT, true, tx); ! } ! logger.debug("Status Response message is sent back"); ! } ! else if (statusResponse != null) { ! String statusRequestMessageId = ebxmlMessage. ! getMessageHeader().getRefToMessageId(); ! appContext = messageServer. ! getApplicationContext(statusRequestMessageId, tx); ! mshConnection = (MessageServiceHandlerConnection) ! mshConnectionTable.get(appContext); ! messageServer.store(ebxmlMessage, appContext, ! MessageServer.STATE_RECEIVED, false, tx); ! Delivery delivery = new Delivery(this, appContext, ! mshConnection.getMessageServiceHandlerConfig(). ! getMessageListener(), ebxmlMessage); ! tx.addThread(delivery); ! logger.debug("Status Response message is received"); ! } ! else if (isPing) { ! logger.debug("Ping message is received"); ! response = generatePongMessage(ebxmlMessage); ! messageServer.store(response, appContext, ! MessageServer.STATE_SENT, true, tx); ! logger.debug("Pong message is sent back"); ! } ! else if (isPong) { ! String pingMessageId = ebxmlMessage.getMessageHeader(). ! getRefToMessageId(); ! appContext = messageServer.getApplicationContext ! (pingMessageId, tx); ! mshConnection = (MessageServiceHandlerConnection) ! mshConnectionTable.get(appContext); ! messageServer.store(ebxmlMessage, appContext, ! MessageServer.STATE_RECEIVED, false, tx); ! Delivery delivery = new Delivery(this, appContext, ! mshConnection.getMessageServiceHandlerConfig(). ! getMessageListener(), ebxmlMessage); ! tx.addThread(delivery); ! logger.info("Pong message is received"); ! } ! else { ! tx.lock(messageId); ! final boolean hasReceived = messageServer. ! hasReceived(ebxmlMessage, appContext, tx); ! if (!ebxmlMessage.getDuplicateElimination() || !hasReceived) { ! 
logger.debug("message doesn't have AckReq"); ! if (!hasReceived) { ! logger.debug( ! "message has not been received previously"); ! messageServer.store(ebxmlMessage, appContext, ! MessageServer.STATE_RECEIVED, false, tx); ! status = "Received firstly - Deliver to Application"; ! } ! else { ! status = "Received before - Deliver to Application"; ! messageServer.setDeliveryStatus ! (ebxmlMessage.getMessageId(), false, tx); ! } ! Delivery delivery = new Delivery(this, appContext, ! mshConnection.getMessageServiceHandlerConfig(). ! getMessageListener(), ebxmlMessage); ! tx.addThread(delivery); ! } ! else { ! logger.debug("will not process message"); ! status = "Received before - Not deliver to Application"; ! } ! } ! } try { |