[ohla-devel] SF.net SVN: ohla: [96] trunk/rti/src/java/net/sf/ohla/rti1516
Status: Beta
Brought to you by:
mnewcomb
From: <mne...@us...> - 2007-01-30 01:53:57
|
Revision: 96 http://svn.sourceforge.net/ohla/?rev=96&view=rev Author: mnewcomb Date: 2007-01-29 17:53:57 -0800 (Mon, 29 Jan 2007) Log Message: ----------- - almost complete refactor to centralized RTI Modified Paths: -------------- trunk/rti/src/java/net/sf/ohla/rti1516/RTI.java trunk/rti/src/java/net/sf/ohla/rti1516/federate/Federate.java trunk/rti/src/java/net/sf/ohla/rti1516/federate/FederateSave.java trunk/rti/src/java/net/sf/ohla/rti1516/federate/objects/ObjectInstance.java trunk/rti/src/java/net/sf/ohla/rti1516/federate/objects/ObjectManager.java trunk/rti/src/java/net/sf/ohla/rti1516/federate/time/TimeManager.java trunk/rti/src/java/net/sf/ohla/rti1516/federation/FederationExecution.java trunk/rti/src/java/net/sf/ohla/rti1516/federation/time/TimeConstrainedFederate.java trunk/rti/src/java/net/sf/ohla/rti1516/federation/time/TimeKeeper.java trunk/rti/src/java/net/sf/ohla/rti1516/federation/time/TimeRegulatingFederate.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/FederateSaveInitiated.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/JoinFederationExecution.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/JoinFederationExecutionResponse.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/ReserveObjectInstanceName.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/AnnounceSynchronizationPoint.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/AttributeIsNotOwned.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/AttributeIsOwnedByRTI.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/AttributeOwnershipAcquisitionNotification.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/AttributeOwnershipUnavailable.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/AttributesInScope.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/AttributesOutOfScope.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/Callback.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/CallbackManager.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/ConfirmAttributeOwnershipAcquisitionCancellation.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/DiscoverObjectInstance.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/FederationNotRestored.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/FederationNotSaved.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/FederationRestoreBegun.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/FederationRestoreStatusResponse.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/FederationRestored.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/FederationSaveStatusResponse.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/FederationSaved.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/FederationSynchronized.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/InformAttributeOwnership.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/InitiateFederateRestore.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/InitiateFederateSave.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/ObjectInstanceNameReservationFailed.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/ObjectInstanceNameReservationSucceeded.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/ProvideAttributeValueUpdate.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/ReceiveInteraction.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/ReflectAttributeValues.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/RemoveObjectInstance.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/RequestAttributeOwnershipAssumption.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/RequestAttributeOwnershipRelease.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/RequestDivestitureConfirmation.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/RequestFederationRestoreFailed.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/RequestFederationRestoreSucceeded.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/RequestRetraction.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/StartRegistrationForObjectClass.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/StopRegistrationForObjectClass.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/SynchronizationPointRegistrationFailed.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/SynchronizationPointRegistrationSucceeded.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/TimeAdvanceGrant.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/TimeConstrainedEnabled.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/TimeRegulationEnabled.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/TurnInteractionsOff.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/TurnInteractionsOn.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/TurnUpdatesOffForObjectInstance.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/TurnUpdatesOnForObjectInstance.java Added Paths: ----------- trunk/rti/src/java/net/sf/ohla/rti1516/federation/Federate.java trunk/rti/src/java/net/sf/ohla/rti1516/federation/FederateIoFilter.java trunk/rti/src/java/net/sf/ohla/rti1516/federation/objects/ trunk/rti/src/java/net/sf/ohla/rti1516/federation/objects/AttributeInstance.java trunk/rti/src/java/net/sf/ohla/rti1516/federation/objects/ObjectInstance.java trunk/rti/src/java/net/sf/ohla/rti1516/federation/objects/ObjectManager.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/DeleteObjectInstance.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/SendInteraction.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/UpdateAttributeValues.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/callbacks/ Removed Paths: ------------- trunk/rti/src/java/net/sf/ohla/rti1516/messages/DeleteObjectInstance.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/ObjectInstanceNameReserved.java trunk/rti/src/java/net/sf/ohla/rti1516/messages/ObjectInstanceRegistered.java Modified: trunk/rti/src/java/net/sf/ohla/rti1516/RTI.java =================================================================== --- trunk/rti/src/java/net/sf/ohla/rti1516/RTI.java 2007-01-30 01:52:49 UTC (rev 95) +++ trunk/rti/src/java/net/sf/ohla/rti1516/RTI.java 2007-01-30 01:53:57 UTC (rev 96) @@ -39,8 +39,6 @@ public class RTI { - public static final String FEDERATION_EXECUTION = "FederationExecution"; - private static final Logger log = LoggerFactory.getLogger(RTI.class); public static final String OHLA_RTI_ACCEPTOR_PATTERN = @@ -218,7 +216,7 @@ federations.get(federationExecutionName); if (federationExecution != null) { - federationExecution.process( + federationExecution.joinFederationExecution( session, joinFederationExecution); } else @@ -251,38 +249,24 @@ public void messageReceived(IoSession session, Object message) throws Exception { - FederationExecution federationExecution = getFederationExecution(session); - if (federationExecution == null || - !federationExecution.process(session, message)) + if (message instanceof CreateFederationExecution) { - // there was no FederationExecution associated with this channel or - // the one associated could not createFederationExecution the message - - if (message instanceof CreateFederationExecution) - { - createFederationExecution( - session, (CreateFederationExecution) message); - } - else if (message instanceof DestroyFederationExecution) - { - destroyFederationExecution( - session, (DestroyFederationExecution) message); - } - else if (message instanceof JoinFederationExecution) - { - joinFederationExecution(session, (JoinFederationExecution) message); - } - else - { - assert false : String.format("unexpected message: %s", message); - } + createFederationExecution(session, (CreateFederationExecution) message); } + else if (message instanceof DestroyFederationExecution) + { + destroyFederationExecution( + session, (DestroyFederationExecution) message); + } + else if (message instanceof JoinFederationExecution) + { + joinFederationExecution(session, (JoinFederationExecution) message); + } + else + { + assert false : String.format("unexpected message: %s", message); + } } - - protected FederationExecution getFederationExecution(IoSession session) - { - return (FederationExecution) session.getAttribute(FEDERATION_EXECUTION); - } } protected class SocketAcceptorProfile Modified: trunk/rti/src/java/net/sf/ohla/rti1516/federate/Federate.java =================================================================== --- trunk/rti/src/java/net/sf/ohla/rti1516/federate/Federate.java 2007-01-30 01:52:49 UTC (rev 95) +++ trunk/rti/src/java/net/sf/ohla/rti1516/federate/Federate.java 2007-01-30 01:53:57 UTC (rev 96) @@ -16,12 +16,6 @@ package net.sf.ohla.rti1516.federate; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.UnknownHostException; - import java.util.HashMap; import java.util.Map; import java.util.PriorityQueue; @@ -50,35 +44,25 @@ import net.sf.ohla.rti1516.OHLAParameterHandleValueMapFactory; import net.sf.ohla.rti1516.OHLARegionHandleSetFactory; import net.sf.ohla.rti1516.fdd.FDD; -import net.sf.ohla.rti1516.federate.callbacks.Callback; -import net.sf.ohla.rti1516.federate.callbacks.CallbackManager; -import net.sf.ohla.rti1516.federate.callbacks.DiscoverObjectInstance; -import net.sf.ohla.rti1516.federate.callbacks.InitiateFederateRestore; -import net.sf.ohla.rti1516.federate.callbacks.InitiateFederateSave; -import net.sf.ohla.rti1516.federate.callbacks.ObjectInstanceNameReservationFailed; -import net.sf.ohla.rti1516.federate.callbacks.ObjectInstanceNameReservationSucceeded; -import net.sf.ohla.rti1516.federate.callbacks.ReceiveInteraction; -import net.sf.ohla.rti1516.federate.callbacks.ReflectAttributeValues; -import net.sf.ohla.rti1516.federate.callbacks.RemoveObjectInstance; -import net.sf.ohla.rti1516.federate.callbacks.SynchronizationPointRegistrationFailed; -import net.sf.ohla.rti1516.federate.callbacks.SynchronizationPointRegistrationSucceeded; -import net.sf.ohla.rti1516.federate.filter.InterestManagementFilter; +import net.sf.ohla.rti1516.messages.callbacks.Callback; +import net.sf.ohla.rti1516.messages.callbacks.CallbackManager; +import net.sf.ohla.rti1516.messages.callbacks.ObjectInstanceNameReservationFailed; +import net.sf.ohla.rti1516.messages.callbacks.ObjectInstanceNameReservationSucceeded; +import net.sf.ohla.rti1516.messages.callbacks.ReceiveInteraction; +import net.sf.ohla.rti1516.messages.callbacks.ReflectAttributeValues; +import net.sf.ohla.rti1516.messages.callbacks.RemoveObjectInstance; +import net.sf.ohla.rti1516.messages.callbacks.SynchronizationPointRegistrationFailed; +import net.sf.ohla.rti1516.messages.callbacks.SynchronizationPointRegistrationSucceeded; import net.sf.ohla.rti1516.federate.objects.ObjectManager; import net.sf.ohla.rti1516.federate.time.TimeManager; -import net.sf.ohla.rti1516.filter.RequestResponseFilter; -import net.sf.ohla.rti1516.messages.DefaultResponse; -import net.sf.ohla.rti1516.messages.FederateJoined; import net.sf.ohla.rti1516.messages.FederateRestoreComplete; import net.sf.ohla.rti1516.messages.FederateRestoreNotComplete; import net.sf.ohla.rti1516.messages.FederateSaveBegun; import net.sf.ohla.rti1516.messages.FederateSaveComplete; -import net.sf.ohla.rti1516.messages.FederateSaveInitiated; -import net.sf.ohla.rti1516.messages.FederateSaveInitiatedFailed; import net.sf.ohla.rti1516.messages.FederateSaveNotComplete; +import net.sf.ohla.rti1516.messages.GALTAdvanced; import net.sf.ohla.rti1516.messages.JoinFederationExecution; import net.sf.ohla.rti1516.messages.JoinFederationExecutionResponse; -import net.sf.ohla.rti1516.messages.Message; -import net.sf.ohla.rti1516.messages.ObjectInstanceNameReserved; import net.sf.ohla.rti1516.messages.QueryFederationRestoreStatus; import net.sf.ohla.rti1516.messages.QueryFederationSaveStatus; import net.sf.ohla.rti1516.messages.RegisterFederationSynchronizationPoint; @@ -86,21 +70,13 @@ import net.sf.ohla.rti1516.messages.RequestFederationRestore; import net.sf.ohla.rti1516.messages.RequestFederationSave; import net.sf.ohla.rti1516.messages.Retract; +import net.sf.ohla.rti1516.messages.SendInteraction; import net.sf.ohla.rti1516.messages.SubscribeObjectClassAttributes; import net.sf.ohla.rti1516.messages.SynchronizationPointAchieved; import net.sf.ohla.rti1516.messages.UnsubscribeObjectClassAttributes; -import net.sf.ohla.rti1516.messages.GALTAdvanced; -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; import org.apache.mina.common.WriteFuture; -import org.apache.mina.filter.LoggingFilter; -import org.apache.mina.filter.codec.ProtocolCodecFactory; -import org.apache.mina.filter.codec.ProtocolCodecFilter; -import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory; -import org.apache.mina.transport.socket.nio.SocketAcceptor; -import org.apache.mina.transport.socket.nio.SocketConnector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -242,11 +218,8 @@ public class Federate { - private static final String PEER_FEDERATE_HANDLE = "PeerFederateHandle"; + private static final Logger log = LoggerFactory.getLogger(Federate.class); - private static final Logger log = - LoggerFactory.getLogger(Federate.class); - public static final String OHLA_FEDERATE_HOST_PROPERTY = "ohla.federate.%s.host"; public static final String OHLA_FEDERATE_PORT_PROPERTY = @@ -328,21 +301,6 @@ */ protected IoSession rtiSession; - /** - * Handles communication with other federates (peers). - */ - protected PeerIoHandler peerIoHandler = new PeerIoHandler(); - - protected Lock peersLock = new ReentrantLock(true); - - /** - * Sessions connecting to other federates (peers). - */ - protected Map<FederateHandle, IoSession> peerSessions = - new HashMap<FederateHandle, IoSession>(); - - protected SocketAddress peerConnectionInfo; - public Federate(String federateType, String federationExecutionName, FederateAmbassador federateAmbassador, MobileFederateServices mobileFederateServices, @@ -356,12 +314,9 @@ this.mobileFederateServices = mobileFederateServices; this.rtiSession = rtiSession; - startPeerAcceptor(federateType); - JoinFederationExecution joinFederationExecution = new JoinFederationExecution( - federateType, federationExecutionName, mobileFederateServices, - peerConnectionInfo); + federateType, federationExecutionName, mobileFederateServices); WriteFuture writeFuture = rtiSession.write(joinFederationExecution); // TODO: set timeout @@ -389,72 +344,7 @@ LogicalTime galt = joinFederationExecutionResponse.getGALT(); timeManager = new TimeManager(this, mobileFederateServices, galt); - SocketConnector peerConnector = new SocketConnector(); - - peerConnector.setHandler(peerIoHandler); - - // TODO: selection of codec factory - // - ProtocolCodecFactory codec = new ObjectSerializationCodecFactory(); - - // handles messages to/from bytes - // - peerConnector.getFilterChain().addLast( - "ProtocolCodecFilter", new ProtocolCodecFilter(codec)); - - peerConnector.getFilterChain().addLast("LoggingFilter", - new LoggingFilter()); - - // handles request/response pairs - // - peerConnector.getFilterChain().addLast( - "RequestResponseFilter", new RequestResponseFilter()); - - // tracks peers interests and transforms messages to meet those - // requirements - // - peerConnector.getFilterChain().addLast( - "InterestManagementFilter", new InterestManagementFilter(this)); - - for (Map.Entry<FederateHandle, SocketAddress> entry : joinFederationExecutionResponse.getPeerConnectionInfo().entrySet()) - { - log.debug("connecting to peer: {}", entry.getValue()); - - // TODO: selection of local address to connect to peer? - // - ConnectFuture future = peerConnector.connect(entry.getValue()); - future.join(); - - IoSession peerSession = future.getSession(); - peerSession.setAttribute(PEER_FEDERATE_HANDLE, entry.getKey()); - - FederateJoined federateJoined = new FederateJoined(federateHandle); - - writeFuture = peerSession.write(federateJoined); - - // TODO: set timeout - // - writeFuture.join(); - - if (writeFuture.isWritten()) - { - // TODO: set timeout - // - federateJoined.await(); - - peersLock.lock(); - try - { - peerSessions.put(entry.getKey(), peerSession); - } - finally - { - peersLock.unlock(); - } - } - } - - log.info("joined federation execution; {}", federateHandle); + log.info("joined federation execution: {}", federateHandle); } else if (response instanceof FederationExecutionDoesNotExist) { @@ -564,99 +454,117 @@ } } - public void sendToPeers(Message message) - { - peersLock.lock(); - try - { - for (IoSession peerSession : peerSessions.values()) - { - peerSession.write(message); - } - } - finally - { - peersLock.unlock(); - } - } - public boolean process(IoSession session, Object message) { boolean processed = true; if (message instanceof Callback) { - if (message instanceof ReflectAttributeValues) + timeManager.getTimeLock().readLock().lock(); + try { - ReflectAttributeValues reflectAttributeValues = - (ReflectAttributeValues) message; + if (message instanceof ReflectAttributeValues) + { + ReflectAttributeValues reflectAttributeValues = + (ReflectAttributeValues) message; - objectManager.objectReflected( - reflectAttributeValues.getObjectInstanceHandle(), - reflectAttributeValues.getObjectClassHandle()); - } - else if (message instanceof DiscoverObjectInstance) - { - DiscoverObjectInstance discoverObjectInstance = - (DiscoverObjectInstance) message; + timeManager.getTimeLock().readLock().lock(); + try + { + OrderType receivedOrderType = + reflectAttributeValues.getSentOrderType() == OrderType.TIMESTAMP && + timeManager.isTimeConstrained() ? OrderType.TIMESTAMP : + OrderType.RECEIVE; + reflectAttributeValues.setReceivedOrderType(receivedOrderType); - String name = objectManager.createObjectInstanceName( - discoverObjectInstance.getObjectInstanceHandle(), - discoverObjectInstance.getObjectClassHandle()); - discoverObjectInstance.setName(name); - } - else if (message instanceof InitiateFederateSave) - { - InitiateFederateSave initiateFederateSave = - (InitiateFederateSave) message; + if (receivedOrderType == OrderType.RECEIVE) + { + // receive order callbacks need to be held until released if we + // are constrained and in the time granted state if asynchronous + // delivery is disabled + // + boolean hold = timeManager.isTimeConstrainedAndTimeGranted() && + !isAsynchronousDeliveryEnabled(); - federateStateLock.writeLock().lock(); - try - { - if (federateSave == null) + callbackManager.add(reflectAttributeValues, hold); + } + else + { + // schedule the callback for the appropriate time + // + Future future = schedule( + reflectAttributeValues.getUpdateTime(), + new AddCallback(reflectAttributeValues)); + + // register the message retraction handle + // + messageRetractionManager.add( + reflectAttributeValues.getUpdateTime(), future, + reflectAttributeValues.getMessageRetractionHandle()); + } + } + finally { - federateSave = new FederateSave( - federateHandle, federateType, - initiateFederateSave.getParticipants()); + timeManager.getTimeLock().readLock().unlock(); } } - finally + else if (message instanceof ReceiveInteraction) { - federateStateLock.writeLock().unlock(); - } - } - else if (message instanceof InitiateFederateRestore) - { - } + ReceiveInteraction receiveInteraction = (ReceiveInteraction) message; - timeManager.getTimeLock().readLock().lock(); - try - { - boolean hold = false; + timeManager.getTimeLock().readLock().lock(); + try + { + OrderType receivedOrderType = + receiveInteraction.getSentOrderType() == OrderType.TIMESTAMP && + timeManager.isTimeConstrained() ? OrderType.TIMESTAMP : + OrderType.RECEIVE; - if (message instanceof ReflectAttributeValues || - message instanceof ReceiveInteraction || - message instanceof RemoveObjectInstance) - { - hold = !isAsynchronousDeliveryEnabled() && - timeManager.isTimeConstrainedAndTimeGranted(); + receiveInteraction.setReceivedOrderType(receivedOrderType); + + if (receivedOrderType == OrderType.RECEIVE) + { + // receive order callbacks need to be held until released if we + // are constrained and in the time granted state, if asynchronous + // delivery is disabled + // + boolean hold = timeManager.isTimeConstrainedAndTimeGranted() && + !isAsynchronousDeliveryEnabled(); + + callbackManager.add(receiveInteraction, hold); + } + else + { + // schedule the callback for the appropriate time + // + Future future = schedule(receiveInteraction.getSendTime(), + new AddCallback(receiveInteraction)); + + // register the message retraction handle + // + messageRetractionManager.add( + receiveInteraction.getSendTime(), future, + receiveInteraction.getMessageRetractionHandle()); + } + } + finally + { + timeManager.getTimeLock().readLock().unlock(); + } } + else + { + boolean hold = message instanceof RemoveObjectInstance && + !isAsynchronousDeliveryEnabled() && + timeManager.isTimeConstrainedAndTimeGranted(); - callbackManager.add((Callback) message, hold); + callbackManager.add((Callback) message, hold); + } } finally { timeManager.getTimeLock().readLock().unlock(); } } - else if (message instanceof ObjectInstanceNameReserved) - { - ObjectInstanceNameReserved objectInstanceNameReserved = - (ObjectInstanceNameReserved) message; - - objectManager.objectInstanceNameReserved( - objectInstanceNameReserved.getName(), - objectInstanceNameReserved.getObjectInstanceHandle()); - } else if (message instanceof GALTAdvanced) { GALTAdvanced galtAdvanced = (GALTAdvanced) message; @@ -2626,8 +2534,18 @@ objectClassHandle, attributeRegionAssociation, passive); } - sendToPeers(new SubscribeObjectClassAttributes( - objectClassHandle, attributesAndRegions, passive)); + WriteFuture writeFuture = rtiSession.write( + new SubscribeObjectClassAttributes( + objectClassHandle, attributesAndRegions, passive)); + + // TODO: set timeout + // + writeFuture.join(); + + if (!writeFuture.isWritten()) + { + throw new RTIinternalError("error communicating with RTI"); + } } finally { @@ -2656,8 +2574,18 @@ objectClassHandle, attributeRegionAssociation); } - sendToPeers(new UnsubscribeObjectClassAttributes( - objectClassHandle, attributesAndRegions)); + WriteFuture writeFuture = rtiSession.write( + new UnsubscribeObjectClassAttributes( + objectClassHandle, attributesAndRegions)); + + // TODO: set timeout + // + writeFuture.join(); + + if (!writeFuture.isWritten()) + { + throw new RTIinternalError("error communicating with RTI"); + } } finally { @@ -2753,9 +2681,18 @@ // objectManager.checkIfInteractionClassPublished(interactionClassHandle); - sendToPeers(new ReceiveInteraction( + WriteFuture writeFuture = rtiSession.write(new SendInteraction( interactionClassHandle, parameterValues, tag, OrderType.RECEIVE, TransportationType.HLA_RELIABLE, regionHandles)); + + // TODO: set timeout + // + writeFuture.join(); + + if (!writeFuture.isWritten()) + { + throw new RTIinternalError("error communicating with RTI"); + } } finally { @@ -2794,11 +2731,20 @@ messageRetractionHandle = messageRetractionManager.add(sendTime); } - sendToPeers(new ReceiveInteraction( - interactionClassHandle, parameterValues, tag, sentOrderType, + WriteFuture writeFuture = rtiSession.write(new SendInteraction( + interactionClassHandle, parameterValues, tag, OrderType.RECEIVE, TransportationType.HLA_RELIABLE, sendTime, messageRetractionHandle, regionHandles)); + // TODO: set timeout + // + writeFuture.join(); + + if (!writeFuture.isWritten()) + { + throw new RTIinternalError("error communicating with RTI"); + } + return new MessageRetractionReturn(messageRetractionHandle != null, messageRetractionHandle); } @@ -3293,79 +3239,6 @@ } } - protected void startPeerAcceptor(String federateType) - throws RTIinternalError - { - if (peerConnectionInfo == null) - { - String host = System.getProperties().getProperty( - String.format(OHLA_FEDERATE_HOST_PROPERTY, federateType)); - String port = System.getProperties().getProperty( - String.format(OHLA_FEDERATE_PORT_PROPERTY, federateType)); - - try - { - SocketAcceptor peerAcceptor = new SocketAcceptor(); - peerAcceptor.setReuseAddress(true); - - peerAcceptor.setHandler(peerIoHandler); - - // TODO: selection of codec factory - // - ProtocolCodecFactory codec = new ObjectSerializationCodecFactory(); - - // handles messages to/from bytes - // - peerAcceptor.getFilterChain().addLast( - "ProtocolCodecFilter", new ProtocolCodecFilter(codec)); - - peerAcceptor.getFilterChain().addLast( - "LoggingFilter", new LoggingFilter()); - - // handles request/response pairs - // - peerAcceptor.getFilterChain().addLast( - "RequestResponseFilter", new RequestResponseFilter()); - - // tracks peers interests and transforms messages to meet those - // requirements - // - peerAcceptor.getFilterChain().addLast( - "InterestManagementFilter", new InterestManagementFilter(this)); - - peerConnectionInfo = - new InetSocketAddress( - host == null ? InetAddress.getLocalHost() : - InetAddress.getByName(host), - port == null ? 0 : Integer.parseInt(port)); - - log.info("binding to {}", peerConnectionInfo); - - peerAcceptor.setLocalAddress(peerConnectionInfo); - - peerAcceptor.bind(); - - peerConnectionInfo = peerAcceptor.getLocalAddress(); - - log.info("bound to {}", peerConnectionInfo); - } - catch (NumberFormatException nfe) - { - throw new RTIinternalError(String.format( - "invalid port: %s", port), nfe); - } - catch (UnknownHostException uhe) - { - throw new RTIinternalError(String.format( - "unknown host: %s", host), uhe); - } - catch (IOException ioe) - { - throw new RTIinternalError("unable to bind acceptor to: %s", ioe); - } - } - } - protected void checkIfSaveInProgress() throws SaveInProgress { @@ -3488,199 +3361,6 @@ } } - protected class PeerIoHandler - extends IoHandlerAdapter - { - public void sessionClosed(IoSession session) - throws Exception - { - peersLock.lock(); - try - { - throw new RuntimeException(); - } - finally - { - peersLock.unlock(); - } - } - - public void messageReceived(IoSession session, Object message) - throws Exception - { - FederateHandle peerFederateHandle = getPeerFederateHandle(session); - if (peerFederateHandle == null) - { - assert message instanceof FederateJoined : - String.format("unexpected message: %s", message); - - FederateJoined federateJoined = (FederateJoined) message; - - session.setAttribute(PEER_FEDERATE_HANDLE, - federateJoined.getFederateHandle()); - peersLock.lock(); - try - { - peerSessions.put(federateJoined.getFederateHandle(), session); - } - finally - { - peersLock.unlock(); - } - - objectManager.federateJoined(session); - - session.write(new DefaultResponse(federateJoined.getId())); - } - else if (message instanceof ReflectAttributeValues) - { - ReflectAttributeValues reflectAttributeValues = - (ReflectAttributeValues) message; - - timeManager.getTimeLock().readLock().lock(); - try - { - OrderType receivedOrderType = - reflectAttributeValues.getSentOrderType() == OrderType.TIMESTAMP && - timeManager.isTimeConstrained() ? OrderType.TIMESTAMP : - OrderType.RECEIVE; - reflectAttributeValues.setReceivedOrderType(receivedOrderType); - - if (receivedOrderType == OrderType.RECEIVE) - { - // receive order callbacks need to be held until released if we are - // constrained and in the time granted state if asynchronous delivery is - // disabled - // - boolean hold = timeManager.isTimeConstrainedAndTimeGranted() && - !isAsynchronousDeliveryEnabled(); - - callbackManager.add(reflectAttributeValues, hold); - } - else - { - // schedule the callback for the appropriate time - // - Future future = schedule( - reflectAttributeValues.getUpdateTime(), - new AddCallback(reflectAttributeValues)); - - // register the message retraction handle - // - messageRetractionManager.add( - reflectAttributeValues.getUpdateTime(), future, - reflectAttributeValues.getMessageRetractionHandle()); - } - } - finally - { - timeManager.getTimeLock().readLock().unlock(); - } - } - else if (message instanceof ReceiveInteraction) - { - ReceiveInteraction receiveInteraction = (ReceiveInteraction) message; - - timeManager.getTimeLock().readLock().lock(); - try - { - OrderType receivedOrderType = - receiveInteraction.getSentOrderType() == OrderType.TIMESTAMP && - timeManager.isTimeConstrained() ? OrderType.TIMESTAMP : - OrderType.RECEIVE; - - receiveInteraction.setReceivedOrderType(receivedOrderType); - - if (receivedOrderType == OrderType.RECEIVE) - { - // receive order callbacks need to be held until released if we are - // constrained and in the time granted state, if asynchronous delivery is - // disabled - // - boolean hold = timeManager.isTimeConstrainedAndTimeGranted() && - !isAsynchronousDeliveryEnabled(); - - callbackManager.add(receiveInteraction, hold); - } - else - { - // schedule the callback for the appropriate time - // - Future future = schedule(receiveInteraction.getSendTime(), - new AddCallback(receiveInteraction)); - - // register the message retraction handle - // - messageRetractionManager.add( - receiveInteraction.getSendTime(), future, - receiveInteraction.getMessageRetractionHandle()); - } - } - finally - { - timeManager.getTimeLock().readLock().unlock(); - } - } - else if (message instanceof FederateSaveInitiated) - { - FederateSaveInitiated federateSaveInitiated = - (FederateSaveInitiated) message; - federateStateLock.writeLock().lock(); - try - { - if (federateSave == null) - { - // handle the case where the message from the federation execution - // has not arrived yet - // - federateSave = new FederateSave( - federateHandle, federateType, - federateSaveInitiated.getParticipants()); - } - - federateSave.federateSaveInitiated(peerFederateHandle); - } - finally - { - federateStateLock.writeLock().unlock(); - } - } - else if (message instanceof FederateSaveInitiatedFailed) - { - FederateSaveInitiatedFailed federateSaveInitiatedFailed = - (FederateSaveInitiatedFailed) message; - federateStateLock.writeLock().lock(); - try - { - if (federateSave == null) - { - // handle the case where the message from the federation execution - // has not arrived yet - // - federateSave = new FederateSave( - federateHandle, federateType, - federateSaveInitiatedFailed.getParticipants()); - } - - federateSave.federateSaveInitiatedFailed(peerFederateHandle); - } - finally - { - federateStateLock.writeLock().unlock(); - } - } - else - { - assert false : String.format("unexpected message: %s", message); - } - } - - protected FederateHandle getPeerFederateHandle(IoSession session) - { - return (FederateHandle) session.getAttribute(PEER_FEDERATE_HANDLE); - } - } - protected class FederateAmbassadorInterceptor extends NullFederateAmbassador { @@ -3759,21 +3439,7 @@ try { federateAmbassador.initiateFederateSave(label); - - FederateSaveInitiated federateSaveInitiated = - new FederateSaveInitiated(federateSave.getParticipants()); - - rtiSession.write(federateSaveInitiated); - sendToPeers(federateSaveInitiated); } - catch (Throwable t) - { - FederateSaveInitiatedFailed federateSaveInitiatedFailed = - new FederateSaveInitiatedFailed(t, federateSave.getParticipants()); - - rtiSession.write(federateSaveInitiatedFailed); - sendToPeers(federateSaveInitiatedFailed); - } finally { federateState = FederateState.SAVE_IN_PROGRESS; @@ -3795,21 +3461,7 @@ try { federateAmbassador.initiateFederateSave(label, saveTime); - - FederateSaveInitiated federateSaveInitiated = - new FederateSaveInitiated(federateSave.getParticipants()); - - rtiSession.write(federateSaveInitiated); - sendToPeers(federateSaveInitiated); } - catch (Throwable t) - { - FederateSaveInitiatedFailed federateSaveInitiatedFailed = - new FederateSaveInitiatedFailed(t, federateSave.getParticipants()); - - rtiSession.write(federateSaveInitiatedFailed); - sendToPeers(federateSaveInitiatedFailed); - } finally { federateState = FederateState.SAVE_IN_PROGRESS; Modified: trunk/rti/src/java/net/sf/ohla/rti1516/federate/FederateSave.java =================================================================== --- trunk/rti/src/java/net/sf/ohla/rti1516/federate/FederateSave.java 2007-01-30 01:52:49 UTC (rev 95) +++ trunk/rti/src/java/net/sf/ohla/rti1516/federate/FederateSave.java 2007-01-30 01:53:57 UTC (rev 96) @@ -16,8 +16,6 @@ package net.sf.ohla.rti1516.federate; -import java.util.Set; - import hla.rti1516.FederateHandle; public class FederateSave @@ -25,14 +23,10 @@ protected FederateHandle federateHandle; protected String federateType; - protected transient Set<FederateHandle> participants; - - public FederateSave(FederateHandle federateHandle, String federateType, - Set<FederateHandle> participants) + public FederateSave(FederateHandle federateHandle, String federateType) { this.federateHandle = federateHandle; this.federateType = federateType; - this.participants = participants; } public FederateHandle getFederateHandle() @@ -44,27 +38,4 @@ { return federateType; } - - public Set<FederateHandle> getParticipants() - { - return participants; - } - - public boolean federateSaveInitiated(FederateHandle peerFederateHandle) - { - participants.remove(federateHandle); - return participants.isEmpty(); - } - - public boolean federateSaveInitiatedFailed(FederateHandle peerFederateHandle) - { - participants.remove(federateHandle); - return participants.isEmpty(); - } - - public boolean federateResigned(FederateHandle federateHandle) - { - participants.remove(federateHandle); - return participants.isEmpty(); - } } Modified: trunk/rti/src/java/net/sf/ohla/rti1516/federate/objects/ObjectInstance.java =================================================================== --- trunk/rti/src/java/net/sf/ohla/rti1516/federate/objects/ObjectInstance.java 2007-01-30 01:52:49 UTC (rev 95) +++ trunk/rti/src/java/net/sf/ohla/rti1516/federate/objects/ObjectInstance.java 2007-01-30 01:53:57 UTC (rev 96) @@ -29,7 +29,7 @@ import net.sf.ohla.rti1516.fdd.Attribute; import net.sf.ohla.rti1516.fdd.ObjectClass; import net.sf.ohla.rti1516.federate.Federate; -import net.sf.ohla.rti1516.federate.callbacks.ReflectAttributeValues; +import net.sf.ohla.rti1516.messages.callbacks.ReflectAttributeValues; import net.sf.ohla.rti1516.OHLAAttributeHandleSet; import net.sf.ohla.rti1516.OHLARegionHandleSet; import net.sf.ohla.rti1516.messages.AttributeOwnershipAcquisition; Modified: trunk/rti/src/java/net/sf/ohla/rti1516/federate/objects/ObjectManager.java =================================================================== --- trunk/rti/src/java/net/sf/ohla/rti1516/federate/objects/ObjectManager.java 2007-01-30 01:52:49 UTC (rev 95) +++ trunk/rti/src/java/net/sf/ohla/rti1516/federate/objects/ObjectManager.java 2007-01-30 01:53:57 UTC (rev 96) @@ -18,10 +18,9 @@ import net.sf.ohla.rti1516.fdd.ObjectClass; import net.sf.ohla.rti1516.federate.Federate; import net.sf.ohla.rti1516.federate.SubscriptionManager; -import net.sf.ohla.rti1516.federate.callbacks.ReceiveInteraction; -import net.sf.ohla.rti1516.federate.callbacks.RemoveObjectInstance; +import net.sf.ohla.rti1516.messages.callbacks.ReceiveInteraction; +import net.sf.ohla.rti1516.messages.callbacks.RemoveObjectInstance; import net.sf.ohla.rti1516.messages.DefaultResponse; -import net.sf.ohla.rti1516.messages.ObjectInstanceRegistered; import net.sf.ohla.rti1516.messages.RegisterObjectInstance; import net.sf.ohla.rti1516.messages.ReserveObjectInstanceName; import net.sf.ohla.rti1516.messages.ResignFederationExecution; @@ -605,12 +604,9 @@ // Object response = registerObjectInstance.getResponse(); - assert response instanceof ObjectInstanceRegistered : - String.format("unexpected response: %s"); +// assert response instanceof ObjectInstanceRegistered : +// String.format("unexpected response: %s"); - ObjectInstanceRegistered objectInstanceRegistered = - (ObjectInstanceRegistered) response; - ObjectInstanceHandle objectInstanceHandle = objectInstanceRegistered.getObjectInstanceHandle(); String name = String.format("HLA-%s", objectInstanceHandle); @@ -704,12 +700,9 @@ // Object response = registerObjectInstance.getResponse(); - assert response instanceof ObjectInstanceRegistered : - String.format("unexpected response: %s"); +// assert response instanceof ObjectInstanceRegistered : +// String.format("unexpected response: %s"); - ObjectInstanceRegistered objectInstanceRegistered = - (ObjectInstanceRegistered) response; - ObjectInstanceHandle objectInstanceHandle = objectInstanceRegistered.getObjectInstanceHandle(); Modified: trunk/rti/src/java/net/sf/ohla/rti1516/federate/time/TimeManager.java =================================================================== --- trunk/rti/src/java/net/sf/ohla/rti1516/federate/time/TimeManager.java 2007-01-30 01:52:49 UTC (rev 95) +++ trunk/rti/src/java/net/sf/ohla/rti1516/federate/time/TimeManager.java 2007-01-30 01:53:57 UTC (rev 96) @@ -20,7 +20,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import net.sf.ohla.rti1516.federate.Federate; -import net.sf.ohla.rti1516.federate.callbacks.TimeAdvanceGrant; +import net.sf.ohla.rti1516.messages.callbacks.TimeAdvanceGrant; import net.sf.ohla.rti1516.messages.DisableTimeConstrained; import net.sf.ohla.rti1516.messages.DisableTimeRegulation; import net.sf.ohla.rti1516.messages.EnableTimeConstrained; Added: trunk/rti/src/java/net/sf/ohla/rti1516/federation/Federate.java =================================================================== --- trunk/rti/src/java/net/sf/ohla/rti1516/federation/Federate.java (rev 0) +++ trunk/rti/src/java/net/sf/ohla/rti1516/federation/Federate.java 2007-01-30 01:53:57 UTC (rev 96) @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2007, Michael Newcomb + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.sf.ohla.rti1516.federation; + +import net.sf.ohla.rti1516.messages.callbacks.InitiateFederateSave; +import net.sf.ohla.rti1516.messages.callbacks.FederationSaved; +import net.sf.ohla.rti1516.messages.callbacks.FederationNotSaved; +import net.sf.ohla.rti1516.messages.callbacks.DiscoverObjectInstance; +import net.sf.ohla.rti1516.messages.callbacks.ReflectAttributeValues; +import net.sf.ohla.rti1516.messages.callbacks.ReceiveInteraction; +import net.sf.ohla.rti1516.messages.callbacks.RemoveObjectInstance; +import net.sf.ohla.rti1516.messages.FederateSaveInitiated; +import net.sf.ohla.rti1516.messages.FederateSaveInitiatedFailed; +import net.sf.ohla.rti1516.messages.FederateSaveBegun; +import net.sf.ohla.rti1516.messages.FederateSaveComplete; +import net.sf.ohla.rti1516.messages.FederateSaveNotComplete; +import net.sf.ohla.rti1516.messages.FederateRestoreComplete; +import net.sf.ohla.rti1516.messages.FederateRestoreNotComplete; +import net.sf.ohla.rti1516.messages.RequestAttributeValueUpdate; +import net.sf.ohla.rti1516.messages.Retract; + +import org.apache.mina.common.IoSession; + +import hla.rti1516.FederateHandle; +import hla.rti1516.RestoreStatus; +import hla.rti1516.SaveStatus; + +public class Federate +{ + private static final String FEDERATE_IO_FILTER = "FederateIoFilter"; + + protected final FederateHandle federateHandle; + protected final String federateType; + protected final IoSession session; + protected final FederationExecution federationExecution; + + protected SaveStatus saveStatus = SaveStatus.NO_SAVE_IN_PROGRESS; + protected RestoreStatus restoreStatus = RestoreStatus.NO_RESTORE_IN_PROGRESS; + + public Federate(FederateHandle federateHandle, String federateType, + IoSession session, FederationExecution federationExecution) + { + this.federateHandle = federateHandle; + this.federateType = federateType; + this.session = session; + this.federationExecution = federationExecution; + + session.getFilterChain().addLast( + FEDERATE_IO_FILTER, new FederateIoFilter(this, federationExecution)); + } + + public FederateHandle getFederateHandle() + { + return federateHandle; + } + + public String getFederateType() + { + return federateType; + } + + public IoSession getSession() + { + return session; + } + + public SaveStatus getSaveStatus() + { + return saveStatus; + } + + public RestoreStatus getRestoreStatus() + { + return restoreStatus; + } + + public void resign() + { + session.getFilterChain().remove(FEDERATE_IO_FILTER); + } + + public void initiateFederateSave(InitiateFederateSave initiateFederateSave) + { + saveStatus = SaveStatus.FEDERATE_INSTRUCTED_TO_SAVE; + + session.write(initiateFederateSave); + } + + public void federateSaveInitiated(FederateSaveInitiated federateSaveInitiated) + { + } + + public void federateSaveInitiatedFailed( + FederateSaveInitiatedFailed federateSaveInitiatedFailed) + { + saveStatus = SaveStatus.NO_SAVE_IN_PROGRESS; + } + + public void federateSaveBegun(FederateSaveBegun federateSaveBegun) + { + saveStatus = SaveStatus.FEDERATE_SAVING; + } + + public void federateSaveComplete(FederateSaveComplete federateSaveComplete) + { + saveStatus = SaveStatus.FEDERATE_WAITING_FOR_FEDERATION_TO_SAVE; + } + + public void federateSaveNotComplete( + FederateSaveNotComplete federateSaveNotComplete) + { + saveStatus = SaveStatus.FEDERATE_WAITING_FOR_FEDERATION_TO_SAVE; + } + + public void federationSaved(FederationSaved federationSaved) + { + saveStatus = SaveStatus.NO_SAVE_IN_PROGRESS; + + session.write(federationSaved); + } + + public void federationNotSaved(FederationNotSaved federationNotSaved) + { + saveStatus = SaveStatus.NO_SAVE_IN_PROGRESS; + + session.write(federationNotSaved); + } + + public void federateRestoreComplete( + FederateRestoreComplete federateRestoreComplete) + { + restoreStatus = RestoreStatus.NO_RESTORE_IN_PROGRESS; + } + + public void federateRestoreNotComplete( + FederateRestoreNotComplete federateRestoreNotComplete) + { + restoreStatus = RestoreStatus.NO_RESTORE_IN_PROGRESS; + } + + public void discoverObjectInstance( + DiscoverObjectInstance discoverObjectInstance) + { + session.write(discoverObjectInstance); + } + + public void reflectAttributeValues( + ReflectAttributeValues reflectAttributeValues) + { + session.write(reflectAttributeValues); + } + + public void receiveInteraction(ReceiveInteraction receiveInteraction) + { + session.write(receiveInteraction); + } + + public void removeObjectInstance(RemoveObjectInstance removeObjectInstance) + { + session.write(removeObjectInstance); + } + + public void requestAttributeValueUpdate( + RequestAttributeValueUpdate requestAttributeValueUpdate) + { + session.write(requestAttributeValueUpdate); + } + + public void retract(Retract retract) + { + session.write(retract); + } + + @Override + public int hashCode() + { + return federateHandle.hashCode(); + } + + @Override + public boolean equals(Object rhs) + { + return federateHandle.equals(rhs); + } + + @Override + public String toString() + { + return String.format("%s - %s - %s", federateHandle, + session.getLocalAddress(), federateType); + } +} Added: trunk/rti/src/java/net/sf/ohla/rti1516/federation/FederateIoFilter.java =================================================================== --- trunk/rti/src/java/net/sf/ohla/rti1516/federation/FederateIoFilter.java (rev 0) +++ trunk/rti/src/java/net/sf/ohla/rti1516/federation/FederateIoFilter.java 2007-01-30 01:53:57 UTC (rev 96) @@ -0,0 +1,517 @@ +/* + * Copyright (c) 2007, Michael Newcomb + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.sf.ohla.rti1516.federation; + +import java.util.Map; + +import net.sf.ohla.rti1516.OHLAAttributeHandleValueMap; +import net.sf.ohla.rti1516.OHLAParameterHandleValueMap; +import net.sf.ohla.rti1516.fdd.InteractionClass; +import net.sf.ohla.rti1516.fdd.ObjectClass; +import net.sf.ohla.rti1516.federate.SubscriptionManager; +import net.sf.ohla.rti1516.messages.callbacks.ReceiveInteraction; +import net.sf.ohla.rti1516.messages.callbacks.ReflectAttributeValues; +import net.sf.ohla.rti1516.messages.AttributeOwnershipAcquisition; +import net.sf.ohla.rti1516.messages.AttributeOwnershipAcquisitionIfAvailable; +import net.sf.ohla.rti1516.messages.AttributeOwnershipDivestitureIfWanted; +import net.sf.ohla.rti1516.messages.CancelAttributeOwnershipAcquisition; +import net.sf.ohla.rti1516.messages.CancelNegotiatedAttributeOwnershipDivestiture; +import net.sf.ohla.rti1516.messages.CommitRegionModifications; +import net.sf.ohla.rti1516.messages.ConfirmDivestiture; +import net.sf.ohla.rti1516.messages.CreateRegion; +import net.sf.ohla.rti1516.messages.DeleteRegion; +import net.sf.ohla.rti1516.messages.DisableTimeConstrained; +import net.sf.ohla.rti1516.messages.DisableTimeRegulation; +import net.sf.ohla.rti1516.messages.EnableTimeConstrained; +import net.sf.ohla.rti1516.messages.EnableTimeRegulation; +import net.sf.ohla.rti1516.messages.FederateRestoreComplete; +import net.sf.ohla.rti1516.messages.FederateRestoreNotComplete; +import net.sf.ohla.rti1516.messages.FederateSaveBegun; +import net.sf.ohla.rti1516.messages.FederateSaveComplete; +import net.sf.ohla.rti1516.messages.FederateSaveInitiated; +import net.sf.ohla.rti1516.messages.FederateSaveInitiatedFailed; +import net.sf.ohla.rti1516.messages.FederateSaveNotComplete; +import net.sf.ohla.rti1516.messages.GetRangeBounds; +import net.sf.ohla.rti1516.messages.NegotiatedAttributeOwnershipDivestiture; +import net.sf.ohla.rti1516.messages.QueryAttributeOwnership; +import net.sf.ohla.rti1516.messages.QueryFederationRestoreStatus; +import net.sf.ohla.rti1516.messages.QueryFederationSaveStatus; +import net.sf.ohla.rti1516.messages.RegisterFederationSynchronizationPoint; +import net.sf.ohla.rti1516.messages.RegisterObjectInstance; +import net.sf.ohla.rti1516.messages.RequestAttributeValueUpdate; +import net.sf.ohla.rti1516.messages.RequestFederationRestore; +import net.sf.ohla.rti1516.messages.RequestFederationSave; +import net.sf.ohla.rti1516.messages.ReserveObjectInstanceName; +import net.sf.ohla.rti1516.messages.ResignFederationExecution; +import net.sf.ohla.rti1516.messages.Retract; +import net.sf.ohla.rti1516.messages.SubscribeInteractionClass; +import net.sf.ohla.rti1516.messages.SubscribeObjectClassAttributes; +import net.sf.ohla.rti1516.messages.SynchronizationPointAchieved; +import net.sf.ohla.rti1516.messages.TimeAdvanceRequest; +import net.sf.ohla.rti1516.messages.TimeAdvanceRequestAvailable; +import net.sf.ohla.rti1516.messages.UnconditionalAttributeOwnershipDivestiture; +import net.sf.ohla.rti1516.messages.UnsubscribeInteractionClass; +import net.sf.ohla.rti1516.messages.UnsubscribeObjectClassAttributes; + +import org.apache.mina.common.IoFilterAdapter; +import org.apache.mina.common.IoSession; + +import hla.rti1516.AttributeHandle; +import hla.rti1516.AttributeHandleValueMap; +import hla.rti1516.InteractionClassHandle; +import hla.rti1516.ObjectClassHandle; +import hla.rti1516.ObjectInstanceHandle; +import hla.rti1516.ParameterHandleValueMap; + +public class FederateIoFilter + extends IoFilterAdapter +{ + protected final Federate federate; + protected final FederationExecution federationExecution; + + protected final FederateIoFilterSubscriptionManager subscriptionManager = + new FederateIoFilterSubscriptionManager(); + + public FederateIoFilter(Federate federate, + FederationExecution federationExecution) + { + this.federate = federate; + this.federationExecution = federationExecution; + } + + @Override + public void sessionClosed(NextFilter nextFilter, IoSession session) + throws Exception + { + super.sessionClosed(nextFilter, session); + } + + @Override + public void messageReceived(NextFilter nextFilter, IoSession session, + Object message) + throws Exception + { + if (message instanceof RegisterObjectInstance) + { + federationExecution.registerObjectInstance( + federate, (RegisterObjectInstance) message); + } + else if (message instanceof ReserveObjectInstanceName) + { + federationExecution.reserveObjectInstanceName( + federate, (ReserveObjectInstanceName) message); + } + else if (message instanceof DeleteObjectInstance) + { + federationExecution.deleteObjectInstance( + federate, (DeleteObjectInstance) message); + } + else if (message instanceof RequestAttributeValueUpdate) + { + federationExecution.requestAttributeValueUpdate( + federate, (RequestAttributeValueUpdate) message); + } + else if (message instanceof Retract) + { + federationExecution.retract(federate, (Retract) message); + } + else if (message instanceof SubscribeObjectClassAttributes) + { + SubscribeObjectClassAttributes subscribeObjectClassAttributes = + (SubscribeObjectClassAttributes) message; + + if (subscribeObjectClassAttributes.getAttributeHandles() != null) + { + subscriptionManager.subscribeObjectClassAttributes( + subscribeObjectClassAttributes.getObjectClassHandle(), + subscribeObjectClassAttributes.getAttributeHandles(), + subscribeObjectClassAttributes.isPassive()); + + // TODO: notify the PublicationManager that subsciption interests have changed + } + else if (subscribeObjectClassAttributes.getAttributesAndRegions() != null) + { + subscriptionManager.subscribeObjectClassAttributes( + subscribeObjectClassAttributes.getObjectClassHandle(), + subscribeObjectClassAttributes.getAttributesAndRegions(), + subscribeObjectClassAttributes.isPassive()); + + // TODO: notify the PublicationManager that subsciption interests have changed + } + } + else if (message instanceof UnsubscribeObjectClassAttributes) + { + UnsubscribeObjectClassAttributes unsubscribeObjectClassAttributes = + (UnsubscribeObjectClassAttributes) message; + + if (unsubscribeObjectClassAttributes.getAttributeHandles() != null) + { + subscriptionManager.unsubscribeObjectClassAttributes( + unsubscribeObjectClassAttributes.getObjectClassHandle(), + unsubscribeObjectClassAttributes.getAttributeHandles()); + + // TODO: notify the PublicationManager that subsciption interests have changed + } + else + if (unsubscribeObjectClassAttributes.getAttributesAndRegions() != null) + { + subscriptionManager.unsubscribeObjectClassAttributes( + unsubscribeObjectClassAttributes.getObjectClassHandle(), + unsubscribeObjectClassAttributes.getAttributesAndRegions()); + + // TODO: notify the PublicationManager that subsciption interests have changed + } + } + else if (message instanceof SubscribeInteractionClass) + { + SubscribeInteractionClass subscribeInteractionClass = + (SubscribeInteractionClass) message; + + if (subscribeInteractionClass.getRegionHandles() == null) + { + subscriptionManager.subscribeInteractionClass( + subscribeInteractionClass.getInteractionClassHandle(), + subscribeInteractionClass.isPassive()); + + // TODO: notify the PublicationManager that subsciption interests have changed + } + else + { + subscriptionManager.subscribeInteractionClass( + subscribeInteractionClass.getInteractionClassHandle(), + subscribeInteractionClass.getRegionHandles(), + subscribeInteractionClass.isPassive()); + + // TODO: notify the PublicationManager that subsciption interests have changed + } + } + else if (message instanceof UnsubscribeInteractionClass) + { + UnsubscribeInteractionClass unsubscribeInteractionClass = + (UnsubscribeInteractionClass) message; + + if (unsubscribeInteractionClass.getRegionHandles() == null) + { + subscriptionManager.unsubscribeInteractionClass( + unsubscribeInteractionClass.getInteractionClassHandle()); + + // TODO: notify the PublicationManager that subsciption interests have changed + } + else + { + subscriptionManager.unsubscribeInteractionClass( + unsubscribeInteractionClass.getInteractionClassHandle(), + unsubscribeInteractionClass.getRegionHandles()); + + // TODO: notify the PublicationManager that subsciption interests have changed + } + } + else if (message instanceof RegisterFederationSynchronizationPoint) + { + federationExecution.registerFederationSynchronizationPoint( + federate, (RegisterFederationSynchronizationPoint) message); + } + else if (message instanceof SynchronizationPointAchieved) + { + federationExecution.synchronizationPointAchieved( + federate, (SynchronizationPointAchieved) message); + } + else if (message instanceof RequestFederationSave) + { + federationExecution.requestFederationSave( + federate, (RequestFederationSave) message); + } + else if (message instanceof FederateSaveInitiated) + { + federationExecution.federateSaveInitiated( + federate, (FederateSaveInitiated) message); + } + else if (message instanceof FederateSaveInitiatedFailed) + { + federationExecution.federateSaveInitiatedFailed( + federate, (FederateSaveInitiatedFailed) message); + } + else if (message instanceof FederateSaveBegun) + { + federationExecution.federateSaveBegun( + federate, (FederateSaveBegun)... [truncated message content] |