[Ubermq-commits] jms/src/com/ubermq/jms/common/routing/impl SimpleSelector.java,NONE,1.1 ConnectionD
Brought to you by:
jimmyp
From: <ji...@us...> - 2004-01-21 02:05:29
|
Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/common/routing/impl In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq/jms/common/routing/impl Added Files: SimpleSelector.java ConnectionDestNode.java StaticSourceSpec.java SelectorDestNode.java Router.java RegexpHelper.java RegexpSourceSpec.java Log Message: bug fixes related to read/write initialization race conditions --- NEW FILE: SimpleSelector.java --- package com.ubermq.jms.common.routing.impl; import com.ubermq.jms.common.datagram.*; import com.ubermq.jms.common.routing.*; import java.util.regex.*; import javax.jms.*; /** * A VERY trivial implementation of message selectors. * we only support "WHERE property = scalar" */ public class SimpleSelector implements Selector { private String property, scalar; private int operator; private boolean valid; private static final int EQUALS = 0, GREATER = 1, LESS = 2, NOTEQUAL = 3; public static final String WHERE_REGEX = "where\\s*(\\S*)\\s*(=|>|<|!=|<>)\\s*'??([^']*)'??"; public SimpleSelector(String sz) throws InvalidSelectorException { // parse the string. Pattern p = Pattern.compile(WHERE_REGEX, Pattern.CASE_INSENSITIVE); Matcher m = p.matcher(sz); if (m.matches()) { property = m.group(1); scalar = m.group(3); String op = m.group(2); if (op.equals("=")) operator = EQUALS; else if (op.equals("<>") || op.equals("!=")) operator = NOTEQUAL; else if (op.equals(">")) operator = GREATER; else if (op.equals("<")) operator = LESS; else operator = EQUALS; if (scalar.equalsIgnoreCase("NULL")) scalar = null; valid = true; } else { throw new InvalidSelectorException(""); } } public boolean accept(IMessageDatagram msg) { Object value = null; // if we are not valid, return false for everything. if (!valid) return false; if (property.equals("JMSDeliveryMode")) { value = msg.getStandardProperty(IMessageDatagram.STDPROP_DELIVERYMODE); } else if (property.equals("JMSPriority")) { value = msg.getStandardProperty(IMessageDatagram.STDPROP_PRIORITY); } else if (property.equals("JMSMessageID")) { value = msg.getMessageId(); } else if (property.equals("JMSTimestamp")) { value = msg.getStandardProperty(IMessageDatagram.STDPROP_TIMESTAMP); } else if (property.equals("JMSCorrelationID")) { value = msg.getStandardProperty(IMessageDatagram.STDPROP_CORRELATIONID); } else { value = msg.getCustomProperty(property); } com.ubermq.util.Utility.getLogger().debug("selector comparing " + property + " containing " + value + " with " + scalar); if (value == null || scalar == null) { return (value == null && scalar == null && operator == EQUALS); } else { switch(operator) { case EQUALS: return (value.toString().equals(scalar)); case NOTEQUAL: return !(value.toString().equals(scalar)); default: return false; } } } public String toString() { return "Property: " + property + " operator: " + operator + " scalar: " + scalar; } } --- NEW FILE: ConnectionDestNode.java --- package com.ubermq.jms.common.routing.impl; import com.ubermq.jms.common.routing.*; import com.ubermq.kernel.*; import java.io.*; /** * Implements a destination that contains information about a connected peer. * The message server will use this information from the router to forward * messages to their logical destinations. */ public class ConnectionDestNode implements RouteDestNode, DatagramSink, Comparable, java.io.Serializable { private IConnectionInfo conn; public ConnectionDestNode(IConnectionInfo ci) {this.conn = ci;} public final IConnectionInfo getConnection() {return conn;} public boolean isOpen() {return getConnection().isOpen();} public void output(IDatagram d, IOverflowHandler h) throws IOException { conn.output(d, h); } public String getDisplayName() {return conn.toString();} public String getNodeName() {return conn.getId();} public String toString() {return conn.toString();} public boolean equals(Object o) { if (o instanceof RouteDestNode) { return (getNodeName().equals(((RouteDestNode)o).getNodeName())); } else { return false; } } public int compareTo(Object obj) throws ClassCastException { RouteDestNode dn = (RouteDestNode)obj; return (getNodeName().compareTo(dn.getNodeName())); } public int hashCode() {return getNodeName().hashCode();} } --- NEW FILE: StaticSourceSpec.java --- package com.ubermq.jms.common.routing.impl; import com.ubermq.jms.common.routing.*; public class StaticSourceSpec implements SourceSpec, java.io.Serializable { private String expr; /** * Constructs a static source specifier, from * the given string expression. * @param expr the expression representing the source specifier. */ public StaticSourceSpec(String expr) { this.expr = expr; } public String getDisplayName() {return expr;} public String toString() {return expr;} /** * Strictly matches this static specifier and * another implementation of the interface. */ public boolean matches(SourceSpec s) { return expr.equals(s.toString()); } public boolean isMoreSpecificThan(SourceSpec s) { return false; } public int hashCode() { return expr.hashCode(); } public boolean equals(Object o) { if (o instanceof SourceSpec) { return this.toString().equals(o.toString()); } else return false; } public boolean shouldCacheResults() {return true;} } --- NEW FILE: SelectorDestNode.java --- package com.ubermq.jms.common.routing.impl; import com.ubermq.jms.common.datagram.*; import com.ubermq.jms.common.routing.*; import com.ubermq.kernel.*; import java.io.*; public class SelectorDestNode extends ConnectionDestNode { private Selector selector; public SelectorDestNode(IConnectionInfo ci, String selector) throws javax.jms.InvalidSelectorException { super(ci); this.selector = new SimpleSelector(selector); com.ubermq.util.Utility.getLogger().debug("selector is " + this.selector.toString()); } /** * expose the output() method for the connection so we can * write to the endpoint. */ public void output(IDatagram d, IOverflowHandler h) throws IOException { if (d instanceof IMessageDatagram) { boolean accept = selector.accept((IMessageDatagram)d); com.ubermq.util.Utility.getLogger().debug("selector returned " + accept); if (!accept) return; } super.output(d, h); } } --- NEW FILE: Router.java --- package com.ubermq.jms.common.routing.impl; import com.ubermq.jms.common.routing.*; import java.util.*; import java.io.*; /** * The core router implementation providing a route table. This implementation * has memoization for faster route resolutions, a statistics interface, * and supports dynamic route table modifications. <P> * * Possible enhancements include: keeping memoized routes if route table * changes would not affect them, better statistics, and a way * to serialize route table information to XML.<P> * * This implementation is not thread-safe. Guard all access of the * router with appropriate synchronization. * */ public final class Router implements IRouter, IConfigurableRouter { private final List excludedRoutes, routes; private final Set nodes; private String myLabel = "undefined"; // memoize our route table private transient final Map memoization; private transient volatile int nCacheHits, nCacheRequests; private static final int MEMOIZATION_INITIAL_SIZE = 50; // constants private static final Set emptySet = Collections.EMPTY_SET; public static final long serialVersionUID=15; public Router() { this.excludedRoutes = new ArrayList(); this.routes = new ArrayList(); this.nodes = new HashSet(); this.memoization = new HashMap(MEMOIZATION_INITIAL_SIZE); } public Router(String label) { this(); setNodeLabel(label); } public void setNodeLabel(String label) { myLabel = label; } public String getNodeLabel() { return myLabel; } public void reset() { excludedRoutes.clear(); routes.clear(); nodes.clear(); resetCache(); } private void resetCache() { memoization.clear(); } public Collection getRoutes(String spec) { return getRoutes(new StaticSourceSpec(spec)); } public Collection getRoutes(SourceSpec spec) { Object cached; nCacheRequests++; if (spec.shouldCacheResults()) { if ((cached = memoization.get(spec)) != null) { nCacheHits++; return ((Collection)cached); } else { Collection routes = reallyGetRoutes(spec); memoization.put(spec,routes); return routes; } } else { return reallyGetRoutes(spec); } } /** * Returns a set of source specifications that are currently * mapped to the specified destination node. This is considered a reverse * lookup and may potentially be costly, depending on implementation.<P> * * @param dest the destination * @return a Collection of SourceSpec objects * @throws UnsupportedOperationException if the router does not * choose to implement this reverse lookup function. */ public Collection getRoutesTo(RouteDestNode dest) { Set toRemove = new HashSet(); for(Iterator i = routes.iterator();i.hasNext();) { BoundRoute br = (BoundRoute)i.next(); if (br.dest.equals(dest)) toRemove.add(br.src); } return toRemove; } /** * Does a reverse-lookup and returns a collection of * <code>BoundRoute</code> objects that are the mappings * such that <code>BoundRoute.dest.equals(dest)</code>. * * @return Collection of BoundRoute objects * @param dest the destination */ private Collection getBoundRoutesTo(RouteDestNode dest) { Set toRemove = new HashSet(); for(Iterator i = routes.iterator();i.hasNext();) { BoundRoute br = (BoundRoute)i.next(); if (br.dest.equals(dest)) toRemove.add(br); } return toRemove; } private Set getExcludedSet(SourceSpec spec) { Set exclusionSet = new TreeSet(); // collect a list of nodes that we are excluded from. for(Iterator i = excludedRoutes.iterator();i.hasNext();) { BoundRoute br = (BoundRoute)i.next(); if (br.src.matches(spec)) { exclusionSet.add(br.dest); } } return exclusionSet; } public Set getKnownNodes() { return nodes; } private Collection reallyGetRoutes(SourceSpec spec) { Set exclusionSet, sendSet; // if we are excluded from everywhere that we know about, // shortcut and don't look at any more routes. exclusionSet = getExcludedSet( spec ); if (exclusionSet.containsAll(nodes)) return emptySet; Map preSendMap = new TreeMap(); for(Iterator i = routes.iterator();i.hasNext();) { BoundRoute br = (BoundRoute)i.next(); if (br.src.matches(spec)) { Object existingSource = preSendMap.get(br.dest); if (existingSource == null || br.src.isMoreSpecificThan(((BoundRoute)existingSource).src)) { preSendMap.put(br.dest, br); } } } // take the dest's from the sendmap as sendSet sendSet = new TreeSet(); for(Iterator i = preSendMap.values().iterator();i.hasNext();) { sendSet.add( ((BoundRoute)i.next()).getDestination() ); } // well now we just take {routed - excluded} and give it back // we will exclude any nodes that we do not know about. sendSet.retainAll(nodes); sendSet.removeAll(exclusionSet); return sendSet; } public void addRoute(SourceSpec spec, RouteDestNode rdn) { routes.remove( new BoundRoute(spec, rdn) ); routes.add( new BoundRoute(spec, rdn) ); resetCache(); } public void excludeRoute(SourceSpec spec, RouteDestNode rdn) { excludedRoutes.add( new BoundRoute(spec, rdn) ); resetCache(); } public void remove(SourceSpec spec, RouteDestNode rdn) { // remove this one from either routes and/or excluded routes BoundRoute br = new BoundRoute(spec, rdn); routes.remove(br); resetCache(); } public void removeExclusion(SourceSpec spec, RouteDestNode rdn) { // remove this one from either routes and/or excluded routes BoundRoute br = new BoundRoute(spec, rdn); excludedRoutes.remove(br); resetCache(); } public void removeRoutesTo(RouteDestNode node) { // remove routes Collection toRemove = getBoundRoutesTo(node); for(Iterator j = toRemove.iterator();j.hasNext();) routes.remove(j.next()); // remove exclusions toRemove.clear(); for(Iterator i = excludedRoutes.iterator();i.hasNext();) { BoundRoute br = (BoundRoute)i.next(); if (br.dest.equals(node)) toRemove.add(br); } for(Iterator j = toRemove.iterator();j.hasNext();) excludedRoutes.remove(j.next()); } public void addKnownNode(RouteDestNode node) { nodes.add(node); resetCache(); } public void removeKnownNode(RouteDestNode node) { nodes.remove(node); resetCache(); } public String toString() { StringBuffer sb = new StringBuffer(); sb.append("Routes:\n"); for(Iterator i = routes.iterator();i.hasNext();) { BoundRoute br = (BoundRoute)i.next(); sb.append(br.toString()); sb.append("\n"); } if (excludedRoutes.size() > 0) { sb.append("\nExcluded Routes:\n"); for(Iterator i = excludedRoutes.iterator();i.hasNext();) { BoundRoute br = (BoundRoute)i.next(); sb.append(br.toString()); sb.append("\n"); } } sb.append("\nNodes: "); for(Iterator i = nodes.iterator();i.hasNext();) { sb.append( ((RouteDestNode)i.next()).toString() ).append(" "); } sb.append("\n"); return sb.toString(); } private static class BoundRoute { private SourceSpec src; private RouteDestNode dest; private BoundRoute(SourceSpec src, RouteDestNode rdn) { this.src = src; this.dest = rdn; } public BoundRoute() { } public RouteDestNode getDestination() {return dest;} public String toString() { return src.toString() + " -> " + dest.toString(); } public boolean equals(Object obj) { try { BoundRoute br = (BoundRoute)obj; return (br.src.equals(src) && br.dest.equals(dest)); } catch(ClassCastException cce) { return false; } } } } --- NEW FILE: RegexpHelper.java --- package com.ubermq.jms.common.routing.impl; /** * A helper class providing regex translation facilities. */ public class RegexpHelper { /** * The UberMQ topic matching scheme is quite simple, but provides * access to pure regex if that functionality is desired. * <P> * * <code>*</code> matches exactly one topic level, e.g. * <code>a.b.*</code> matches <code>a.b.x</code>, <code>a.b.y</code>, * but not <code>a.b.y.z</code> * <P> * * <code>#</code> matches any number of topic levels, e.g. * <code>a.b.#</code> matches <code>a.b.x</code>, <code>a.b.y</code>, * and <code>a.b.y.z</code> * <P> * * Topic spaces preceded by a . will not be matched by wildcards, * so .secret will not be matched by #, nor *. This serves * to hide topics from aggressive wildcarding. These topics can be * subscribed to by explicitly naming them. * <P> * * System topics begin with a <code>.$</code>. Please do not subscribe to or use these, * or your application behavior will be undefined. * <P> * * Finally, for regex fans, the underlying regex engine can be * accessed by specifying a tilde (~) as the first character of the topic name. * So, <code>~.*</code> will match all topic names, even hidden or system topics. */ public static String xlat(String friendly) { // bypass the translation if the first character is a ~ if (friendly.length() > 0 && friendly.charAt(0) == '~') return friendly.substring(1); StringBuffer regexp = new StringBuffer(friendly); replaceChar(regexp, '$', "\\$"); replaceChar(regexp, '.', "\\."); replaceChar(regexp, '*', "[^\\.]*"); replaceChar(regexp, '#', "[^\\.].*"); return regexp.toString(); } /** * Replaces all instances of a character in a string buffer with * a character sequence. */ public static void replaceChar(StringBuffer regexp, char ch, String repl) { for(int i=0;i < regexp.length();i++) { if (regexp.charAt(i) == ch) { regexp.replace(i, i+1, repl); i += repl.length(); } } } } --- NEW FILE: RegexpSourceSpec.java --- package com.ubermq.jms.common.routing.impl; import com.ubermq.jms.common.routing.*; import java.io.*; import java.util.regex.*; /** * A source specification that uses regular expressions to * determine if two specifications match. */ public class RegexpSourceSpec implements Externalizable, SourceSpec { private transient Pattern regexp; private String expr; private String nice; private static final long serialVersionUID = 5; public RegexpSourceSpec() { } public RegexpSourceSpec(String expr) { this(expr, expr); } public RegexpSourceSpec(String expr, String nice) throws IllegalArgumentException { this.expr = expr; this.nice = nice; this.regexp = Pattern.compile(this.expr, Pattern.CASE_INSENSITIVE); } public void readExternal(ObjectInput in) throws IOException { nice = in.readUTF(); expr = in.readUTF(); regexp = Pattern.compile(expr, Pattern.CASE_INSENSITIVE); } public void writeExternal(ObjectOutput out) throws IOException { out.writeUTF(nice); out.writeUTF(expr); } /** * @return a human readable name for this destination. */ public String getDisplayName() { return nice; } /** * An implementation can return true to indicate that * the results of this routing should be cached. * Mostly this should return true except if the created * source spec is a throwaway and will only ever match once. */ public boolean shouldCacheResults() { return true; } public boolean matches(SourceSpec s) { return regexp.matcher(s.toString()).matches(); } public boolean isMoreSpecificThan(SourceSpec s) { try { RegexpSourceSpec rss = (RegexpSourceSpec)s; return (specificity() > rss.specificity()); } catch(ClassCastException cce) {return false;} } private int specificity() { int score = 0; java.util.StringTokenizer st = new java.util.StringTokenizer(nice, "."); while(st.hasMoreTokens()) { String token = st.nextToken(); char ch = token.charAt(0); if (Character.isLetterOrDigit(ch)) score += 3; else if (ch == '*') score += 2; else if (ch == '#') score += 1; } return score; } public String toString() { return nice; } public boolean equals(Object obj) { if(obj instanceof RegexpSourceSpec) { return nice.equals( ((RegexpSourceSpec)obj).nice ); } else if (obj instanceof SourceSpec) return matches((SourceSpec)obj); else return false; } } |