[Ubermq-commits] jms/src/com/ubermq/jms/server/datagram/impl DatagramTestCase.java,NONE,1.1 AckDatag
Brought to you by:
jimmyp
From: <ji...@us...> - 2002-09-19 21:23:37
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/server/datagram/impl In directory usw-pr-cvs1:/tmp/cvs-serv32487/src/com/ubermq/jms/server/datagram/impl Modified Files: AckDatagram.java MessageDatagram.java Added Files: DatagramTestCase.java Log Message: some new regression cases and small refactorings --- NEW FILE: DatagramTestCase.java --- package com.ubermq.jms.server.datagram.impl; import com.ubermq.jms.server.datagram.*; import junit.framework.*; import com.ubermq.kernel.IDatagram; import java.nio.ByteBuffer; import java.util.Arrays; public class DatagramTestCase extends TestCase { public static TestSuite suite() { return new TestSuite(DatagramTestCase.class); } public DatagramTestCase(String sz) { super(sz); } private ByteBuffer theBuffer; private byte[] someBytes, moreBytes; public void setUp() { theBuffer = ByteBuffer.allocateDirect(100000); someBytes = new byte[512]; moreBytes = new byte[32768]; Arrays.fill(someBytes, (byte)0xcc); Arrays.fill(moreBytes, (byte)0xff); } public void tearDown() { theBuffer = null; } private IDatagram inAndOut(IDatagram d, ByteBuffer bb) throws Exception { bb.clear(); d.outgoing(bb); bb.flip(); IDatagram out = ((IDatagram)d.getClass().newInstance()); out.incoming(bb); return out; } public void theBasics(IDatagram d) throws Exception { d.setDatagramFlagBits(0xeaeaeaea); Assert.assertEquals(d.getDatagramFlags(), 0xeaeaeaea); d.unsetDatagramFlagBits(0xeaeaeaea); Assert.assertEquals(d.getDatagramFlags(), 0); } public void testAck() throws Exception { IAckDatagram ad = new AckDatagram(true); theBasics(ad); Assert.assertTrue(ad.isNegativeAck()); Assert.assertTrue(ad.getAckMessageId() == null); ad = (IAckDatagram)inAndOut(ad, theBuffer); Assert.assertTrue(ad.isNegativeAck()); Assert.assertTrue(ad.getAckMessageId() == null); ad = new AckDatagram(new MessageId(5, 6), false); Assert.assertTrue(!ad.isNegativeAck()); Assert.assertEquals(ad.getAckMessageId().getSenderId(), 5); Assert.assertEquals(ad.getAckMessageId().getSequence(), 6); ad = (IAckDatagram)inAndOut(ad, theBuffer); Assert.assertTrue(!ad.isNegativeAck()); Assert.assertEquals(ad.getAckMessageId().getSenderId(), 5); Assert.assertEquals(ad.getAckMessageId().getSequence(), 6); } public void testControl() { } public void testMessage() throws Exception { IMessageDatagram md = new MessageDatagram("a topic"); theBasics(md); Assert.assertEquals(md.getTopicName(), "a topic"); md = new MessageDatagram(); md.setTopicName("the topic"); Assert.assertEquals(md.getTopicName(), "the topic"); // set some properties md.setSenderId(900); md.setSequence(1); Assert.assertEquals(md.getSenderId(), 900); Assert.assertEquals(md.getSequence(), 1); // set some standard props md.setStandardProperty(IMessageDatagram.STDPROP_CORRELATIONID, someBytes); md.setStandardProperty(IMessageDatagram.STDPROP_DELIVERYMODE, new Integer(3)); md.setStandardProperty(IMessageDatagram.STDPROP_MSGTYPE, new Integer(5)); md.setStandardProperty(IMessageDatagram.STDPROP_PRIORITY, new Integer(8)); md.setStandardProperty(IMessageDatagram.STDPROP_REDELIVERY, Boolean.TRUE); md.setStandardProperty(IMessageDatagram.STDPROP_REPLYTO, "ReplyTo"); md.setStandardProperty(IMessageDatagram.STDPROP_TIMESTAMP, new Long(42L)); md.setStandardProperty(IMessageDatagram.STDPROP_TTL, new Integer(1000)); md.setStandardProperty(IMessageDatagram.STDPROP_BODY, moreBytes); // custom props md.setCustomProperty("property1", someBytes); md.setCustomProperty("property2", "Hello"); // get them & verify Assert.assertTrue(Arrays.equals((byte[])md.getStandardProperty(IMessageDatagram.STDPROP_CORRELATIONID), someBytes)); Assert.assertEquals(md.getStandardProperty(IMessageDatagram.STDPROP_DELIVERYMODE), new Integer(3)); Assert.assertEquals(md.getStandardProperty(IMessageDatagram.STDPROP_MSGTYPE), new Integer(5)); Assert.assertEquals(md.getStandardProperty(IMessageDatagram.STDPROP_PRIORITY), new Integer(8)); Assert.assertEquals(md.getStandardProperty(IMessageDatagram.STDPROP_REDELIVERY), Boolean.TRUE); Assert.assertEquals(md.getStandardProperty(IMessageDatagram.STDPROP_REPLYTO), "ReplyTo"); Assert.assertEquals(md.getStandardProperty(IMessageDatagram.STDPROP_TIMESTAMP), new Long(42L)); Assert.assertEquals(md.getStandardProperty(IMessageDatagram.STDPROP_TTL), new Integer(1000)); Assert.assertTrue(Arrays.equals((byte[])md.getStandardProperty(IMessageDatagram.STDPROP_BODY), moreBytes)); // set a few custom props Assert.assertEquals(md.getCustomProperty("property1"), someBytes); Assert.assertEquals(md.getCustomProperty("property2"), "Hello"); // send to disk, and bring back, and redo the tests. md = (IMessageDatagram)inAndOut(md, theBuffer); // get them & verify Assert.assertTrue(Arrays.equals((byte[])md.getStandardProperty(IMessageDatagram.STDPROP_CORRELATIONID), someBytes)); Assert.assertEquals(md.getStandardProperty(IMessageDatagram.STDPROP_DELIVERYMODE), new Integer(3)); Assert.assertEquals(md.getStandardProperty(IMessageDatagram.STDPROP_MSGTYPE), new Integer(5)); Assert.assertEquals(md.getStandardProperty(IMessageDatagram.STDPROP_PRIORITY), new Integer(8)); Assert.assertEquals(md.getStandardProperty(IMessageDatagram.STDPROP_REDELIVERY), Boolean.TRUE); Assert.assertEquals(md.getStandardProperty(IMessageDatagram.STDPROP_REPLYTO), "ReplyTo"); Assert.assertEquals(md.getStandardProperty(IMessageDatagram.STDPROP_TIMESTAMP), new Long(42L)); Assert.assertEquals(md.getStandardProperty(IMessageDatagram.STDPROP_TTL), new Integer(1000)); Assert.assertTrue(Arrays.equals((byte[])md.getStandardProperty(IMessageDatagram.STDPROP_BODY), moreBytes)); // set a few custom props Assert.assertTrue(Arrays.equals((byte[])md.getCustomProperty("property1"), someBytes)); Assert.assertEquals(md.getCustomProperty("property2"), "Hello"); } } Index: AckDatagram.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/datagram/impl/AckDatagram.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** AckDatagram.java 19 Sep 2002 17:30:32 -0000 1.6 --- AckDatagram.java 19 Sep 2002 21:23:33 -0000 1.7 *************** *** 13,17 **** implements IAckDatagram { ! private IMessageDatagram.MessageId msgId; private boolean nack; --- 13,17 ---- implements IAckDatagram { ! private MessageId msgId; private boolean nack; *************** *** 23,28 **** public AckDatagram(boolean nack) { ! super(DatagramFactory.DGRAM_ACK, 0); ! this.msgId = new IMessageDatagram.MessageId(0, 0); this.nack = nack; } --- 23,28 ---- public AckDatagram(boolean nack) { ! super(DatagramFactory.DGRAM_ACK, 0); ! this.msgId = null; this.nack = nack; } *************** *** 33,41 **** * @param nack true if NACK, false otherwise. */ ! public AckDatagram(IMessageDatagram.MessageId msgId, boolean nack) { ! super(DatagramFactory.DGRAM_ACK, 0); ! this.msgId = msgId; ! this.nack = nack; } --- 33,41 ---- * @param nack true if NACK, false otherwise. */ ! public AckDatagram(MessageId msgId, boolean nack) { ! super(DatagramFactory.DGRAM_ACK, 0); ! this.msgId = msgId; ! this.nack = nack; } *************** *** 45,93 **** public AckDatagram() { ! super(DatagramFactory.DGRAM_ACK, 0); } // ack-specific ! public IMessageDatagram.MessageId getAckMessageId() {return msgId;} public boolean isNegativeAck() {return nack;} /** ! * the ACK datagram format is: ! * * <pre> ! * ------- ! * msgId 12 (sender ID followed by sequence) ! * ------- ! * nack? 1 (non-zero for true). ! * ------- * </pre> ! */ public void incoming(java.nio.ByteBuffer bb) ! throws java.io.IOException { ! super.incoming(bb); ! setDatagramProps(DatagramFactory.DGRAM_ACK, 0); ! ! msgId = new IMessageDatagram.MessageId(bb.getLong(), bb.getInt()); ! nack = (bb.get() != 0); } /** ! * allows the datagram to be output to a channel. ! */ public void outgoing(java.nio.ByteBuffer bb) { ! super.outgoing(bb); ! ! bb.putLong(msgId.getSenderId()); ! bb.putInt(msgId.getSequence()); ! bb.put(nack ? (byte)0x1 : (byte)0x0); } public String toString() { ! return super.toString() + ! "\nMsgID:\t\t" + msgId + ! "\nnack:\t\t" + nack; } } --- 45,99 ---- public AckDatagram() { ! super(DatagramFactory.DGRAM_ACK, 0); } // ack-specific ! public MessageId getAckMessageId() {return msgId;} public boolean isNegativeAck() {return nack;} /** ! * the ACK datagram format is: ! * * <pre> ! * ------- ! * nack? 1 non-zero for true ! * ------- ! * msgId 12 optional, sender ID followed by sequence ! * ------- * </pre> ! */ public void incoming(java.nio.ByteBuffer bb) ! throws java.io.IOException { ! super.incoming(bb); ! setDatagramProps(DatagramFactory.DGRAM_ACK, 0); ! ! nack = (bb.get() != 0); ! if (bb.remaining() > 0) { ! msgId = new MessageId(bb.getLong(), bb.getInt()); ! } else { ! msgId = null; ! } } /** ! * allows the datagram to be output to a channel. ! */ public void outgoing(java.nio.ByteBuffer bb) { ! super.outgoing(bb); ! ! bb.put(nack ? (byte)0x1 : (byte)0x0); ! if (msgId != null) { ! bb.putLong(msgId.getSenderId()); ! bb.putInt(msgId.getSequence()); ! } } public String toString() { ! return super.toString() + ! "\nMsgID:\t\t" + msgId + ! "\nnack:\t\t" + nack; } } Index: MessageDatagram.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/datagram/impl/MessageDatagram.java,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** MessageDatagram.java 19 Sep 2002 03:57:38 -0000 1.9 --- MessageDatagram.java 19 Sep 2002 21:23:33 -0000 1.10 *************** *** 1,317 **** ! package com.ubermq.jms.server.datagram.impl; ! ! import com.ubermq.kernel.*; ! import java.util.*; ! ! import com.ubermq.jms.server.ServerConfig; ! import com.ubermq.jms.server.datagram.IMessageDatagram; ! import java.nio.ByteBuffer; ! ! /** ! * A complete implementation of the IMessageDatagram interface. This implementation ! * fully supports standard and customized properties, and efficiently stores some ! * of the standard properties in a compact bit field so parsing the properties ! * is not required to retrieve them. ! * <P> ! * This is a very important class, and subclassing it to remove extraneous functionality ! * may be a good way to boost performance and transport overhead. ! */ ! public class MessageDatagram ! extends AbstractDatagram ! implements IMessageDatagram ! { ! private String topicName; ! private long senderId; ! private int seq; ! ! private long originalSenderId; ! private int originalSeq; ! ! private HashMap props = null; ! private ByteBuffer lazyProps = ByteBuffer.allocateDirect(MAX_PROP); ! private boolean propsChanged = false; ! ! private static final int FIXED_SIZE = 4; ! private static final int MAX_PROP = Integer.valueOf( ! Configurator.getProperty(ServerConfig.DGP_MAXIMUM_PROPERTY_LENGTH, "4096")).intValue(); ! ! // masks and bit shifts for some STDPROP's that are stored in the ! // class-specific section of the datagram's flags. ! private static final int TTL_SHIFT = 16; ! private static final int TTL_MASK = 0xF; // TTL is stored as n x 100 ms, meaning we can TTL up to 1500 ms ! private static final int PRI_SHIFT = 20; ! private static final int PRI_MASK = 0xF; // PRI is stored in 4 bits, giving us 16 priority levels ! private static final int REDELIVERED_SHIFT = 24; ! private static final int REDELIVERED_MASK = 0x1; // a single bit ! private static final int MSGTYPE_SHIFT = 25; ! private static final int MSGTYPE_MASK = 0x7; // MSGTYPE gets 3 bits, that's 8 values, 0-7. use them well. ! ! /** ! * Constructs a message datagram intended for the given topic. ! * @param topicName a topic ! */ ! public MessageDatagram(String topicName) ! { ! super(DatagramFactory.DGRAM_MSG, 0); ! this.topicName = topicName; ! lazyProps.clear(); ! lazyProps.flip(); ! } ! ! /** ! * Constructs a message datagram for reading from a buffer, or ! * for sending to an undetermined topic. ! */ ! public MessageDatagram() ! { ! super(DatagramFactory.DGRAM_MSG, 0); ! lazyProps.clear(); ! lazyProps.flip(); ! } ! ! public String getTopicName() {return topicName;} ! public void setTopicName(String sz) {topicName = sz;} ! ! /** ! * Gets the original unique identifier for this message ! * when it arrived at the subscriber. This is used for acknowledgement ! * and identification. ! * @return the original message ID, constant for the life of the datagram. ! */ ! public MessageId getIncomingMessageId() ! { ! return new MessageId(originalSenderId, originalSeq); ! } ! ! public MessageId getMessageId() ! { ! return new MessageId(senderId, seq); ! } ! ! public long getSenderId() {return senderId;} ! public void setSenderId(long senderId) {this.senderId = senderId;} ! public int getSequence() {return seq;} ! public void setSequence(int seq) {this.seq = seq;} ! ! // get/setting std properties ! public void clearProperties() ! { ! getProps().clear(); ! propsChanged = true; ! } ! ! public void setStandardProperty(int property, Object value) ! { ! switch(property) ! { ! case STDPROP_PRIORITY: ! setFlagBasedProperty(PRI_SHIFT, PRI_MASK, ((Number)value).intValue()); ! break; ! case STDPROP_MSGTYPE: ! setFlagBasedProperty(MSGTYPE_SHIFT, MSGTYPE_MASK, ((Number)value).intValue()); ! break; ! case STDPROP_TTL: ! setFlagBasedProperty(TTL_SHIFT, TTL_MASK, (int)Math.ceil(((Number)value).intValue() / 100.0)); ! break; ! case STDPROP_REDELIVERY: ! setFlagBasedProperty(REDELIVERED_SHIFT, REDELIVERED_MASK, ((Boolean)value).booleanValue() ? 1 : 0); ! break; ! default: ! getProps().put(new Integer(property), value); ! propsChanged = true; ! break; ! } ! } ! ! /** ! * In this implementation, some of the standard properties are stored ! * in the datagram flags for speed, so we don't have to deserialize the ! * properties. ! * <P> ! * I dare you to figure out which are stored this way. :) ! */ ! public Object getStandardProperty(int property) ! { ! switch(property) ! { ! case STDPROP_PRIORITY: ! return new Integer(getFlagBasedProperty(PRI_SHIFT, PRI_MASK)); ! case STDPROP_MSGTYPE: ! return new Integer(getFlagBasedProperty(MSGTYPE_SHIFT, MSGTYPE_MASK)); ! case STDPROP_TTL: ! return new Integer(getFlagBasedProperty(TTL_SHIFT, TTL_MASK) * 100); ! case STDPROP_REDELIVERY: ! return new Boolean(getFlagBasedProperty(REDELIVERED_SHIFT, REDELIVERED_MASK) != 0 ? true : false); ! default: ! return getProps().get(new Integer(property)); ! } ! } ! ! private void setFlagBasedProperty(int shift, int mask, int value) ! { ! unsetDatagramFlagBits(mask << shift); ! setDatagramFlagBits((value & mask) << shift); ! } ! ! private int getFlagBasedProperty(int shift, int mask) ! { ! return (getDatagramFlags() >> shift) & mask; ! } ! ! public void setCustomProperty(String property, Object value) ! { ! getProps().put(property, value); ! propsChanged = true; ! } ! ! public Object getCustomProperty(String property) ! { ! return getProps().get(property); ! } ! ! public Collection getCustomPropertyNames() ! { ! return getProps().keySet(); ! } ! ! public void prepareToSend(int deliveryMode, ! int priority, ! long ttl) ! { ! setStandardProperty(IMessageDatagram.STDPROP_DELIVERYMODE, new Integer(deliveryMode)); ! setStandardProperty(IMessageDatagram.STDPROP_PRIORITY, new Integer(priority)); ! setStandardProperty(IMessageDatagram.STDPROP_TTL, new Long(ttl)); ! setStandardProperty(IMessageDatagram.STDPROP_TIMESTAMP, new Long(System.currentTimeMillis())); ! } ! ! ! public synchronized HashMap getProps() ! { ! if (props != null) ! return props; ! else ! { ! try { ! parseProps(); ! } catch(NullPointerException npe) ! { ! com.ubermq.Utility.getLogger().severe("NPE: " + lazyProps.limit() + " " + lazyProps.position()); ! } ! return props; ! } ! } ! ! private void parseProps() ! { ! props = new HashMap(); ! ! while(lazyProps.remaining() > 0) ! { ! boolean std = (lazyProps.get() != 0); ! if (std) ! { ! int prop = lazyProps.getInt(); ! Object value = readTypedData(lazyProps); ! ! props.put(new Integer(prop), value); ! } ! else ! { ! String prop = readPascalString(lazyProps); ! Object value = readTypedData(lazyProps); ! ! props.put(prop, value); ! } ! } ! ! lazyProps.rewind(); ! } ! ! private void rebuildProps() ! { ! lazyProps.clear(); ! for(Iterator i = props.keySet().iterator();i.hasNext();) ! { ! Object key = i.next(); ! Object value = props.get(key); ! ! if (key instanceof Integer) ! { ! lazyProps.put((byte)0x1); ! lazyProps.putInt( ((Integer)key).intValue() ); ! writeTypedData(value, lazyProps); ! } ! else ! { ! lazyProps.put((byte)0x0); ! writePascalString(key.toString(), lazyProps); ! writeTypedData(value, lazyProps); ! } ! } ! ! lazyProps.flip(); ! propsChanged = false; ! } ! ! /** ! * the MSG datagram format is: ! * ! * <PRE> ! * ------ ! * senderId LONG ! * ------ ! * seq INT ! * ------ ! * topic-name pascal string ! * ------ ! * props n ! * ------ ! * </PRE> ! */ ! public void incoming(java.nio.ByteBuffer bb) ! throws java.io.IOException ! { ! super.incoming(bb); ! originalSenderId = senderId = bb.getLong(); ! originalSeq = seq = bb.getInt(); ! topicName = readPascalString(bb); ! ! // read in props array ! props = null; ! propsChanged = false; ! ! lazyProps.clear(); ! if (bb.hasRemaining()) lazyProps.put(bb); ! lazyProps.flip(); ! } ! ! /** ! * allows the datagram to be output to a channel. ! */ ! public void outgoing(java.nio.ByteBuffer bb) ! { ! super.outgoing(bb); ! bb.putLong(senderId); ! bb.putInt(seq); ! writePascalString(topicName, bb); ! ! // if the props have not been changed, write them straight out. ! if (propsChanged) ! rebuildProps(); ! ! lazyProps.rewind(); ! if (lazyProps.hasRemaining()) bb.put(lazyProps); ! } ! ! public String toString() ! { ! return super.toString() + ! "\nMsgID:\t\t" + getMessageId() + ! "\nSendr:\t\t" + getSenderId() + ! "\nSeq :\t\t" + getSequence() + ! "\nTopic:\t\t" + getTopicName() + ! "\nProps:\t\t" + getProps().toString(); ! } ! } ! ! --- 1,370 ---- ! package com.ubermq.jms.server.datagram.impl; ! ! import com.ubermq.jms.server.datagram.*; ! import com.ubermq.kernel.*; ! import java.util.*; ! ! import com.ubermq.jms.server.ServerConfig; ! import java.nio.ByteBuffer; ! ! /** ! * A complete implementation of the IMessageDatagram interface. This implementation ! * fully supports standard and customized properties, and efficiently stores some ! * of the standard properties in a compact bit field so parsing the properties ! * is not required to retrieve them. ! * <P> ! * This is a very important class, and subclassing it to remove extraneous functionality ! * may be a good way to boost performance and transport overhead. ! */ ! public class MessageDatagram ! extends AbstractDatagram ! implements IMessageDatagram ! { ! private String topicName; ! ! // sender ID and sequence ! private long senderId; ! private int seq; ! ! // original sender ID and sequence (retain original) ! private long originalSenderId; ! private int originalSeq; ! ! // some properties go with every message. ! private long various; // this is a bitfield, defined below. ! private long timestamp; ! private byte[] body; // the body goes here. it does not make a copy! ! ! // all custom and standard properties not strictly required. ! private HashMap props = null; ! ! private static final int MAX_PROP = Integer.valueOf( ! Configurator.getProperty(ServerConfig.DGP_MAXIMUM_PROPERTY_LENGTH, "4096")).intValue(); ! ! private ByteBuffer lazyProps = ByteBuffer.allocateDirect(MAX_PROP); ! private boolean propsChanged = false; ! ! // masks and bit shifts for some STDPROP's that are stored in the ! // class-specific section of the datagram's flags. ! private static final int TTL_SHIFT = 0; ! private static final int TTL_MASK = 0xFF; // TTL is stored as n x 100 ms, meaning we can TTL up to 25500 ms ! private static final int PRI_SHIFT = 16; ! private static final int PRI_MASK = 0xF; // PRI is stored in 4 bits, giving us 16 priority levels ! private static final int REDELIVERED_SHIFT = 20; ! private static final int REDELIVERED_MASK = 0x1; // a single bit ! private static final int MSGTYPE_SHIFT = 21; ! private static final int MSGTYPE_MASK = 0x7; // MSGTYPE gets 3 bits, that's 8 values, 0-7. use them well. ! private static final int DELIVERY_SHIFT = 24; ! private static final int DELIVERY_MASK = 0x7; // DELIVERYMODE gets 3 bits, that's 8 values, 0-7. use them well. ! ! /** ! * Constructs a message datagram intended for the given topic. ! * @param topicName a topic ! */ ! public MessageDatagram(String topicName) ! { ! super(DatagramFactory.DGRAM_MSG, 0); ! this.topicName = topicName; ! lazyProps.clear(); ! lazyProps.flip(); ! } ! ! /** ! * Constructs a message datagram for reading from a buffer, or ! * for sending to an undetermined topic. ! */ ! public MessageDatagram() ! { ! super(DatagramFactory.DGRAM_MSG, 0); ! lazyProps.clear(); ! lazyProps.flip(); ! } ! ! public String getTopicName() {return topicName;} ! public void setTopicName(String sz) {topicName = sz;} ! ! /** ! * Gets the original unique identifier for this message ! * when it arrived at the subscriber. This is used for acknowledgement ! * and identification. ! * @return the original message ID, constant for the life of the datagram. ! */ ! public MessageId getIncomingMessageId() ! { ! return new MessageId(originalSenderId, originalSeq); ! } ! ! public MessageId getMessageId() ! { ! return new MessageId(senderId, seq); ! } ! ! public long getSenderId() {return senderId;} ! public void setSenderId(long senderId) {this.senderId = senderId;} ! public int getSequence() {return seq;} ! public void setSequence(int seq) {this.seq = seq;} ! ! // get/setting std properties ! public void clearProperties() ! { ! getProps().clear(); ! propsChanged = true; ! } ! ! public void setStandardProperty(int property, Object value) ! { ! switch(property) ! { ! case STDPROP_PRIORITY: ! setFlagBasedProperty(PRI_SHIFT, PRI_MASK, ((Number)value).intValue()); ! break; ! case STDPROP_MSGTYPE: ! setFlagBasedProperty(MSGTYPE_SHIFT, MSGTYPE_MASK, ((Number)value).intValue()); ! break; ! case STDPROP_TTL: ! setFlagBasedProperty(TTL_SHIFT, TTL_MASK, (int)Math.ceil(((Number)value).intValue() / 100.0)); ! break; ! case STDPROP_REDELIVERY: ! setFlagBasedProperty(REDELIVERED_SHIFT, REDELIVERED_MASK, ((Boolean)value).booleanValue() ? 1 : 0); ! break; ! case STDPROP_TIMESTAMP: ! timestamp = ((Number)value).longValue(); ! break; ! case STDPROP_BODY: ! body = (byte[])value; ! break; ! case STDPROP_DELIVERYMODE: ! setFlagBasedProperty(DELIVERY_SHIFT, DELIVERY_MASK, ((Number)value).intValue()); ! break; ! default: ! getProps().put(new Integer(property), value); ! propsChanged = true; ! break; ! } ! } ! ! /** ! * In this implementation, some of the standard properties are stored ! * in the datagram flags for speed, so we don't have to deserialize the ! * properties. ! * <P> ! * I dare you to figure out which are stored this way. :) ! */ ! public Object getStandardProperty(int property) ! { ! switch(property) ! { ! case STDPROP_PRIORITY: ! return new Integer(getFlagBasedProperty(PRI_SHIFT, PRI_MASK)); ! case STDPROP_MSGTYPE: ! return new Integer(getFlagBasedProperty(MSGTYPE_SHIFT, MSGTYPE_MASK)); ! case STDPROP_TTL: ! return new Integer(getFlagBasedProperty(TTL_SHIFT, TTL_MASK) * 100); ! case STDPROP_REDELIVERY: ! return new Boolean(getFlagBasedProperty(REDELIVERED_SHIFT, REDELIVERED_MASK) != 0 ? true : false); ! case STDPROP_TIMESTAMP: ! return new Long(timestamp); ! case STDPROP_BODY: ! return body; ! case STDPROP_DELIVERYMODE: ! return new Integer(getFlagBasedProperty(DELIVERY_SHIFT, DELIVERY_MASK)); ! default: ! return getProps().get(new Integer(property)); ! } ! } ! ! private void setFlagBasedProperty(int shift, int mask, int value) ! { ! various &= ~(mask << shift); ! various |= (value & mask) << shift; ! } ! ! private int getFlagBasedProperty(int shift, int mask) ! { ! return (int)((various >> shift) & mask); ! } ! ! public void setCustomProperty(String property, Object value) ! { ! getProps().put(property, value); ! propsChanged = true; ! } ! ! public Object getCustomProperty(String property) ! { ! return getProps().get(property); ! } ! ! public Collection getCustomPropertyNames() ! { ! return getProps().keySet(); ! } ! ! public void prepareToSend(int deliveryMode, ! int priority, ! long ttl) ! { ! setStandardProperty(IMessageDatagram.STDPROP_DELIVERYMODE, new Integer(deliveryMode)); ! setStandardProperty(IMessageDatagram.STDPROP_PRIORITY, new Integer(priority)); ! setStandardProperty(IMessageDatagram.STDPROP_TTL, new Long(ttl)); ! setStandardProperty(IMessageDatagram.STDPROP_TIMESTAMP, new Long(System.currentTimeMillis())); ! } ! ! ! public synchronized HashMap getProps() ! { ! if (props != null) ! return props; ! else ! { ! try { ! parseProps(); ! } catch(NullPointerException npe) ! { ! com.ubermq.Utility.getLogger().severe("NPE: " + lazyProps.limit() + " " + lazyProps.position()); ! } ! return props; ! } ! } ! ! private void parseProps() ! { ! props = new HashMap(); ! ! while(lazyProps.remaining() > 0) ! { ! boolean std = (lazyProps.get() != 0); ! if (std) ! { ! int prop = lazyProps.getInt(); ! Object value = readTypedData(lazyProps); ! ! props.put(new Integer(prop), value); ! } ! else ! { ! String prop = readPascalString(lazyProps); ! Object value = readTypedData(lazyProps); ! ! props.put(prop, value); ! } ! } ! ! lazyProps.rewind(); ! } ! ! private void rebuildProps() ! { ! lazyProps.clear(); ! for(Iterator i = props.keySet().iterator();i.hasNext();) ! { ! Object key = i.next(); ! Object value = props.get(key); ! ! if (key instanceof Integer) ! { ! lazyProps.put((byte)0x1); ! lazyProps.putInt( ((Integer)key).intValue() ); ! writeTypedData(value, lazyProps); ! } ! else ! { ! lazyProps.put((byte)0x0); ! writePascalString(key.toString(), lazyProps); ! writeTypedData(value, lazyProps); ! } ! } ! ! lazyProps.flip(); ! propsChanged = false; ! } ! ! /** ! * the MSG datagram format is: ! * ! * <PRE> ! * ------ ! * senderId LONG ! * ------ ! * seq INT ! * ------ ! * various LONG ! * ------ ! * timestamp LONG ! * ------ ! * props-length INT ! * ------ ! * topic-name pascal string ! * ------ ! * props props-length ! * ------ ! * body n ! * ------ ! * </PRE> ! */ ! public void incoming(java.nio.ByteBuffer bb) ! throws java.io.IOException ! { ! super.incoming(bb); ! originalSenderId = senderId = bb.getLong(); ! originalSeq = seq = bb.getInt(); ! various = bb.getLong(); ! timestamp = bb.getLong(); ! int nprops = bb.getInt(); ! topicName = readPascalString(bb); ! ! // read in props array ! props = null; ! propsChanged = false; ! ! ByteBuffer props = bb.slice(); ! props.limit(nprops); ! lazyProps.clear(); ! if (bb.hasRemaining()) lazyProps.put(props); ! lazyProps.flip(); ! ! // read in body ! bb.position(bb.position() + nprops); ! body = new byte[bb.remaining()]; ! bb.get(body); ! } ! ! /** ! * allows the datagram to be output to a channel. ! */ ! public void outgoing(java.nio.ByteBuffer bb) ! { ! // if the props have changed, build them. ! if (propsChanged) ! rebuildProps(); ! ! // now output ! super.outgoing(bb); ! bb.putLong(senderId); ! bb.putInt(seq); ! bb.putLong(various); ! bb.putLong(timestamp); ! bb.putInt(lazyProps.limit()); ! writePascalString(topicName, bb); ! ! // output props ! lazyProps.rewind(); ! if (lazyProps.hasRemaining()) bb.put(lazyProps); ! ! // output body ! if (body != null) ! bb.put(body); ! } ! ! public String toString() ! { ! return super.toString() + ! "\nMsgID:\t\t" + getMessageId() + ! "\nSendr:\t\t" + getSenderId() + ! "\nSeq :\t\t" + getSequence() + ! "\nTopic:\t\t" + getTopicName() + ! "\nProps:\t\t" + getProps().toString(); ! } ! } ! ! |