Thread: [Ubermq-commits] uberchord/src/com/ubermq/chord/jms ChordMessage.java,1.1.1.1,1.2 ChordQueryMessage.
Brought to you by:
jimmyp
Update of /cvsroot/ubermq/uberchord/src/com/ubermq/chord/jms In directory usw-pr-cvs1:/tmp/cvs-serv10687/src/com/ubermq/chord/jms Modified Files: ChordMessage.java ChordQueryMessage.java ChordValueMessage.java LocalChordNode.java RemoteChordNode.java Log Message: chord modifications Index: ChordMessage.java =================================================================== RCS file: /cvsroot/ubermq/uberchord/src/com/ubermq/chord/jms/ChordMessage.java,v retrieving revision 1.1.1.1 retrieving revision 1.2 diff -C2 -d -r1.1.1.1 -r1.2 *** ChordMessage.java 8 Nov 2002 22:36:36 -0000 1.1.1.1 --- ChordMessage.java 10 Nov 2002 22:40:43 -0000 1.2 *************** *** 146,152 **** * @throws JMSException if the message cannot be created */ ! public static ChordMessage createQueryMessage(TopicSession session, ! String replyTopic, ! Object key) throws JMSException { --- 146,152 ---- * @throws JMSException if the message cannot be created */ ! public static ChordQueryMessage createQueryMessage(Session session, ! Topic replyTopic, ! Object key) throws JMSException { *************** *** 167,175 **** */ public static ChordMessage createValueMessage(Session session, Object key, Object value) throws JMSException { ! return new ChordValueMessage(session, key, value); } --- 167,176 ---- */ public static ChordMessage createValueMessage(Session session, + long queryId, Object key, Object value) throws JMSException { ! return new ChordValueMessage(session, queryId, key, value); } Index: ChordQueryMessage.java =================================================================== RCS file: /cvsroot/ubermq/uberchord/src/com/ubermq/chord/jms/ChordQueryMessage.java,v retrieving revision 1.1.1.1 retrieving revision 1.2 diff -C2 -d -r1.1.1.1 -r1.2 *** ChordQueryMessage.java 8 Nov 2002 22:36:36 -0000 1.1.1.1 --- ChordQueryMessage.java 10 Nov 2002 22:40:43 -0000 1.2 *************** *** 12,18 **** private Object key; private Topic replyTopic; /** ! * Constructs a chord store message. * @throws IllegalArgumentException if the message is invalid. */ --- 12,21 ---- private Object key; private Topic replyTopic; + private long queryId; + + public static final String CHORD_QUERY_ID_PROPERTY = "query-id"; /** ! * Constructs a chord query message. * @throws IllegalArgumentException if the message is invalid. */ *************** *** 25,28 **** --- 28,32 ---- this.key = ((ObjectMessage)m).getObject(); this.replyTopic = (Topic)m.getJMSReplyTo(); + this.queryId = m.getLongProperty(CHORD_QUERY_ID_PROPERTY); } catch (JMSException e) { *************** *** 39,44 **** * */ ! ChordQueryMessage(TopicSession session, ! String replyTopic, Object key) throws JMSException --- 43,48 ---- * */ ! ChordQueryMessage(Session session, ! Topic replyTopic, Object key) throws JMSException *************** *** 46,53 **** super(session.createObjectMessage(), CHORD_QUERY); ObjectMessage m = (ObjectMessage)getJMSMessage(); m.setObject((java.io.Serializable)key); ! m.setJMSReplyTo(session.createTopic(replyTopic)); } --- 50,62 ---- super(session.createObjectMessage(), CHORD_QUERY); + + this.key = key; + this.replyTopic = replyTopic; + this.queryId = com.ubermq.Utility.allocateLocallyUniqueLong(); ObjectMessage m = (ObjectMessage)getJMSMessage(); m.setObject((java.io.Serializable)key); ! m.setLongProperty(CHORD_QUERY_ID_PROPERTY, queryId); ! m.setJMSReplyTo(replyTopic); } *************** *** 60,63 **** --- 69,77 ---- return key; } + + public long getQueryId() + { + return queryId; + } public void execute(TopicSession session, *************** *** 73,81 **** { ChordMessage reply = ChordMessage.createValueMessage(session, key, value); pub.publish(replyTopic, reply.getJMSMessage()); - - com.ubermq.Utility.getLogger().info("sent query response to " + replyTopic + " for " + key); } catch (ClassCastException e) { --- 87,95 ---- { ChordMessage reply = ChordMessage.createValueMessage(session, + queryId, key, value); + com.ubermq.Utility.getLogger().info(p.identifier() + " sent query response to " + replyTopic + " for " + queryId); pub.publish(replyTopic, reply.getJMSMessage()); } catch (ClassCastException e) { Index: ChordValueMessage.java =================================================================== RCS file: /cvsroot/ubermq/uberchord/src/com/ubermq/chord/jms/ChordValueMessage.java,v retrieving revision 1.1.1.1 retrieving revision 1.2 diff -C2 -d -r1.1.1.1 -r1.2 *** ChordValueMessage.java 8 Nov 2002 22:36:36 -0000 1.1.1.1 --- ChordValueMessage.java 10 Nov 2002 22:40:43 -0000 1.2 *************** *** 11,14 **** --- 11,15 ---- { private Object key, value; + private long queryId; /** *************** *** 27,30 **** --- 28,33 ---- this.value = l.get(1); else this.value = null; + + this.queryId = m.getLongProperty(ChordQueryMessage.CHORD_QUERY_ID_PROPERTY); } catch (Exception e) { *************** *** 41,44 **** --- 44,48 ---- */ ChordValueMessage(Session session, + long queryId, Object key, Object value) *************** *** 54,57 **** --- 58,62 ---- ObjectMessage m = (ObjectMessage)getJMSMessage(); m.setObject((java.io.Serializable)pair); + m.setLongProperty(ChordQueryMessage.CHORD_QUERY_ID_PROPERTY, queryId); } *************** *** 63,66 **** --- 68,81 ---- { return key; + } + + /** + * Returns the query identifier that this value + * is in response to. + * @return the query id + */ + public long getQueryId() + { + return queryId; } Index: LocalChordNode.java =================================================================== RCS file: /cvsroot/ubermq/uberchord/src/com/ubermq/chord/jms/LocalChordNode.java,v retrieving revision 1.1.1.1 retrieving revision 1.2 diff -C2 -d -r1.1.1.1 -r1.2 *** LocalChordNode.java 8 Nov 2002 22:36:36 -0000 1.1.1.1 --- LocalChordNode.java 10 Nov 2002 22:40:43 -0000 1.2 *************** *** 23,35 **** MessageListener { ! private MessageServer ms; ! private Map data; ! private TopicConnection tc; ! private TopicSession ts; ! private TopicSubscriber tsub; ! private TopicSubscriber announceSub; ! private TopicPublisher tpub; /** --- 23,36 ---- MessageListener { ! private final MessageServer ms; ! private final URI serviceURI; ! private final Map data; ! private final TopicConnection tc; ! private final TopicSession ts; ! private final TopicSubscriber tsub; ! private final TopicSubscriber announceSub; ! private final TopicPublisher tpub; /** *************** *** 160,163 **** --- 161,165 ---- this.ms = ms; this.data = new HashMap(); + this.serviceURI = getServiceURI(ms.getServiceURI(), id); // check if anyone else has this ID *************** *** 178,194 **** public void onMessage(Message p0) { ! com.ubermq.Utility.getLogger().fine("got message on locator topic."); try { assert p0.getJMSReplyTo() != null; ! synchronized(tpub) ! { ! Message m = ts.createMessage(); ! m.setStringProperty(CHORD_NODE_LOCATOR_URI_PROP, ! getServiceURI().toString()); ! tpub.publish((Topic)p0.getJMSReplyTo(), m); ! com.ubermq.Utility.getLogger().fine("replied with my uri: " + getServiceURI()); ! } } catch (JMSException e) { --- 180,194 ---- public void onMessage(Message p0) { ! com.ubermq.Utility.getLogger().info("got message on locator topic."); try { assert p0.getJMSReplyTo() != null; ! ! Message m = ts.createMessage(); ! m.setStringProperty(CHORD_NODE_LOCATOR_URI_PROP, ! getServiceURI().toString()); ! tpub.publish((Topic)p0.getJMSReplyTo(), m); ! com.ubermq.Utility.getLogger().info("replied with my uri: " + getServiceURI()); } catch (JMSException e) { *************** *** 249,253 **** public URI getServiceURI() { ! return getServiceURI(ms.getServiceURI(), identifier()); } --- 249,253 ---- public URI getServiceURI() { ! return serviceURI; } *************** *** 336,345 **** try { - synchronized(tpub) - { m.execute(ts, tpub, this); - } } catch (JMSException e) { --- 336,342 ---- *************** *** 366,369 **** --- 363,367 ---- MessageServer ms = new MessageServer(p); ms.run(); + System.out.println(ms.getServiceURI()); // start chord node Index: RemoteChordNode.java =================================================================== RCS file: /cvsroot/ubermq/uberchord/src/com/ubermq/chord/jms/RemoteChordNode.java,v retrieving revision 1.1.1.1 retrieving revision 1.2 diff -C2 -d -r1.1.1.1 -r1.2 *** RemoteChordNode.java 8 Nov 2002 22:36:36 -0000 1.1.1.1 --- RemoteChordNode.java 10 Nov 2002 22:40:43 -0000 1.2 *************** *** 1,6 **** --- 1,8 ---- package com.ubermq.chord.jms; + import com.ubermq.*; import com.ubermq.chord.*; import com.ubermq.jms.client.*; + import EDU.oswego.cs.dl.util.concurrent.*; import java.io.*; import java.util.*; *************** *** 12,18 **** * JMS and the ChordMessage object hierarchy for connectivity.<P> * - * This object is not thread-safe. It must not be used for simultaneous - * queries by multiple threads without added synchronization. - * * This object may be serialized across the network as a way to describe * chord nodes to other chord participants.<P> --- 14,17 ---- *************** *** 34,41 **** --- 33,45 ---- // Transient, mutable state private transient boolean connected = false; + + private transient Mutex queryMutex; private transient TopicConnection tc; private transient TopicSession ts; private transient TopicPublisher tpub; + + private transient Topic replyTopic; + private transient TopicSubscriber replySub; // a big map of URI -> remote nodes *************** *** 173,176 **** --- 177,181 ---- } + this.queryMutex = new Mutex(); this.commandTopic = LocalChordNode.getCommandTopic(identifier); this.connected = false; *************** *** 201,204 **** --- 206,212 ---- ts = tc.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + replyTopic = ts.createTemporaryTopic(); + replySub = ts.createSubscriber(replyTopic); + tpub = ts.createPublisher(ts.createTopic(commandTopic)); *************** *** 390,417 **** * Queries the node for the object indexed at the specified key. * - * @throws ItemNotFoundException if there is no object at this node - * for the given key. */ ! public Object query(Object key) { ensureConnected(); - TopicSubscriber tempSub = null; try { ! Topic tt = ts.createTemporaryTopic(); ! tempSub = ts.createSubscriber(tt); ! ! ChordMessage cm = ChordMessage.createQueryMessage(ts, tt.getTopicName(), key); ! // publish the query & wait for the reply, synchronously. assert isConnected(); synchronized(tpub) { tpub.publish(cm.getJMSMessage()); } // get the reply ! Object retrievedValue = null; ! Message jmsMessage = tempSub.receive(DEFAULT_QUERY_TIMEOUT); if (jmsMessage != null) { --- 398,422 ---- * Queries the node for the object indexed at the specified key. * */ ! public synchronized Object query(Object key) { ensureConnected(); try { ! ChordQueryMessage cm = ChordMessage.createQueryMessage(ts, replyTopic, key); ! ! // get the query mutex ! queryMutex.acquire(); ! // publish the query & wait for the reply, synchronously. assert isConnected(); synchronized(tpub) { + Utility.getLogger().info("query " + cm.getQueryId() + " at " + identifier() + " for " + key + ". expecting response on " + replyTopic); tpub.publish(cm.getJMSMessage()); } // get the reply ! Message jmsMessage = replySub.receive(DEFAULT_QUERY_TIMEOUT); if (jmsMessage != null) { *************** *** 420,431 **** ChordValueMessage v = ((ChordValueMessage)m); ! if (v.getKey().equals(key)) { ! // we have a reply. our model is for a single ! // outstanding query per object at a time. ! return v.getValue(); ! } else { ! com.ubermq.Utility.getLogger().severe("expected result on " + tempSub + " for " + key + " but was " + v.getKey()); ! assert false; ! } } --- 425,435 ---- ChordValueMessage v = ((ChordValueMessage)m); ! assert v.getQueryId() == cm.getQueryId() : "expected result on " + replyTopic + " for " + cm.getQueryId() + " but was " + v.getQueryId(); ! ! // we have a reply. our model is for a single ! // outstanding query per object at a time. ! Utility.getLogger().info("got response on " + replyTopic + " for " + cm.getQueryId()); ! assert replySub.receiveNoWait() == null : "extra reply message"; ! return v.getValue(); } *************** *** 433,448 **** //TODO: handle the failure of the destination node more // gracefully ! throw new java.lang.IllegalStateException("Destination node did not reply to " + tempSub); } catch (JMSException e) { throw new java.lang.IllegalStateException("Destination node is unavailable."); } finally { ! try ! { ! if (tempSub != null) ! tempSub.close(); ! } ! catch (JMSException e) {} } } --- 437,451 ---- //TODO: handle the failure of the destination node more // gracefully ! assert false; ! return null; } catch (JMSException e) { throw new java.lang.IllegalStateException("Destination node is unavailable."); } + catch (InterruptedException ie) { + throw new UnknownError("Interrupted while acquiring query mutex."); + } finally { ! queryMutex.release(); } } |