User: belaban
Date: 07/08/14 00:18:00
Modified: src/org/jgroups JChannel.java
Log:
join and state transfer (http://jira.jboss.com/jira/browse/JGRP-236)
Revision Changes Path
1.141 +115 -61 JGroups/src/org/jgroups/JChannel.java
Index: JChannel.java
===================================================================
RCS file: /cvsroot/javagroups/JGroups/src/org/jgroups/JChannel.java,v
retrieving revision 1.140
retrieving revision 1.141
diff -u -r1.140 -r1.141
--- JChannel.java 2 Aug 2007 15:12:38 -0000 1.140
+++ JChannel.java 14 Aug 2007 07:18:00 -0000 1.141
@@ -71,7 +71,7 @@
* the construction of the stack will be aborted.
*
* @author Bela Ban
- * @version $Id: JChannel.java,v 1.140 2007/08/02 15:12:38 belaban Exp $
+ * @version $Id: JChannel.java,v 1.141 2007/08/14 07:18:00 belaban Exp $
*/
public class JChannel extends Channel {
@@ -348,53 +348,13 @@
* A new channel has to be created first.
*/
public synchronized void connect(String cluster_name) throws ChannelException {
- /*make sure the channel is not closed*/
- checkClosed();
-
- /*if we already are connected, then ignore this*/
- if(connected) {
- if(log.isTraceEnabled()) log.trace("already connected to " + cluster_name);
- return;
- }
-
- /*make sure we have a valid channel name*/
- if(cluster_name == null) {
- if(log.isDebugEnabled()) log.debug("cluster_name is null, assuming unicast channel");
- }
- else
- this.cluster_name=cluster_name;
-
- try {
- prot_stack.startStack(); // calls start() in all protocols, from top to bottom
- }
- catch(Throwable e) {
- throw new ChannelException("failed to start protocol stack", e);
- }
-
- String tmp=Util.getProperty(new String[]{Global.CHANNEL_LOCAL_ADDR_TIMEOUT, "local_addr.timeout"},
- null, null, false, "30000");
- LOCAL_ADDR_TIMEOUT=Long.parseLong(tmp);
-
- /* Wait LOCAL_ADDR_TIMEOUT milliseconds for local_addr to have a non-null value (set by SET_LOCAL_ADDRESS) */
- local_addr=(Address)local_addr_promise.getResult(LOCAL_ADDR_TIMEOUT);
- if(local_addr == null) {
- log.fatal("local_addr is null; cannot connect");
- throw new ChannelException("local_addr is null");
- }
-
-
- /*create a temporary view, assume this channel is the only member and
- *is the coordinator*/
- Vector t=new Vector(1);
- t.addElement(local_addr);
- my_view=new View(local_addr, 0, t); // create a dummy view
+ startStack(cluster_name);
// only connect if we are not a unicast channel
if(cluster_name != null) {
-
if(flush_supported)
flush_unblock_promise.reset();
-
+
Event connect_event=new Event(Event.CONNECT, cluster_name);
Object res=downcall(connect_event); // waits forever until connected (or channel is closed)
if(res != null && res instanceof Exception) { // the JOIN was rejected by the coordinator
@@ -402,15 +362,15 @@
}
//if FLUSH is used do not return from connect() until UNBLOCK event is received
- boolean singletonMember = my_view != null && my_view.size() == 1;
- boolean shouldWaitForUnblock = flush_supported && receive_blocks && !singletonMember && !flush_unblock_promise.hasResult();
- if(shouldWaitForUnblock){
- try{
+ boolean singletonMember=my_view != null && my_view.size() == 1;
+ boolean shouldWaitForUnblock=flush_supported && receive_blocks && !singletonMember && !flush_unblock_promise.hasResult();
+ if(shouldWaitForUnblock) {
+ try {
flush_unblock_promise.getResultWithTimeout(FLUSH_UNBLOCK_TIMEOUT);
}
- catch (TimeoutException te){
+ catch (TimeoutException timeout) {
if(log.isWarnEnabled())
- log.warn(local_addr + " waiting on UNBLOCK after connect timed out");
+ log.warn(local_addr + " waiting on UNBLOCK after connect() timed out");
}
}
}
@@ -429,10 +389,48 @@
* @throws ChannelException
*/
public synchronized boolean connect(String cluster_name, Address target, String state_id, long timeout) throws ChannelException {
- throw new UnsupportedOperationException("not yet implemented");
+ startStack(cluster_name);
+
+ boolean stateTransferSuccessful=false;
+ boolean joinSuccessful=false;
+ // only connect if we are not a unicast channel
+ if(cluster_name != null) {
+ if(flush_supported)
+ flush_unblock_promise.reset();
+
+ Event connect_event=new Event(Event.CONNECT_WITH_STATE_TRANSFER, cluster_name);
+ Object res=downcall(connect_event); // waits forever until connected (or channel is closed)
+ joinSuccessful=!(res != null && res instanceof Exception);
+
+ if(joinSuccessful) {
+ connected=true;
+ notifyChannelConnected(this);
+ stateTransferSuccessful=getState(target, state_id, timeout, false);
+ boolean singletonMember=my_view != null && my_view.size() == 1;
+ boolean shouldWaitForUnblock=flush_supported && receive_blocks
+ && !singletonMember
+ && !flush_unblock_promise.hasResult();
+ if(shouldWaitForUnblock) {
+ try {
+ flush_unblock_promise.getResultWithTimeout(FLUSH_UNBLOCK_TIMEOUT);
+ }
+ catch(TimeoutException te) {
+ if(log.isWarnEnabled())
+ log.warn(local_addr + " waiting on UNBLOCK after connect timed out");
+ }
+ }
+ }
+ else {
+ throw new ChannelException("connect() failed", (Throwable)res);
+ }
+ }
+ return joinSuccessful && stateTransferSuccessful;
}
+
+
+
/**
* Disconnects the channel if it is connected. If the channel is closed, this operation is ignored<BR>
* Otherwise the following actions happen in the listed order<BR>
@@ -865,6 +863,20 @@
* @throws ChannelClosedException
*/
public boolean getState(Address target, String state_id, long timeout) throws ChannelNotConnectedException, ChannelClosedException {
+ return getState(target, state_id, timeout, true);
+ }
+
+ /**
+ * Retrieves a substate (or partial state) from the target.
+ * @param target State provider. If null, coordinator is used
+ * @param state_id The ID of the substate. If null, the entire state will be transferred
+ * @param timeout the number of milliseconds to wait for the operation to complete successfully. 0 waits until
+ * the state has been received
+ * @return
+ * @throws ChannelNotConnectedException
+ * @throws ChannelClosedException
+ */
+ protected boolean getState(Address target, String state_id, long timeout,boolean useFlushIfPresent) throws ChannelNotConnectedException, ChannelClosedException {
if(target == null)
target=determineCoordinator();
if(target != null && local_addr != null && target.equals(local_addr)) {
@@ -873,7 +885,7 @@
return false;
}
- StateTransferInfo info=new StateTransferInfo(target, state_id, timeout);
+ StateTransferInfo info=new StateTransferInfo(target, state_id, timeout,useFlushIfPresent);
boolean rc=_getState(new Event(Event.GET_STATE, info), info);
if(rc == false)
down(new Event(Event.RESUME_STABLE));
@@ -1340,6 +1352,48 @@
}
+ private void startStack(String cluster_name) throws ChannelException {
+ /*make sure the channel is not closed*/
+ checkClosed();
+
+ /*if we already are connected, then ignore this*/
+ if(connected) {
+ if(log.isTraceEnabled()) log.trace("already connected to " + cluster_name);
+ return;
+ }
+
+ /*make sure we have a valid channel name*/
+ if(cluster_name == null) {
+ if(log.isDebugEnabled()) log.debug("cluster_name is null, assuming unicast channel");
+ }
+ else
+ this.cluster_name=cluster_name;
+
+ try {
+ prot_stack.startStack(); // calls start() in all protocols, from top to bottom
+ }
+ catch(Throwable e) {
+ throw new ChannelException("failed to start protocol stack", e);
+ }
+
+ String tmp=Util.getProperty(new String[]{Global.CHANNEL_LOCAL_ADDR_TIMEOUT, "local_addr.timeout"},
+ null, null, false, "30000");
+ LOCAL_ADDR_TIMEOUT=Long.parseLong(tmp);
+
+ /* Wait LOCAL_ADDR_TIMEOUT milliseconds for local_addr to have a non-null value (set by SET_LOCAL_ADDRESS) */
+ local_addr=(Address)local_addr_promise.getResult(LOCAL_ADDR_TIMEOUT);
+ if(local_addr == null) {
+ log.fatal("local_addr is null; cannot connect");
+ throw new ChannelException("local_addr is null");
+ }
+
+ /*create a temporary view, assume this channel is the only member and is the coordinator*/
+ Vector t=new Vector(1);
+ t.addElement(local_addr);
+ my_view=new View(local_addr, 0, t); // create a dummy view
+ }
+
+
/**
* health check<BR>
@@ -1429,22 +1483,22 @@
}
if(flush_supported)
- flush_unblock_promise.reset();
+ flush_unblock_promise.reset();
state_promise.reset();
down(evt);
Boolean state_transfer_successfull=(Boolean)state_promise.getResult(info.timeout);
//if FLUSH is used do not return from getState() until UNBLOCK event is received
- boolean shouldWaitForUnblock = flush_supported && receive_blocks;
- if(shouldWaitForUnblock){
- try{
- flush_unblock_promise.getResultWithTimeout(FLUSH_UNBLOCK_TIMEOUT);
- }
- catch (TimeoutException te){
- if(log.isWarnEnabled())
- log.warn(local_addr + " waiting on UNBLOCK after getState timed out");
- }
+ boolean shouldWaitForUnblock=flush_supported && receive_blocks;
+ if(shouldWaitForUnblock) {
+ try {
+ flush_unblock_promise.getResultWithTimeout(FLUSH_UNBLOCK_TIMEOUT);
+ }
+ catch(TimeoutException te) {
+ if(log.isWarnEnabled())
+ log.warn(local_addr + " waiting on UNBLOCK after getState timed out");
+ }
}
return state_transfer_successfull != null && state_transfer_successfull.booleanValue();
|