From: Brian S. <bri...@wa...> - 2005-10-11 19:13:22
|
User: bstansberry Date: 05/10/11 15:13:13 Modified: src/org/jboss/cache Tag: JBossCache_1_2_4 TreeCache.java Log: 1) When activating a region, request state with a short timeout. If any node replies with a timeout exception, retry with a longer timeout; fail after the third attempt. 2) JBCACHE-335. Factor out the preparation/integration of state transfer data from TreeCache; use a factory pattern to allow interoperability between different JBossCache versions. Revision Changes Path No revision No revision 1.56.2.8 +339 -360 JBossCache/src/org/jboss/cache/TreeCache.java (In the diff below, changes in quantity of whitespace are not shown.) Index: TreeCache.java =================================================================== RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/TreeCache.java,v retrieving revision 1.56.2.7 retrieving revision 1.56.2.8 diff -u -b -r1.56.2.7 -r1.56.2.8 --- TreeCache.java 8 Oct 2005 00:39:29 -0000 1.56.2.7 +++ TreeCache.java 11 Oct 2005 19:13:12 -0000 1.56.2.8 @@ -17,19 +17,18 @@ import org.jboss.cache.loader.AsyncExtendedCacheLoader; import org.jboss.cache.loader.CacheLoader; import org.jboss.cache.loader.ExtendedCacheLoader; -import org.jboss.cache.loader.NodeData; import org.jboss.cache.lock.IsolationLevel; import org.jboss.cache.lock.LockStrategyFactory; import org.jboss.cache.lock.LockingException; import org.jboss.cache.lock.TimeoutException; -import org.jboss.cache.marshall.MarshallUtil; import org.jboss.cache.marshall.Region; import org.jboss.cache.marshall.RegionManager; import org.jboss.cache.marshall.TreeCacheMarshaller; import org.jboss.cache.marshall.RegionNameConflictException; import org.jboss.cache.marshall.RegionNotFoundException; -import org.jboss.invocation.MarshalledValueInputStream; -import org.jboss.invocation.MarshalledValueOutputStream; +import org.jboss.cache.statetransfer.StateTransferFactory; +import org.jboss.cache.statetransfer.StateTransferGenerator; +import org.jboss.cache.statetransfer.StateTransferIntegrator; 
import org.jboss.system.ServiceMBeanSupport; import org.jboss.util.NestedRuntimeException; import org.jgroups.*; @@ -48,12 +47,6 @@ import javax.transaction.Transaction; import javax.transaction.TransactionManager; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.EOFException; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.lang.reflect.Method; import java.util.*; @@ -66,13 +59,16 @@ * @author Bela Ban * @author Ben Wang * @author <a href="mailto:ma...@jb...">Manik Surtani (ma...@jb...)</a> - * @version $Id: TreeCache.java,v 1.56.2.7 2005/10/08 00:39:29 bstansberry Exp $ + * @author Brian Stansberry + * @version $Id: TreeCache.java,v 1.56.2.8 2005/10/11 19:13:12 bstansberry Exp $ * <p/> */ public class TreeCache extends ServiceMBeanSupport implements TreeCacheMBean, Cloneable, MembershipListener { public static final Fqn ROOT_FQN = Fqn.fromString("/"); + public static final short DEFAULT_STATE_TRANSFER_VERSION = 124; + // Quite poor, but for now, root may be re-initialised when setNodeLockingOptimistic() is called. // this is because if node locking is optimistic, we need to use OptimisticTreeNodes rather than TreeNodes. // - MANIK @@ -190,7 +186,7 @@ /** The fully qualified name of the CacheLoader (has to implement the CacheLoader interface) */ protected String cache_loader_class=null; - /** A reference to the CacheLoader. If null, we don't have a CachedLoader */ + /** A reference to the CacheLoader. 
If null, we don't have a CacheLoader */ protected CacheLoader cache_loader=null; /** The properties from which to configure the CacheLoader */ @@ -326,7 +322,7 @@ boolean.class}); getPartialStateMethod=TreeCache.class.getDeclaredMethod("_getState", - new Class[] { Fqn.class, boolean.class } ); + new Class[] { Fqn.class, long.class, boolean.class, boolean.class } ); enqueueMethodCallMethod=TreeCache.class.getDeclaredMethod("_enqueueMethodCall", new Class[] { String.class, MethodCall.class }); @@ -695,7 +691,6 @@ * <p> * This property is only relevant if {@link #getUseMarshalling()} is * <code>true</code>. - * */ public void setInactiveOnStartup(boolean inactiveOnStartup) { @@ -1935,12 +1930,6 @@ // whose corresponding commit will thus fail after activation region.startQueuing(); - MethodCall psmc = new MethodCall(getPartialStateMethod, - new Object[]{ fqn, Boolean.FALSE }); - - MethodCall replPsmc = new MethodCall(replicateMethod, - new Object[] { psmc} ); - // If a classloader is registered for the node's region, use it ClassLoader cl = null; if (marshaller_ != null) @@ -1957,17 +1946,30 @@ subtreeRoot = createSubtreeRootNode(fqn); } - Object owner = getOwnerForLock(); + Object ourself = getLocalAddress(); // ignore ourself when we call + + // Call the cluster with progressively longer timeouts + long[] timeouts = { 400l, 800l, 1600l }; + for (int i = 0; i < timeouts.length; i++) + { + boolean retry = false; + Boolean force = (i == timeouts.length -1) ? 
Boolean.TRUE + : Boolean.FALSE; + + MethodCall psmc = new MethodCall(getPartialStateMethod, + new Object[]{ fqn, new Long(timeouts[i]), + force, Boolean.FALSE }); + + MethodCall replPsmc = new MethodCall(replicateMethod, + new Object[] { psmc} ); // Iterate over the group members, seeing if anyone // can give us state for this region - Object ourself = getLocalAddress(); // ignore ourself for (Iterator iter = getMembers().iterator(); iter.hasNext(); ) { Object target = iter.next(); if (ourself.equals(target) == false) { - Vector targets = new Vector(); targets.add(target); @@ -1978,23 +1980,23 @@ rsp = responses.get(0); if (rsp instanceof byte[]) { - try - { - root.acquireAll(owner, state_fetch_timeout, DataNode.LOCK_TYPE_READ); _setState((byte[]) rsp, subtreeRoot, cl); stateSet = true; - if (log.isTraceEnabled()) + if (log.isDebugEnabled()) { - log.trace("TreeCache.activateRegion(): " + ourself + + log.debug("TreeCache.activateRegion(): " + ourself + " got state from " + target); } break; } - finally + else if (rsp instanceof TimeoutException) { + retry = true; + if (log.isTraceEnabled()) { - try {root.releaseAll(owner);} catch(Throwable t) {log.error("failed releasing locks", t);} + log.trace("TreeCache.activateRegion(): " + ourself + + " got a TimeoutException from " + target); } } } @@ -2008,8 +2010,12 @@ } } - if (!stateSet && log.isTraceEnabled()) - log.trace("TreeCache.activateRegion(): No nodes able to give state"); + if (stateSet || !retry) + break; + } + + if (!stateSet && log.isDebugEnabled()) + log.debug("TreeCache.activateRegion(): No nodes able to give state"); // Lock out other activity on the region while we // we process the queue and activate the region @@ -2308,7 +2314,13 @@ * * @param fqn Fqn indicating the uppermost node in the * portion of the tree whose state should be returned. 
- * + * @param timeout max number of ms this method should wait to acquire + * a read lock on the nodes being transferred + * @param force if a read lock cannot be acquired after + * <code>timeout</code> ms, should the lock acquisition + * be forced, and any existing transactions holding locks + * on the nodes be rolled back? <strong>NOTE:</strong> + * In release 1.2.4, this parameter has no effect. * @param suppressErrors should any Throwable thrown be suppressed? * * @return a serialized byte[][], element 0 is the transient state @@ -2318,7 +2330,7 @@ * enabled, the requested Fqn is not the root node, and the * cache loader does not implement {@link ExtendedCacheLoader}. */ - public byte[] _getState(Fqn fqn, boolean suppressErrors) throws Throwable { + public byte[] _getState(Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable { if (marshaller_ != null) { // can't give state for regions currently being activated/inactivated @@ -2355,105 +2367,217 @@ "ExtendedClassLoader; partial state transfer not supported"); } - byte[] transient_state=null; - byte[] persistent_state=null; - byte[][] states=new byte[3][]; - byte[] retval=null; boolean locked=false; Object owner = getOwnerForLock(); - states[0]=states[1]=states[2]=null; - try { try { - if(cache_loader_fetch_transient_state) { + if (cache_loader_fetch_transient_state || fetch_persistent_state) { log.info("locking the " + fqn + " subtree to return the in-memory (transient) state"); - rootNode.acquireAll(owner, state_fetch_timeout, DataNode.LOCK_TYPE_READ); - locked=true; - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - MarshalledValueOutputStream out = new MarshalledValueOutputStream(baos); - marshallTransientState(rootNode, out); - out.close(); - transient_state = baos.toByteArray(); - states[0]=transient_state; - log.info("returning the in-memory state (" + transient_state.length + " bytes)"); - // Return any state associated with the subtree but not stored in it - 
states[1]= _getAssociatedState(fqn); + acquireLocksForStateTransfer(rootNode, owner, timeout, force); + locked = true; } - } - catch(Throwable t) { - log.error("failed getting the in-memory (transient) state", t); - if (!suppressErrors) - throw t; - } - try { - if(fetch_persistent_state) { - if(!locked) { - log.info("locking the tree to obtain persistent state"); - rootNode.acquireAll(owner, state_fetch_timeout, DataNode.LOCK_TYPE_READ); - locked=true; - } - log.info("getting the persistent state"); - if (getRoot) - persistent_state=cache_loader.loadEntireState(); - else - persistent_state=((ExtendedCacheLoader)cache_loader).loadState(fqn); - states[2]=persistent_state; - log.info("returning the persistent state (" + persistent_state.length + " bytes)"); - } - } - catch(Throwable t) { - log.error("failed getting the persistent state", t); - if (!suppressErrors) - throw t; - } + StateTransferGenerator generator = + StateTransferFactory.getStateTransferGenerator(this); - try { - retval = MarshallUtil.objectToByteBuffer(states); - return retval; - } - catch(Throwable t) { - log.error("failed serializing transient and persistent state", t); - if (!suppressErrors) - throw t; - return retval; - } + return generator.generateStateTransfer(rootNode, + cache_loader_fetch_transient_state, + fetch_persistent_state, + suppressErrors); } finally { - rootNode.releaseAll(owner); + if (locked) + releaseStateTransferLocks(rootNode, owner); } } /** + * Returns the state for the portion of the tree named by <code>fqn</code>. + * <p> + * State returned is a serialized byte[][], element 0 is the transient state + * (or null), and element 1 is the persistent state (or null). + * + * @param fqn Fqn indicating the uppermost node in the + * portion of the tree whose state should be returned. 
+ * @param timeout max number of ms this method should wait to acquire + * a read lock on the nodes being transferred + * @param force if a read lock cannot be acquired after + * <code>timeout</code> ms, should the lock acquisition + * be forced, and any existing transactions holding locks + * on the nodes be rolled back? <strong>NOTE:</strong> + * In release 1.2.4, this parameter has no effect. + * @param suppressErrors should any Throwable thrown be suppressed? + * + * @return a serialized byte[][], element 0 is the transient state + * (or null), and element 1 is the persistent state (or null). + * + * @throws UnsupportedOperationException if persistent state transfer is + * enabled, the requested Fqn is not the root node, and the + * cache loader does not implement {@link ExtendedCacheLoader}. + */ +// public byte[] _getState(Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable { +// +// if (marshaller_ != null) { +// // can't give state for regions currently being activated/inactivated +// synchronized(activationChangeNodes) { +// if (activationChangeNodes.contains(fqn)) { +// if (log.isDebugEnabled()) +// log.debug("ignoring _getState() for " + fqn + " as it is being activated/inactivated"); +// return null; +// } +// } +// +// // Can't give state for inactive nodes +// if (marshaller_.isInactive(fqn.toString())) { +// if (log.isDebugEnabled()) +// log.debug("ignoring _getState() for inactive region " + fqn); +// return null; +// } +// } +// +// DataNode rootNode = findNode(fqn); +// if (rootNode == null) +// return null; // Either we don't have data or the node is inactive +// +// boolean fetch_persistent_state=cache_loader != null && +// cache_loader_shared == false && +// cache_loader_fetch_persistent_state; +// +// boolean getRoot = rootNode.equals(root); +// if (fetch_persistent_state && (getRoot == false) && +// (cache_loader instanceof ExtendedCacheLoader) == false) +// { +// throw new UnsupportedOperationException( +// 
cache_loader.getClass().getName() + " does not implement " + +// "ExtendedClassLoader; partial state transfer not supported"); +// } +// +// byte[] transient_state=null; +// byte[] persistent_state=null; +// byte[][] states=new byte[3][]; +// byte[] retval=null; +// boolean locked=false; +// +// Object owner = getOwnerForLock(); +// +// states[0]=states[1]=states[2]=null; +// try { +// try { +// if(cache_loader_fetch_transient_state) { +// log.info("locking the " + fqn + " subtree to return the in-memory (transient) state"); +// try { +// +// rootNode.acquireAll(owner, timeout, DataNode.LOCK_TYPE_READ); +// } +// catch (TimeoutException te) { +// if (force) { +// // TODO JBCACHE-315 add force logic +// } +// else { +// throw te; +// } +// } +// locked = true; +// ByteArrayOutputStream baos = new ByteArrayOutputStream(); +// MarshalledValueOutputStream out = new MarshalledValueOutputStream(baos); +// marshallTransientState(rootNode, out); +// out.close(); +// transient_state = baos.toByteArray(); +// states[0]=transient_state; +// log.info("returning the in-memory state (" + transient_state.length + " bytes)"); +// // Return any state associated with the subtree but not stored in it +// states[1]= _getAssociatedState(fqn, timeout, force); +// } +// } +// catch(Throwable t) { +// log.error("failed getting the in-memory (transient) state", t); +// if (!suppressErrors) +// throw t; +// } +// try { +// if(fetch_persistent_state) { +// if(!locked) { +// log.info("locking the tree to obtain persistent state"); +// try { +// rootNode.acquireAll(owner, state_fetch_timeout, DataNode.LOCK_TYPE_READ); +// } +// catch (TimeoutException te) { +// if (force) { +// // TODO JBCACHE-315 add force logic +// } +// else { +// throw te; +// } +// } +// } +// locked = true; +// log.info("getting the persistent state"); +// if (getRoot) +// persistent_state=cache_loader.loadEntireState(); +// else +// persistent_state=((ExtendedCacheLoader)cache_loader).loadState(fqn); +// +// 
states[2]=persistent_state; +// log.info("returning the persistent state (" + persistent_state.length + " bytes)"); +// } +// } +// catch(Throwable t) { +// log.error("failed getting the persistent state", t); +// if (!suppressErrors) +// throw t; +// } +// +// try { +// ByteArrayOutputStream baos = new ByteArrayOutputStream(); +// MarshalledValueOutputStream out = new MarshalledValueOutputStream(baos); +// out.writeShort(getStateTransferVersion()); +// out.writeObject(states); +// out.close(); +// retval = baos.toByteArray(); +// return retval; +// } +// catch(Throwable t) { +// log.error("failed serializing transient and persistent state", t); +// if (!suppressErrors) +// throw t; +// return retval; +// } +// } +// finally { +// rootNode.releaseAll(owner); +// } +// } + + /** * Do a preorder traversal: visit the node first, then the node's children * @param fqn Start node * @param out * @throws Exception */ - private void marshallTransientState(DataNode node, - ObjectOutputStream out) throws Exception - { - Map attrs; - NodeData nd; - - // first handle the current node - attrs=node.getData(); - if(attrs == null || attrs.size() == 0) - nd=new NodeData(node.getFqn()); - else - nd=new NodeData(node.getFqn(), attrs); - out.writeObject(nd); - - // then visit the children - Map children = node.getChildren(); - if(children == null) - return; - for(Iterator it=children.entrySet().iterator(); it.hasNext();) { - Map.Entry entry = (Map.Entry) it.next(); - marshallTransientState((DataNode) entry.getValue(), out); - } - } +// private void marshallTransientState(DataNode node, +// ObjectOutputStream out) throws Exception +// { +// Map attrs; +// NodeData nd; +// +// // first handle the current node +// attrs=node.getData(); +// if(attrs == null || attrs.size() == 0) +// nd=new NodeData(node.getFqn()); +// else +// nd=new NodeData(node.getFqn(), attrs); +// out.writeObject(nd); +// +// // then visit the children +// Map children = node.getChildren(); +// if(children == null) 
+// return; +// for(Iterator it=children.entrySet().iterator(); it.hasNext();) { +// Map.Entry entry = (Map.Entry) it.next(); +// marshallTransientState((DataNode) entry.getValue(), out); +// } +// } /** * Returns any state stored in the cache that needs to be propagated @@ -2471,39 +2595,24 @@ * </p> * * @param fqn the fqn that represents the root node of the subtree. + * @param timeout max number of ms this method should wait to acquire + * a read lock on the nodes being transferred + * @param force if a read lock cannot be acquired after + * <code>timeout</code> ms, should the lock acquisition + * be forced, and any existing transactions holding locks + * on the nodes be rolled back? <strong>NOTE:</strong> + * In release 1.2.4, this parameter has no effect. * * @return a byte[] representing the marshalled form of any "associated" state, * or <code>null</code>. This implementation returns <code>null</code>. */ - protected byte[] _getAssociatedState(Fqn fqn) throws Exception + protected byte[] _getAssociatedState(Fqn fqn, long timeout, boolean force) throws Exception { // default implemenation does nothing return null; } /** - * Integrates into the tree any state that is "associated" with a subtree - * that was returned as part of a state transfer call. Called by - * {@link #_setState()} if - * {@link #getCacheLoaderFetchTransientState()} returns <code>true</code>. - * <p> - * This method is designed for overriding by - * {@link org.jboss.cache.aop.TreeCacheAop}. - * The implementation in this class does nothing. - * </p> - * - * @param fqn the fqn that represents the root node of the subtree with - * which the "global" state is associated. - * @param state the state. Cannot be <code>null</code>. - * - * @see #_getAssociatedState - */ - protected void _setAssociatedState(Fqn fqn, byte[] state) throws Exception - { - // default impl does nothing - } - - /** * Set the portion of the cache rooted in <code>targetRoot</code> * to match the given state. 
Updates the contents of <code>targetRoot</code> * to reflect those in <code>new_state</code>. @@ -2517,190 +2626,82 @@ * persistent state (or null) */ private void _setState(byte[] new_state, DataNode targetRoot, ClassLoader cl) + throws Exception { - boolean setRoot = (targetRoot == root); - Fqn targetRootFqn = targetRoot.getFqn(); - byte[][] states=null; - byte[] transient_state=null; - byte[] associated_state=null; - byte[] persistent_state=null; - boolean transientSet = false; - if(new_state == null) { log.info("new_state is null (may be first member in cluster)"); return; } - // 1. Unserialize the states into transient and persistent state - try { log.info("received the state (size=" + new_state.length + " bytes)"); - states = (byte[][]) MarshallUtil.objectFromByteBuffer(new_state); - transient_state=states[0]; - associated_state=states[1]; - persistent_state=states[2]; - if(transient_state != null) - log.info("transient state: " + transient_state.length + " bytes"); - if(persistent_state != null) - log.info("persistent state: " + persistent_state.length + " bytes"); - } - catch(Throwable t) { - log.error("failed unserializing state", t); - } - - // 2. If transient state is available: set it - if(transient_state != null) { - ClassLoader oldCL = null; - try { - if (cl != null) { - oldCL = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(cl); - } - log.info("setting transient state for " + targetRootFqn); - - integrateStateTransfer(targetRoot, transient_state); + Object owner = getOwnerForLock(); + try + { + // Acquire a lock on the root node + acquireLocksForStateTransfer(targetRoot, owner, state_fetch_timeout, true); - transientSet = true; + // 1. Unserialize the states into transient and persistent state + StateTransferIntegrator integrator = + StateTransferFactory.getStateTransferIntegrator(new_state, + targetRoot.getFqn(), + this); - log.info("transient state successfully set for " + targetRootFqn); + // 2. 
If transient state is available, integrate it + try { + integrator.integrateTransientState(targetRoot, cl); notifyAllNodesCreated(targetRoot); } catch(Throwable t) { log.error("failed setting transient state", t); } - finally { - if (oldCL != null) - Thread.currentThread().setContextClassLoader(oldCL); - } - } - if (!transientSet) { - // Clear any existing state from the targetRoot - targetRoot.clear(); - targetRoot.removeAllChildren(); + // 3. Store any persistent state + integrator.integratePersistentState(); } - else { - // 3. Set the associated state. We only do this if the normal - // transient state was set. - if (associated_state != null) - try - { - _setAssociatedState(targetRootFqn, associated_state); - } - catch (Throwable t) + finally { - log.error("failed setting associated state", t); + releaseStateTransferLocks(targetRoot, owner); } + } - // 4. Set the persistent state - if(persistent_state != null) { - if(cache_loader == null) { - log.error("cache loader is null, cannot set persistent state"); + public short getStateTransferVersion() + { + // TODO make this configurable once there is more than one possible value + return DEFAULT_STATE_TRANSFER_VERSION; } - else if (setRoot){ + + protected void acquireLocksForStateTransfer(DataNode root, + Object lockOwner, + long timeout, + boolean force) + throws Exception + { try { - log.info("setting the persistent state"); - // cache_loader.remove(Fqn.fromString("/")); - cache_loader.storeEntireState(persistent_state); - log.info("setting the persistent state was successful"); - } - catch(Throwable t) { - log.error("failed setting persistent state", t); - } - } - else if (cache_loader instanceof ExtendedCacheLoader) { - try { - log.info("setting the persistent state"); - // cache_loader.remove(Fqn.fromString("/")); - ((ExtendedCacheLoader)cache_loader).storeState(persistent_state, - targetRootFqn); - log.info("setting the persistent state was successful"); - } - catch(Throwable t) { - log.error("failed setting 
persistent state", t); + root.acquireAll(lockOwner, timeout, DataNode.LOCK_TYPE_READ); } + catch (TimeoutException te) { + log.error("Caught TimeoutException acquiring locks on " + + root.getFqn(), te); + if (force) { + // TODO JBCACHE-315 replace throwing te with force logic + throw te; } else { - log.error("cache loader does not implement ExtendedCacheLoader, " + - "cannot set persistent state"); + throw te; } } } - private void integrateStateTransfer(DataNode target, byte[] transient_state) - throws IOException, ClassNotFoundException - { - target.removeAllChildren(); - - ByteArrayInputStream in_stream=new ByteArrayInputStream(transient_state); - MarshalledValueInputStream in=new MarshalledValueInputStream(in_stream); - - // Read the first NodeData and integrate into our target - NodeData nd = (NodeData) in.readObject(); - integrateNodeData(target, nd); - - // Build the child tree - NodeFactory factory = NodeFactory.getInstance(); - byte nodeType = isNodeLockingOptimistic() - ? NodeFactory.NODE_TYPE_OPTIMISTIC_NODE - : NodeFactory.NODE_TYPE_TREENODE; - integrateStateTransferChildren(target, in, factory, nodeType); - - in.close(); - } - - private NodeData integrateStateTransferChildren(DataNode parent, - ObjectInputStream in, - NodeFactory factory, - byte nodeType) - throws IOException, ClassNotFoundException - { - int parent_level = parent.getFqn().size(); - int target_level = parent_level + 1; - try + protected void releaseStateTransferLocks(DataNode root, Object lockOwner) { - NodeData nd = (NodeData) in.readObject(); - while (nd != null) { - Fqn fqn = nd.getFqn(); - int size = fqn.size(); - if (size <= parent_level) - return nd; - else if (size > target_level) - throw new IllegalStateException("NodeData " + fqn + - " is not a direct child of " + - parent.getFqn()); - - // We handle this NodeData. 
Create a DataNode and - // integrate its data - DataNode target = factory.createDataNode(nodeType, - fqn.get(size -1), - fqn, - parent, - null, - this); - integrateNodeData(target, nd); - parent.addChild(target.getName(), target); - - // Recursively call, which will walk down the tree - // and return the next NodeData that's a child of our parent - nd = integrateStateTransferChildren(target, in, factory, nodeType); - } - } - catch (EOFException eof) { - // all done + try { + root.releaseAll(lockOwner); } - - return null; + catch(Throwable t) { + log.error("failed releasing locks", t); } - - private void integrateNodeData(DataNode target, NodeData nd) - { - Map attrs = nd.getAttributes(); - if (attrs != null) - target.put(attrs, true); - else - target.clear(); } /** @@ -4096,9 +4097,13 @@ */ public byte[] getState() { try { - return cache._getState(Fqn.fromString(SEPARATOR), true); + return cache._getState(Fqn.fromString(SEPARATOR), + cache.getInitialStateRetrievalTimeout(), + true, true); } catch (Throwable t) { + // This shouldn't happen as we set "suppressErrors" to true, + // but we have to cache the Throwable declared in the method sig log.error("Caught " + t.getClass().getName() + " while responding to initial state transfer request;" + " returning null"); @@ -4109,7 +4114,16 @@ public void setState(byte[] new_state) { try { - _setState(new_state); + + if(new_state == null) + my_log.info("new cache is null (may be first member in cluster)"); + else + cache._setState(new_state, root, null); + + isStateSet = true; + } + catch(Throwable t) { + my_log.error("failed setting state", t); } finally { synchronized(stateLock) { @@ -4121,41 +4135,6 @@ } } - /** - * Set the cache (tree) to this value. 
The new_state is a byt[][] array, element 0 is the transient state - * (or null) , and element 1 is the persistent state (or null) - */ - void _setState(byte[] new_state) { - - if(new_state == null) { - my_log.info("new cache is null (maybe first member in cluster)"); - return; - } - - DataNode oldRoot = null; - try - { - // Acquire a lock on the root node - Object owner = getOwnerForLock(); - root.acquireAll(owner, state_fetch_timeout, DataNode.LOCK_TYPE_WRITE); - oldRoot = root; - - cache._setState(new_state, root, null); - } - catch(Throwable t) { - my_log.error("failed setting transient state", t); - } - finally - { - if (oldRoot != null) { - my_log.info("forcing release of all locks in old tree"); - try {oldRoot.releaseAllForce();} catch(Throwable t) {log.error("failed releasing locks", t);} - } - } - - isStateSet=true; - } - } @@ -4373,7 +4352,7 @@ } - protected final RegionManager getRegionManager() + private RegionManager getRegionManager() { if (regionManager_ == null && useMarshalling_) regionManager_ = new RegionManager(); |