[Ubermq-commits] jms/src/com/ubermq/jms/server/datagram/impl MessageDatagram.java,1.14,1.15
Brought to you by:
jimmyp
From: <ji...@us...> - 2002-10-11 17:44:36
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/server/datagram/impl In directory usw-pr-cvs1:/tmp/cvs-serv16797/src/com/ubermq/jms/server/datagram/impl Modified Files: MessageDatagram.java Log Message: prototype of Uber message viewer. very cool little app to see what is going on inside your infrastructure. Index: MessageDatagram.java =================================================================== RCS file: /cvsroot/ubermq/jms/src/com/ubermq/jms/server/datagram/impl/MessageDatagram.java,v retrieving revision 1.14 retrieving revision 1.15 diff -C2 -d -r1.14 -r1.15 *** MessageDatagram.java 9 Oct 2002 21:14:23 -0000 1.14 --- MessageDatagram.java 11 Oct 2002 17:44:31 -0000 1.15 *************** *** 22,48 **** { private String topicName; ! ! // sender ID and sequence private long senderId; private int seq; ! ! // original sender ID and sequence (retain original) ! private long originalSenderId; ! private int originalSeq; ! ! // some properties go with every message. ! private long various; // this is a bitfield, defined below. ! private long timestamp; ! private byte[] body; // the body goes here. it does not make a copy! ! ! // all custom and standard properties not strictly required. private HashMap props = null; ! private static final int MAX_PROP = Integer.valueOf( ! Configurator.getProperty(ServerConfig.DGP_MAXIMUM_PROPERTY_LENGTH, "4096")).intValue(); ! private ByteBuffer lazyProps; private boolean propsChanged = false; ! // masks and bit shifts for some STDPROP's that are stored in the // class-specific section of the datagram's flags. --- 22,48 ---- { private String topicName; ! ! // sender ID and sequence private long senderId; private int seq; ! ! // original sender ID and sequence (retain original) ! private long originalSenderId; ! private int originalSeq; ! ! // some properties go with every message. ! private long various; // this is a bitfield, defined below. ! private long timestamp; ! private Object body; // the body goes here. it does not make a copy! ! ! // all custom and standard properties not strictly required. private HashMap props = null; ! private static final int MAX_PROP = Integer.valueOf( ! Configurator.getProperty(ServerConfig.DGP_MAXIMUM_PROPERTY_LENGTH, "4096")).intValue(); ! private ByteBuffer lazyProps; private boolean propsChanged = false; ! // masks and bit shifts for some STDPROP's that are stored in the // class-specific section of the datagram's flags. *************** *** 57,405 **** static final int DELIVERY_SHIFT = 24; static final int DELIVERY_MASK = 0x7; // DELIVERYMODE gets 3 bits, that's 8 values, 0-7. use them well. ! ! /** ! * Constructs a message datagram intended for the given topic. ! * @param topicName a topic ! */ MessageDatagram(String topicName) { ! super(DatagramFactory.DGRAM_MSG, 0); ! this.topicName = topicName; ! lazyProps = ByteBuffer.allocateDirect(MAX_PROP); ! lazyProps.clear(); ! lazyProps.flip(); } ! ! /** ! * Constructs a message datagram for reading from a buffer, or ! * for sending to an undetermined topic. ! */ public MessageDatagram() { ! super(DatagramFactory.DGRAM_MSG, 0); ! lazyProps = ByteBuffer.allocateDirect(MAX_PROP); ! lazyProps.clear(); ! lazyProps.flip(); } ! public String getTopicName() {return topicName;} public void setTopicName(String sz) {topicName = sz;} ! ! /** ! * Gets the original unique identifier for this message ! * when it arrived at the subscriber. This is used for acknowledgement ! * and identification. ! * @return the original message ID, constant for the life of the datagram. ! */ ! public MessageId getIncomingMessageId() ! { ! return new MessageId(originalSenderId, originalSeq); ! } ! public MessageId getMessageId() { ! return new MessageId(senderId, seq); } ! public long getSenderId() {return senderId;} public void setSenderId(long senderId) {this.senderId = senderId;} public int getSequence() {return seq;} public void setSequence(int seq) {this.seq = seq;} ! // get/setting std properties public void clearProperties() { ! getProps().clear(); ! propsChanged = true; } ! public void setStandardProperty(int property, Object value) { ! switch(property) ! { ! case STDPROP_PRIORITY: ! setFlagBasedProperty(PRI_SHIFT, PRI_MASK, ((Number)value).intValue()); ! break; ! case STDPROP_MSGTYPE: ! setFlagBasedProperty(MSGTYPE_SHIFT, MSGTYPE_MASK, ((Number)value).intValue()); ! break; ! case STDPROP_TTL: ! setFlagBasedProperty(TTL_SHIFT, TTL_MASK, (int)Math.ceil(((Number)value).intValue() / 100.0)); ! break; ! case STDPROP_REDELIVERY: ! setFlagBasedProperty(REDELIVERED_SHIFT, REDELIVERED_MASK, ((Boolean)value).booleanValue() ? 1 : 0); ! break; ! case STDPROP_TIMESTAMP: ! timestamp = ((Number)value).longValue(); ! break; ! case STDPROP_BODY: ! body = (byte[])value; ! break; ! case STDPROP_DELIVERYMODE: ! setFlagBasedProperty(DELIVERY_SHIFT, DELIVERY_MASK, ((Number)value).intValue()); ! break; ! default: ! getProps().put(new Integer(property), value); ! propsChanged = true; ! break; ! } } ! /** ! * In this implementation, some of the standard properties are stored ! * in the datagram flags for speed, so we don't have to deserialize the ! * properties. ! * <P> ! * I dare you to figure out which are stored this way. :) ! */ public Object getStandardProperty(int property) { ! switch(property) ! { ! case STDPROP_PRIORITY: ! return new Integer(getFlagBasedProperty(PRI_SHIFT, PRI_MASK)); ! case STDPROP_MSGTYPE: ! return new Integer(getFlagBasedProperty(MSGTYPE_SHIFT, MSGTYPE_MASK)); ! case STDPROP_TTL: ! return new Integer(getFlagBasedProperty(TTL_SHIFT, TTL_MASK) * 100); ! case STDPROP_REDELIVERY: ! return new Boolean(getFlagBasedProperty(REDELIVERED_SHIFT, REDELIVERED_MASK) != 0 ? true : false); ! case STDPROP_TIMESTAMP: ! return new Long(timestamp); ! case STDPROP_BODY: ! return body; ! case STDPROP_DELIVERYMODE: ! return new Integer(getFlagBasedProperty(DELIVERY_SHIFT, DELIVERY_MASK)); ! default: ! return getProps().get(new Integer(property)); ! } } ! private void setFlagBasedProperty(int shift, int mask, int value) { ! various = setFlagBasedProperty(various, shift, mask, value); } ! ! static long setFlagBasedProperty(long v, int shift, int mask, int value) ! { ! v &= ~(mask << shift); ! v |= (value & mask) << shift; ! return v; ! } ! ! private int getFlagBasedProperty(int shift, int mask) ! { ! return getFlagBasedProperty(various, shift, mask); ! } ! static int getFlagBasedProperty(long v, int shift, int mask) { ! return (int)((v >> shift) & mask); } ! public void setCustomProperty(String property, Object value) { ! getProps().put(property, value); ! propsChanged = true; } ! public Object getCustomProperty(String property) { ! return getProps().get(property); } ! public Collection getCustomPropertyNames() { ! return getProps().keySet(); } ! public void prepareToSend(int deliveryMode, ! int priority, ! long ttl) { ! setStandardProperty(IMessageDatagram.STDPROP_DELIVERYMODE, new Integer(deliveryMode)); ! setStandardProperty(IMessageDatagram.STDPROP_PRIORITY, new Integer(priority)); ! setStandardProperty(IMessageDatagram.STDPROP_TTL, new Long(ttl)); ! setStandardProperty(IMessageDatagram.STDPROP_TIMESTAMP, new Long(System.currentTimeMillis())); } ! ! public synchronized HashMap getProps() { ! if (props != null) ! return props; ! else ! { ! try { ! parseProps(); ! } catch(NullPointerException npe) ! { ! com.ubermq.Utility.getLogger().severe("NPE: " + lazyProps.limit() + " " + lazyProps.position()); ! } ! return props; ! } } ! private void parseProps() { ! props = new HashMap(); ! parsePropsBuffer(lazyProps, props); ! lazyProps.rewind(); ! } ! ! /** ! * Parses a properties buffer into a map of properties contained inside. ! * This is exposed to other implementations to leverage the same type of ! * functionality. ! */ ! static void parsePropsBuffer(ByteBuffer lazyProps, ! Map props) ! { ! while(lazyProps.remaining() > 0) ! { ! boolean std = (lazyProps.get() != 0); ! if (std) ! { ! int prop = lazyProps.getInt(); ! Object value = readTypedData(lazyProps); ! ! props.put(new Integer(prop), value); ! } ! else ! { ! String prop = readPascalString(lazyProps); ! Object value = readTypedData(lazyProps); ! ! props.put(prop, value); ! } ! } } ! private void rebuildProps() { ! lazyProps.clear(); ! for(Iterator i = props.keySet().iterator();i.hasNext();) ! { ! Object key = i.next(); ! Object value = props.get(key); ! ! if (key instanceof Integer) ! { ! lazyProps.put((byte)0x1); ! lazyProps.putInt( ((Integer)key).intValue() ); ! writeTypedData(value, lazyProps); ! } ! else ! { ! lazyProps.put((byte)0x0); ! writePascalString(key.toString(), lazyProps); ! writeTypedData(value, lazyProps); ! } ! } ! ! lazyProps.flip(); ! propsChanged = false; } ! /** ! * the MSG datagram format is: ! * ! * <PRE> ! * ------ ! * senderId LONG ! * ------ ! * seq INT ! * ------ ! * various LONG ! * ------ ! * timestamp LONG ! * ------ ! * props-length INT ! * ------ ! * topic-name pascal string ! * ------ ! * props props-length ! * ------ ! * body n ! * ------ ! * </PRE> ! */ public void incoming(java.nio.ByteBuffer bb) ! throws java.io.IOException { ! super.incoming(bb); ! originalSenderId = senderId = bb.getLong(); ! originalSeq = seq = bb.getInt(); ! various = bb.getLong(); ! timestamp = bb.getLong(); ! int nprops = bb.getInt(); ! topicName = readPascalString(bb); ! ! // read in props array ! props = null; ! propsChanged = false; ! ! ByteBuffer props = bb.slice(); ! props.limit(nprops); ! lazyProps.clear(); ! if (bb.hasRemaining()) lazyProps.put(props); ! lazyProps.flip(); ! ! // read in body ! bb.position(bb.position() + nprops); ! body = new byte[bb.remaining()]; ! bb.get(body); } ! /** ! * allows the datagram to be output to a channel. ! */ public void outgoing(java.nio.ByteBuffer bb) { ! // if the props have changed, build them. ! if (propsChanged) ! rebuildProps(); ! ! // now output ! super.outgoing(bb); ! bb.putLong(senderId); ! bb.putInt(seq); ! bb.putLong(various); ! bb.putLong(timestamp); ! bb.putInt(lazyProps.limit()); ! writePascalString(topicName, bb); ! ! // output props ! lazyProps.rewind(); ! if (lazyProps.hasRemaining()) bb.put(lazyProps); ! ! // output body ! if (body != null) ! bb.put(body); } ! public String toString() { ! return super.toString() + ! "\nMsgID:\t\t" + getMessageId() + ! "\nSendr:\t\t" + getSenderId() + ! "\nSeq :\t\t" + getSequence() + ! "\nTopic:\t\t" + getTopicName() + ! "\nProps:\t\t" + getProps().toString(); } ! ! public boolean equals(Object o) ! { ! if (o instanceof IMessageDatagram) ! { ! return getMessageId().equals(((IMessageDatagram)o).getMessageId()); ! } ! else ! { ! return false; ! } ! } ! ! public int hashCode() {return getMessageId().hashCode();} } --- 57,414 ---- static final int DELIVERY_SHIFT = 24; static final int DELIVERY_MASK = 0x7; // DELIVERYMODE gets 3 bits, that's 8 values, 0-7. use them well. ! ! /** ! * Constructs a message datagram intended for the given topic. ! * @param topicName a topic ! */ MessageDatagram(String topicName) { ! super(DatagramFactory.DGRAM_MSG, 0); ! this.topicName = topicName; ! lazyProps = ByteBuffer.allocateDirect(MAX_PROP); ! lazyProps.clear(); ! lazyProps.flip(); } ! ! /** ! * Constructs a message datagram for reading from a buffer, or ! * for sending to an undetermined topic. ! */ public MessageDatagram() { ! super(DatagramFactory.DGRAM_MSG, 0); ! lazyProps = ByteBuffer.allocateDirect(MAX_PROP); ! lazyProps.clear(); ! lazyProps.flip(); } ! public String getTopicName() {return topicName;} public void setTopicName(String sz) {topicName = sz;} ! ! /** ! * Gets the original unique identifier for this message ! * when it arrived at the subscriber. This is used for acknowledgement ! * and identification. ! * @return the original message ID, constant for the life of the datagram. ! */ ! public MessageId getIncomingMessageId() ! { ! return new MessageId(originalSenderId, originalSeq); ! } ! public MessageId getMessageId() { ! return new MessageId(senderId, seq); } ! public long getSenderId() {return senderId;} public void setSenderId(long senderId) {this.senderId = senderId;} public int getSequence() {return seq;} public void setSequence(int seq) {this.seq = seq;} ! // get/setting std properties public void clearProperties() { ! getProps().clear(); ! propsChanged = true; } ! public void setStandardProperty(int property, Object value) { ! switch(property) ! { ! case STDPROP_PRIORITY: ! setFlagBasedProperty(PRI_SHIFT, PRI_MASK, ((Number)value).intValue()); ! break; ! case STDPROP_MSGTYPE: ! setFlagBasedProperty(MSGTYPE_SHIFT, MSGTYPE_MASK, ((Number)value).intValue()); ! break; ! case STDPROP_TTL: ! setFlagBasedProperty(TTL_SHIFT, TTL_MASK, (int)Math.ceil(((Number)value).intValue() / 100.0)); ! break; ! case STDPROP_REDELIVERY: ! setFlagBasedProperty(REDELIVERED_SHIFT, REDELIVERED_MASK, ((Boolean)value).booleanValue() ? 1 : 0); ! break; ! case STDPROP_TIMESTAMP: ! timestamp = ((Number)value).longValue(); ! break; ! case STDPROP_BODY: ! body = value; ! break; ! case STDPROP_DELIVERYMODE: ! setFlagBasedProperty(DELIVERY_SHIFT, DELIVERY_MASK, ((Number)value).intValue()); ! break; ! default: ! getProps().put(new Integer(property), value); ! propsChanged = true; ! break; ! } } ! /** ! * In this implementation, some of the standard properties are stored ! * in the datagram flags for speed, so we don't have to deserialize the ! * properties. ! * <P> ! * I dare you to figure out which are stored this way. :) ! */ public Object getStandardProperty(int property) { ! switch(property) ! { ! case STDPROP_PRIORITY: ! return new Integer(getFlagBasedProperty(PRI_SHIFT, PRI_MASK)); ! case STDPROP_MSGTYPE: ! return new Integer(getFlagBasedProperty(MSGTYPE_SHIFT, MSGTYPE_MASK)); ! case STDPROP_TTL: ! return new Integer(getFlagBasedProperty(TTL_SHIFT, TTL_MASK) * 100); ! case STDPROP_REDELIVERY: ! return new Boolean(getFlagBasedProperty(REDELIVERED_SHIFT, REDELIVERED_MASK) != 0 ? true : false); ! case STDPROP_TIMESTAMP: ! return new Long(timestamp); ! case STDPROP_BODY: ! return body; ! case STDPROP_DELIVERYMODE: ! return new Integer(getFlagBasedProperty(DELIVERY_SHIFT, DELIVERY_MASK)); ! default: ! return getProps().get(new Integer(property)); ! } } ! private void setFlagBasedProperty(int shift, int mask, int value) { ! various = setFlagBasedProperty(various, shift, mask, value); } ! ! static long setFlagBasedProperty(long v, int shift, int mask, int value) ! { ! v &= ~(mask << shift); ! v |= (value & mask) << shift; ! return v; ! } ! ! private int getFlagBasedProperty(int shift, int mask) ! { ! return getFlagBasedProperty(various, shift, mask); ! } ! static int getFlagBasedProperty(long v, int shift, int mask) { ! return (int)((v >> shift) & mask); } ! public void setCustomProperty(String property, Object value) { ! getProps().put(property, value); ! propsChanged = true; } ! public Object getCustomProperty(String property) { ! return getProps().get(property); } ! public Collection getCustomPropertyNames() { ! List customNames = new LinkedList(); ! ! Iterator iter = getProps().keySet().iterator(); ! while (iter.hasNext()) ! { ! Object element = iter.next(); ! if (element instanceof String) ! customNames.add(element); ! } ! ! return customNames; } ! public void prepareToSend(int deliveryMode, ! int priority, ! long ttl) { ! setStandardProperty(IMessageDatagram.STDPROP_DELIVERYMODE, new Integer(deliveryMode)); ! setStandardProperty(IMessageDatagram.STDPROP_PRIORITY, new Integer(priority)); ! setStandardProperty(IMessageDatagram.STDPROP_TTL, new Long(ttl)); ! setStandardProperty(IMessageDatagram.STDPROP_TIMESTAMP, new Long(System.currentTimeMillis())); } ! ! public synchronized HashMap getProps() { ! if (props != null) ! return props; ! else ! { ! try { ! parseProps(); ! } catch(NullPointerException npe) ! { ! com.ubermq.Utility.getLogger().severe("NPE: " + lazyProps.limit() + " " + lazyProps.position()); ! } ! return props; ! } } ! private void parseProps() { ! props = new HashMap(); ! parsePropsBuffer(lazyProps, props); ! lazyProps.rewind(); } ! ! /** ! * Parses a properties buffer into a map of properties contained inside. ! * This is exposed to other implementations to leverage the same type of ! * functionality. ! */ ! static void parsePropsBuffer(ByteBuffer lazyProps, ! Map props) ! { ! while(lazyProps.remaining() > 0) ! { ! boolean std = (lazyProps.get() != 0); ! if (std) ! { ! int prop = lazyProps.getInt(); ! Object value = readTypedData(lazyProps); ! ! props.put(new Integer(prop), value); ! } ! else ! { ! String prop = readPascalString(lazyProps); ! Object value = readTypedData(lazyProps); ! ! props.put(prop, value); ! } ! } ! } ! private void rebuildProps() { ! lazyProps.clear(); ! for(Iterator i = props.keySet().iterator();i.hasNext();) ! { ! Object key = i.next(); ! Object value = props.get(key); ! ! if (key instanceof Integer) ! { ! lazyProps.put((byte)0x1); ! lazyProps.putInt( ((Integer)key).intValue() ); ! writeTypedData(value, lazyProps); ! } ! else ! { ! lazyProps.put((byte)0x0); ! writePascalString(key.toString(), lazyProps); ! writeTypedData(value, lazyProps); ! } ! } ! ! lazyProps.flip(); ! propsChanged = false; } ! /** ! * the MSG datagram format is: ! * ! * <PRE> ! * ------ ! * senderId LONG ! * ------ ! * seq INT ! * ------ ! * various LONG ! * ------ ! * timestamp LONG ! * ------ ! * props-length INT ! * ------ ! * topic-name pascal string ! * ------ ! * props props-length ! * ------ ! * body n ! * ------ ! * </PRE> ! */ public void incoming(java.nio.ByteBuffer bb) ! throws java.io.IOException { ! super.incoming(bb); ! originalSenderId = senderId = bb.getLong(); ! originalSeq = seq = bb.getInt(); ! various = bb.getLong(); ! timestamp = bb.getLong(); ! int nprops = bb.getInt(); ! topicName = readPascalString(bb); ! ! // read in props array ! props = null; ! propsChanged = false; ! ! ByteBuffer props = bb.slice(); ! props.limit(nprops); ! lazyProps.clear(); ! if (bb.hasRemaining()) lazyProps.put(props); ! lazyProps.flip(); ! ! // read in body ! bb.position(bb.position() + nprops); ! body = readTypedData(bb); } ! /** ! * allows the datagram to be output to a channel. ! */ public void outgoing(java.nio.ByteBuffer bb) { ! // if the props have changed, build them. ! if (propsChanged) ! rebuildProps(); ! ! // now output ! super.outgoing(bb); ! bb.putLong(senderId); ! bb.putInt(seq); ! bb.putLong(various); ! bb.putLong(timestamp); ! bb.putInt(lazyProps.limit()); ! writePascalString(topicName, bb); ! ! // output props ! lazyProps.rewind(); ! if (lazyProps.hasRemaining()) bb.put(lazyProps); ! ! // output body ! if (body != null) ! writeTypedData(body, bb); } ! public String toString() { ! return super.toString() + ! "\nMsgID:\t\t" + getMessageId() + ! "\nSendr:\t\t" + getSenderId() + ! "\nSeq :\t\t" + getSequence() + ! "\nTopic:\t\t" + getTopicName() + ! "\nProps:\t\t" + getProps().toString(); } ! ! public boolean equals(Object o) ! { ! if (o instanceof IMessageDatagram) ! { ! return getMessageId().equals(((IMessageDatagram)o).getMessageId()); ! } ! else ! { ! return false; ! } ! } ! ! public int hashCode() {return getMessageId().hashCode();} } |