User: rds13
Date: 04/01/05 01:54:07
Modified: src/org/jgroups/blocks DistributedQueue.java
Log:
cleanup constructor exceptions
Revision Changes Path
1.6 +163 -92 JGroups/src/org/jgroups/blocks/DistributedQueue.java
Index: DistributedQueue.java
===================================================================
RCS file: /cvsroot/javagroups/JGroups/src/org/jgroups/blocks/DistributedQueue.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- DistributedQueue.java 23 Dec 2003 12:54:34 -0000 1.5
+++ DistributedQueue.java 5 Jan 2004 09:54:07 -0000 1.6
@@ -1,8 +1,8 @@
-// $Id: DistributedQueue.java,v 1.5 2003/12/23 12:54:34 rds13 Exp $
-
+// $Id: DistributedQueue.java,v 1.6 2004/01/05 09:54:07 rds13 Exp $
package org.jgroups.blocks;
import java.io.Serializable;
+
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
@@ -10,9 +10,11 @@
import java.util.Vector;
import org.apache.log4j.Logger;
+
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.ChannelClosedException;
+import org.jgroups.ChannelException;
import org.jgroups.ChannelFactory;
import org.jgroups.ChannelNotConnectedException;
import org.jgroups.JChannel;
@@ -20,9 +22,11 @@
import org.jgroups.Message;
import org.jgroups.MessageListener;
import org.jgroups.View;
+
import org.jgroups.util.RspList;
import org.jgroups.util.Util;
+
/**
* Provides the abstraction of a java.util.LinkedList that is replicated at several
* locations. Any change to the list (reset, add, remove etc) will transparently be
@@ -38,24 +42,26 @@
*/
public class DistributedQueue implements MessageListener, MembershipListener, Cloneable
{
- static Logger logger = Logger.getLogger(DistributedQueue.class.getName());
- private long internal_timeout = 10000; // 10 seconds to wait for a response
-
- /*lock object for synchronization*/
- Object mutex = new Object();
-
- private transient boolean stopped = false; // whether to we are stopped !
-
- private LinkedList internalQueue;
public interface Notification
{
void entryAdd(Object value);
+
void entryRemoved(Object key);
+
void viewChange(Vector new_mbrs, Vector old_mbrs);
+
void contentsCleared();
+
void contentsSet(Collection new_entries);
}
+ static Logger logger = Logger.getLogger(DistributedQueue.class.getName());
+ private long internal_timeout = 10000; // 10 seconds to wait for a response
+
+ /*lock object for synchronization*/
+ Object mutex = new Object();
+ private transient boolean stopped = false; // whether to we are stopped !
+ private LinkedList internalQueue;
private transient Channel channel;
private transient RpcDispatcher disp = null;
private transient String groupname = null;
@@ -77,71 +83,64 @@
* @param state_timeout The time to wait until state is retrieved in milliseconds. A value of 0 means wait forever.
*/
public DistributedQueue(String groupname, ChannelFactory factory, String properties, long state_timeout)
+ throws ChannelException
{
if (logger.isDebugEnabled())
- logger.debug("DistributedQueue(" + groupname + "," + properties + "," + state_timeout);
- this.groupname = groupname;
- try
- {
- initMethods();
- internalQueue = new LinkedList();
- channel = factory != null ? factory.createChannel(properties) : new JChannel(properties);
- disp = new RpcDispatcher(channel, this, this, this);
- disp.setDeadlockDetection(false); // To ensure strict FIFO MethodCall
- channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
- channel.connect(groupname);
- start(state_timeout);
- }
- catch (Exception e)
{
- logger.error("DistributedQueue.DistributedQueue()", e);
+ logger.debug("DistributedQueue(" + groupname + "," + properties + "," + state_timeout);
}
+
+ this.groupname = groupname;
+ initMethods();
+ internalQueue = new LinkedList();
+ channel = (factory != null) ? factory.createChannel(properties) : new JChannel(properties);
+ disp = new RpcDispatcher(channel, this, this, this);
+ disp.setDeadlockDetection(false); // To ensure strict FIFO MethodCall
+ channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
+ channel.connect(groupname);
+ start(state_timeout);
}
- public DistributedQueue(JChannel channel) throws ChannelNotConnectedException, ChannelClosedException
+ public DistributedQueue(JChannel channel)
{
this.groupname = channel.getChannelName();
this.channel = channel;
init();
}
- /**
- * Uses a user-provided PullPushAdapter to create the dispatcher rather than a Channel. If id is non-null, it will be
- * used to register under that id. This is typically used when another building block is already using
- * PullPushAdapter, and we want to add this building block in addition. The id is the used to discriminate
- * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the
- * first block created on PullPushAdapter.
- * The caller needs to call start(), before using the this block. It gives the opportunity for the caller
- * to register as a lessoner for Notifications events.
- * @param adapter The PullPushAdapter which to use as underlying transport
- * @param id A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between
- * requests/responses for different building blocks on top of PullPushAdapter.
- * @param state_timeout Max number of milliseconds to wait for state to be retrieved
- */
- public DistributedQueue(PullPushAdapter adapter, Serializable id)
- {
- this.channel = (Channel)adapter.getTransport();
- this.groupname = this.channel.getChannelName();
-
- initMethods();
- internalQueue = new LinkedList();
-
- channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
- disp=new RpcDispatcher(adapter, id, this, this, this);
- disp.setDeadlockDetection(false); // To ensure strict FIFO MethodCall
- }
-
- protected void init() throws ChannelClosedException, ChannelNotConnectedException
+ /**
+ * Uses a user-provided PullPushAdapter to create the dispatcher rather than a Channel. If id is non-null, it will be
+ * used to register under that id. This is typically used when another building block is already using
+ * PullPushAdapter, and we want to add this building block in addition. The id is the used to discriminate
+ * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the
+ * first block created on PullPushAdapter.
+ * The caller needs to call start(), before using the this block. It gives the opportunity for the caller
+ * to register as a lessoner for Notifications events.
+ * @param adapter The PullPushAdapter which to use as underlying transport
+ * @param id A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between
+ * requests/responses for different building blocks on top of PullPushAdapter.
+ * @param state_timeout Max number of milliseconds to wait for state to be retrieved
+ */
+ public DistributedQueue(PullPushAdapter adapter, Serializable id)
{
+ this.channel = (Channel)adapter.getTransport();
+ this.groupname = this.channel.getChannelName();
+
initMethods();
internalQueue = new LinkedList();
+
channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
- disp = new RpcDispatcher(channel, this, this, this);
+ disp = new RpcDispatcher(adapter, id, this, this, this);
disp.setDeadlockDetection(false); // To ensure strict FIFO MethodCall
+ }
- // Changed by bela (jan 20 2003): sart() has to be called by user (only when providing
- // own channel). First, Channel.connect() has to be called, then start().
- // start(state_timeout);
+ protected void init()
+ {
+ initMethods();
+ internalQueue = new LinkedList();
+ channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
+ disp = new RpcDispatcher(channel, this, this, this);
+ disp.setDeadlockDetection(false); // To ensure strict FIFO MethodCall
}
public void start(long state_timeout) throws ChannelClosedException, ChannelNotConnectedException
@@ -150,8 +149,11 @@
logger.debug("DistributedQueue.initState(" + groupname + "): starting state retrieval");
rc = channel.getState(null, state_timeout);
+
if (rc)
+ {
logger.info("DistributedQueue.initState(" + groupname + "): state was retrieved successfully");
+ }
else
{
logger.info("DistributedQueue.initState(" + groupname + "): state could not be retrieved (first member)");
@@ -160,8 +162,9 @@
public Address getLocalAddress()
{
- return channel != null ? channel.getLocalAddress() : null;
+ return (channel != null) ? channel.getLocalAddress() : null;
}
+
public Channel getChannel()
{
return channel;
@@ -170,7 +173,9 @@
public void addNotifier(Notification n)
{
if (!notifs.contains(n))
+ {
notifs.addElement(n);
+ }
}
public void stop()
@@ -185,11 +190,13 @@
disp.stop();
disp = null;
}
+
if (channel != null)
{
channel.close();
channel = null;
}
+
stopped = true;
}
}
@@ -204,19 +211,25 @@
{
Object retval = null;
add_method.setArg(0, value);
+
RspList rsp = disp.callRemoteMethods(null, add_method, GroupRequest.GET_ALL, 0);
Vector results = rsp.getResults();
+
if (results.size() > 0)
{
retval = results.elementAt(0);
+
if (logger.isDebugEnabled())
+ {
checkResult(rsp, retval);
+ }
}
}
- catch (Exception e)
+ catch (Exception e)
{
logger.error("Unable to add value " + value, e);
}
+
return;
}
@@ -231,10 +244,11 @@
addAtHead_method.setArg(0, value);
disp.callRemoteMethods(null, addAtHead_method, GroupRequest.GET_ALL, 0);
}
- catch (Exception e)
+ catch (Exception e)
{
logger.error("Unable to addAtHead value " + value, e);
}
+
return;
}
@@ -251,10 +265,11 @@
addAll_method.setArg(0, values);
disp.callRemoteMethods(null, addAll_method, GroupRequest.GET_ALL, 0);
}
- catch (Exception e)
+ catch (Exception e)
{
logger.error("Unable to addAll value: " + values, e);
}
+
return;
}
@@ -262,8 +277,10 @@
{
Vector result = new Vector();
int i = 0;
+
for (Iterator e = internalQueue.iterator(); e.hasNext();)
result.add(e.next());
+
return result;
}
@@ -281,13 +298,15 @@
public Object peek()
{
Object retval = null;
+
try
{
retval = internalQueue.getFirst();
}
- catch (NoSuchElementException e)
+ catch (NoSuchElementException e)
{
}
+
return retval;
}
@@ -297,7 +316,7 @@
{
disp.callRemoteMethods(null, reset_method, GroupRequest.GET_ALL, 0);
}
- catch (Exception e)
+ catch (Exception e)
{
logger.error("DistributedQueue.reset(" + groupname + ")", e);
}
@@ -306,11 +325,16 @@
protected void checkResult(RspList rsp, Object retval)
{
if (logger.isDebugEnabled())
+ {
logger.debug("Value updated from " + groupname + " :" + retval);
+ }
+
Vector results = rsp.getResults();
+
for (int i = 0; i < results.size(); i++)
{
Object data = results.elementAt(i);
+
if (!data.equals(retval))
{
logger.error("Reference value differs from returned value " + retval + " != " + data);
@@ -327,12 +351,17 @@
Object retval = null;
RspList rsp = disp.callRemoteMethods(null, remove_method, GroupRequest.GET_ALL, internal_timeout);
Vector results = rsp.getResults();
+
if (results.size() > 0)
{
retval = results.elementAt(0);
+
if (logger.isDebugEnabled())
+ {
checkResult(rsp, retval);
+ }
}
+
return retval;
}
@@ -351,13 +380,19 @@
{
RspList rsp = disp.callRemoteMethods(null, remove_method, GroupRequest.GET_ALL, internal_timeout);
Vector results = rsp.getResults();
+
if (results.size() > 0)
{
retval = results.elementAt(0);
+
if (logger.isDebugEnabled())
+ {
checkResult(rsp, retval);
+ }
}
+
if (retval == null)
+ {
try
{
synchronized (mutex)
@@ -365,37 +400,47 @@
mutex.wait();
}
}
- catch (InterruptedException e)
+ catch (InterruptedException e)
{
}
+ }
}
}
else
{
-
- while ((System.currentTimeMillis() - start) < timeout && !stopped && (retval == null))
+ while (((System.currentTimeMillis() - start) < timeout) && !stopped && (retval == null))
{
RspList rsp = disp.callRemoteMethods(null, remove_method, GroupRequest.GET_ALL, internal_timeout);
Vector results = rsp.getResults();
+
if (results.size() > 0)
{
retval = results.elementAt(0);
+
if (logger.isDebugEnabled())
+ {
checkResult(rsp, retval);
+ }
}
+
if (retval == null)
+ {
try
{
long delay = timeout - (System.currentTimeMillis() - start);
+
synchronized (mutex)
{
if (delay > 0)
+ {
mutex.wait(delay);
+ }
}
}
- catch (InterruptedException e)
+ catch (InterruptedException e)
{
}
+ }
}
}
@@ -408,11 +453,12 @@
}
/*------------------------ Callbacks -----------------------*/
-
public void _add(Object value)
{
if (logger.isDebugEnabled())
+ {
logger.debug(groupname + "@" + getLocalAddress() + " _add(" + value + ")");
+ }
/*lock the queue from other threads*/
synchronized (mutex)
@@ -424,7 +470,7 @@
}
for (int i = 0; i < notifs.size(); i++)
- ((Notification) notifs.elementAt(i)).entryAdd(value);
+ ((Notification)notifs.elementAt(i)).entryAdd(value);
}
public void _addAtHead(Object value)
@@ -439,17 +485,20 @@
}
for (int i = 0; i < notifs.size(); i++)
- ((Notification) notifs.elementAt(i)).entryAdd(value);
+ ((Notification)notifs.elementAt(i)).entryAdd(value);
}
public void _reset()
{
if (logger.isDebugEnabled())
+ {
logger.debug(groupname + "@" + getLocalAddress() + " _reset()");
+ }
_private_reset();
+
for (int i = 0; i < notifs.size(); i++)
- ((Notification) notifs.elementAt(i)).contentsCleared();
+ ((Notification)notifs.elementAt(i)).contentsCleared();
}
protected void _private_reset()
@@ -467,9 +516,9 @@
public Object _remove()
{
Object retval = null;
+
try
{
-
/*lock the queue from other threads*/
synchronized (mutex)
{
@@ -480,22 +529,27 @@
}
if (logger.isDebugEnabled())
+ {
logger.debug(groupname + "@" + getLocalAddress() + "_remove(" + retval + ")");
+ }
+
for (int i = 0; i < notifs.size(); i++)
- ((Notification) notifs.elementAt(i)).entryRemoved(retval);
+ ((Notification)notifs.elementAt(i)).entryRemoved(retval);
}
- catch (NoSuchElementException e)
+ catch (NoSuchElementException e)
{
logger.debug(groupname + "@" + getLocalAddress() + "_remove(): nothing to remove");
}
+
return retval;
}
public void _addAll(Collection c)
{
-
if (logger.isDebugEnabled())
+ {
logger.debug(groupname + "@" + getLocalAddress() + " _addAll(" + c + ")");
+ }
/*lock the queue from other threads*/
synchronized (mutex)
@@ -507,29 +561,29 @@
}
for (int i = 0; i < notifs.size(); i++)
- ((Notification) notifs.elementAt(i)).contentsSet(c);
+ ((Notification)notifs.elementAt(i)).contentsSet(c);
}
/*----------------------------------------------------------*/
-
/*-------------------- State Exchange ----------------------*/
-
public void receive(Message msg)
{
}
public byte[] getState()
{
- Object key, val;
- Vector copy = (Vector) getContents().clone();
+ Object key;
+ Object val;
+ Vector copy = (Vector)getContents().clone();
try
{
return Util.objectToByteBuffer(copy);
}
- catch (Throwable ex)
+ catch (Throwable ex)
{
logger.error("DistributedQueue.getState(): exception marshalling state.", ex);
+
return null;
}
}
@@ -541,13 +595,17 @@
try
{
- new_copy = (Vector) Util.objectFromByteBuffer(new_state);
+ new_copy = (Vector)Util.objectFromByteBuffer(new_state);
+
if (new_copy == null)
+ {
return;
+ }
}
- catch (Throwable ex)
+ catch (Throwable ex)
{
logger.error("DistributedQueue.setState(): exception unmarshalling state.", ex);
+
return;
}
@@ -556,7 +614,6 @@
}
/*------------------- Membership Changes ----------------------*/
-
public void viewAccepted(View new_view)
{
Vector new_mbrs = new_view.getMembers();
@@ -565,6 +622,7 @@
{
sendViewChangeNotifications(new_mbrs, members); // notifies observers (joined, left)
members.removeAllElements();
+
for (int i = 0; i < new_mbrs.size(); i++)
members.addElement(new_mbrs.elementAt(i));
}
@@ -583,27 +641,38 @@
void sendViewChangeNotifications(Vector new_mbrs, Vector old_mbrs)
{
- Vector joined, left;
+ Vector joined;
+ Vector left;
Object mbr;
Notification n;
- if (notifs.size() == 0 || old_mbrs == null || new_mbrs == null || old_mbrs.size() == 0 || new_mbrs.size() == 0)
+ if (
+ (notifs.size() == 0) || (old_mbrs == null) || (new_mbrs == null) || (old_mbrs.size() == 0) ||
+ (new_mbrs.size() == 0))
+ {
return;
+ }
// 1. Compute set of members that joined: all that are in new_mbrs, but not in old_mbrs
joined = new Vector();
+
for (int i = 0; i < new_mbrs.size(); i++)
{
mbr = new_mbrs.elementAt(i);
+
if (!old_mbrs.contains(mbr))
+ {
joined.addElement(mbr);
+ }
}
// 2. Compute set of members that left: all that were in old_mbrs, but not in new_mbrs
left = new Vector();
+
for (int i = 0; i < old_mbrs.size(); i++)
{
mbr = old_mbrs.elementAt(i);
+
if (!new_mbrs.contains(mbr))
{
left.addElement(mbr);
@@ -612,7 +681,7 @@
for (int i = 0; i < notifs.size(); i++)
{
- n = (Notification) notifs.elementAt(i);
+ n = (Notification)notifs.elementAt(i);
n.viewChange(joined, left);
}
}
@@ -640,14 +709,16 @@
}
if (reset_method == null)
+ {
reset_method = new MethodCall(getClass().getMethod("_reset", new Class[0]));
+ }
if (remove_method == null)
{
remove_method = new MethodCall(getClass().getMethod("_remove", new Class[0]));
}
}
- catch (Throwable ex)
+ catch (Throwable ex)
{
logger.error("DistributedQueue.initMethods()", ex);
}
@@ -666,15 +737,16 @@
// A simpler setup is
// DistributedQueue ht = new DistributedQueue("demo", null,
// "file://c:/JGroups-2.0/conf/total-token.xml", 5000);
-
JChannel c = new JChannel("file:/c:/JGroups-2.0/conf/conf/total-token.xml");
c.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
+
DistributedQueue ht = new DistributedQueue(c);
c.connect("demo");
ht.start(5000);
ht.add("name");
ht.add("Michelle Ban");
+
Object old_key = ht.remove();
System.out.println("old key was " + old_key);
old_key = ht.remove();
@@ -684,10 +756,9 @@
System.out.println("queue is " + ht);
}
- catch (Throwable t)
+ catch (Throwable t)
{
t.printStackTrace();
}
}
-
}
|