User: vlada
Date: 06/11/20 14:22:28
Modified: tests/junit/org/jgroups/tests
StreamingStateTransferTest.java
ChannelTestBase.java FlushTest.java
ConcurrentStartupTest.java
Log:
small updates
Revision Changes Path
1.13 +117 -36 JGroups/tests/junit/org/jgroups/tests/StreamingStateTransferTest.java
Index: StreamingStateTransferTest.java
===================================================================
RCS file: /cvsroot/javagroups/JGroups/tests/junit/org/jgroups/tests/StreamingStateTransferTest.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- StreamingStateTransferTest.java 17 Nov 2006 17:51:20 -0000 1.12
+++ StreamingStateTransferTest.java 20 Nov 2006 22:22:28 -0000 1.13
@@ -5,13 +5,19 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
+import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.JChannelFactory;
+import org.jgroups.Message;
import org.jgroups.util.Util;
import EDU.oswego.cs.dl.util.concurrent.Semaphore;
@@ -31,33 +37,47 @@
public void setUp() throws Exception
{
super.setUp();
- CHANNEL_CONFIG = System.getProperty("channel.config.streaming", "conf/flush-udp.xml");
- }
+ CHANNEL_CONFIG = System.getProperty("channel.config.streaming", "flush-udp.xml");
+ }
- public void testStreamingStateTransfer()
+ public void testTransfer()
{
- testTransfer(false);
+ String channelNames [] = null;
+ //mux applications on top of the same channel must have unique names
+ if(isMuxChannelUsed())
+ {
+ channelNames = createMuxApplicationNames(1);
+ }
+ else
+ {
+ channelNames = new String[]{"A", "B", "C", "D"};
+ }
+ transferHelper(channelNames,false);
}
- public void testRpcStreamingStateTransfer()
+ public void testRpcChannelTransfer()
{
- testTransfer(true);
+ //do this test for regular channels only
+ if(!isMuxChannelUsed())
+ {
+ String channelNames []= new String[]{"A", "B", "C", "D"};
+ transferHelper(channelNames,true);
+ }
}
-
- public void testTransfer(boolean useDispatcher)
+
+ public void testMultipleServiceMuxChannel()
{
- String[] channelNames = null;
-
- //for mux all names are used as app ids and need to be the same
+ String channelNames [] = null;
+ //mux applications on top of the same channel must have unique names
if(isMuxChannelUsed())
{
- channelNames = new String[]{"A", "A", "A", "A"};
- }
- else
- {
- channelNames = new String[]{"A", "B", "C", "D"};
- }
-
+ channelNames = createMuxApplicationNames(2);
+ transferHelper(channelNames,false);
+ }
+ }
+
+ public void transferHelper(String channelNames[], boolean useDispatcher)
+ {
int channelCount = channelNames.length;
StreamingStateTransferApplication[] channels = null;
@@ -77,7 +97,7 @@
if(isMuxChannelUsed())
{
- channels[i] = new StreamingStateTransferApplication(channelNames[i],muxFactory[i],semaphore);
+ channels[i] = new StreamingStateTransferApplication(channelNames[i],muxFactory[i%getMuxFactoryCount()],semaphore);
}
else
{
@@ -85,23 +105,31 @@
}
// Start threads and let them join the channel
- channels[i].start();
- semaphore.release(1);
+ channels[i].start();
sleepThread(2000);
+ semaphore.release(1);
+ }
+
+ if(isMuxChannelUsed())
+ {
+ blockUntilViewsReceived(channels,getMuxFactoryCount(), 60000);
}
-
- // Make sure everyone is in sync
- blockUntilViewsReceived(channels, 60000);
+ else
+ {
+ blockUntilViewsReceived(channels, 60000);
+ }
+
//Reacquire the semaphore tickets; when we have them all
// we know the threads are done
- acquireSemaphore(semaphore, 60000, channelCount);
+ acquireSemaphore(semaphore, 60000, channelCount);
int getStateInvokedCount = 0;
int setStateInvokedCount = 0;
int partialGetStateInvokedCount = 0;
int partialSetStateInvokedCount = 0;
+ sleepThread(3000);
for (int i = 0; i < channels.length; i++)
{
if(channels[i].getStateInvoked)
@@ -119,13 +147,30 @@
if(channels[i].partialSetStateInvoked)
{
partialSetStateInvokedCount++;
- }
+ }
+ Map map = channels[i].getMap();
+ for (int j = 0; j < channels.length; j++)
+ {
+ List l = (List) map.get(channels[j].getLocalAddress());
+ int size = l!=null?l.size():0;
+ assertEquals("Correct element count in map ",StreamingStateTransferApplication.COUNT,size);
+ }
+ }
+ if(isMuxChannelUsed())
+ {
+ int factor = channelCount/getMuxFactoryCount();
+ assertEquals("Correct invocation count of getState ",1*factor, getStateInvokedCount);
+ assertEquals("Correct invocation count of setState ",(channelCount/factor)-1,setStateInvokedCount/factor);
+ assertEquals("Correct invocation count of partial getState ",1*factor, partialGetStateInvokedCount);
+ assertEquals("Correct invocation count of partial setState ",(channelCount/factor)-1,partialSetStateInvokedCount/factor);
+ }
+ else
+ {
+ assertEquals("Correct invocation count of getState ",1, getStateInvokedCount);
+ assertEquals("Correct invocation count of setState ",channelCount-1,setStateInvokedCount);
+ assertEquals("Correct invocation count of partial getState ",1, partialGetStateInvokedCount);
+ assertEquals("Correct invocation count of partial setState ",channelCount-1,partialSetStateInvokedCount);
}
-
- assertEquals("Correct invocation count of getState ",1, getStateInvokedCount);
- assertEquals("Correct invocation count of setState ",channelCount-1,setStateInvokedCount);
- assertEquals("Correct invocation count of partial getState ",1, partialGetStateInvokedCount);
- assertEquals("Correct invocation count of partial setState ",channelCount-1,partialSetStateInvokedCount);
}
catch (Exception ex)
@@ -152,7 +197,9 @@
protected class StreamingStateTransferApplication extends PushChannelApplicationWithSemaphore
{
- private Object transferObject = new String("JGroups");
+ private Map stateMap = new HashMap();
+
+ public static final int COUNT = 25;
private Object partialTransferObject = new String("partial");
@@ -174,10 +221,34 @@
super(name,factory,s);
}
+ public void receive(Message msg)
+ {
+ Address sender = msg.getSrc();
+ synchronized(stateMap)
+ {
+ List list = (List) stateMap.get(sender);
+ if(list == null)
+ {
+ list = new ArrayList();
+ stateMap.put(sender, list);
+ }
+ list.add(msg.getObject());
+ }
+ }
+
+ public Map getMap()
+ {
+ return stateMap;
+ }
+
public void useChannel() throws Exception
{
channel.connect("test");
- channel.getState(null, 25000);
+ for(int i = 0;i < COUNT;i++)
+ {
+ channel.send(null,null,new Integer(i));
+ }
+ channel.getState(null, 25000);
channel.getState(null, name, 25000);
}
@@ -188,7 +259,12 @@
try
{
oos = new ObjectOutputStream(ostream);
- oos.writeObject(transferObject);
+ HashMap copy = null;
+ synchronized (stateMap)
+ {
+ copy = new HashMap(stateMap);
+ }
+ oos.writeObject(copy);
oos.flush();
}
catch (IOException e)
@@ -208,8 +284,13 @@
ObjectInputStream ois = null;
try
{
- ois = new ObjectInputStream(istream);
- TestCase.assertEquals("Got full state requested ", transferObject,ois.readObject());
+ ois = new ObjectInputStream(istream);
+ Map map = (Map) ois.readObject();
+ synchronized (stateMap)
+ {
+ stateMap.clear();
+ stateMap.putAll(map);
+ }
}
catch (Exception e)
{
1.2 +113 -20 JGroups/tests/junit/org/jgroups/tests/ChannelTestBase.java
Index: ChannelTestBase.java
===================================================================
RCS file: /cvsroot/javagroups/JGroups/tests/junit/org/jgroups/tests/ChannelTestBase.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- ChannelTestBase.java 16 Nov 2006 18:31:20 -0000 1.1
+++ ChannelTestBase.java 20 Nov 2006 22:22:28 -0000 1.2
@@ -38,6 +38,8 @@
public class ChannelTestBase extends TestCase
{
private static Random random = new Random();
+
+ private static String DEFAULT_MUX_FACTORY_COUNT = "4";
static String CHANNEL_CONFIG = "udp.xml";
@@ -58,7 +60,7 @@
super.setUp();
if (isMuxChannelUsed())
{
- int factoryCount = Integer.parseInt(System.getProperty("mux.factorycount", "4"));
+ int factoryCount = Integer.parseInt(System.getProperty("mux.factorycount", DEFAULT_MUX_FACTORY_COUNT));
muxFactory = new JChannelFactory[factoryCount];
for (int i = 0; i < muxFactory.length; i++)
@@ -107,14 +109,37 @@
}
}
- protected boolean isMuxChannelUsed()
- {
- return Boolean.valueOf(System.getProperty("mux.on", "false")).booleanValue();
- }
-
- protected boolean shouldCompareThreadCount()
- {
- return Boolean.valueOf(System.getProperty("threadcount", "false")).booleanValue();
+ /**
+ * Returns an array of mux application/service names with a guarantee that:
+ * <p>
+ * - there are no application/service name collisions on top of one channel
+ * (i.e. two applications/services cannot have the same name on top of one channel)
+ * <p>
+ * - each generated application/service name is guaranteed to have a corresponding
+ * pair application/service with the same name on another channel
+ *
+ * @param muxApplicationstPerChannelCount
+ * @return
+ */
+ protected String [] createMuxApplicationNames(int muxApplicationstPerChannelCount)
+ {
+ int channelCount = getMuxFactoryCount();
+ int start = 64; //one below 'A' (65); incremented before the first name is generated
+ String names [] = null;
+ int appCount = channelCount * muxApplicationstPerChannelCount;
+ names = new String[appCount];
+
+ boolean chooseNext = false;
+ for (int i = 0; i < appCount; i++)
+ {
+ chooseNext = (i%channelCount == 0)?true:false;
+ if(chooseNext)
+ {
+ start++;
+ }
+ names[i] = Character.toString((char)start);
+ }
+ return names;
}
/**
@@ -125,7 +150,7 @@
{
public Channel createChannel(Object id) throws Exception
{
- return createChannel(CHANNEL_CONFIG, true);
+ return createChannel(CHANNEL_CONFIG, useBlocking());
}
protected Channel createChannel(String configFile, boolean useBlocking) throws Exception
@@ -165,10 +190,22 @@
public Channel createChannel(Object id) throws Exception
{
- log.info("Factory " + f.toString() + " creating mux channel using "
- + MUX_CHANNEL_CONFIG + " and stack "
- + MUX_CHANNEL_CONFIG_STACK_NAME);
- return f.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, id.toString());
+ Channel c = f.createMultiplexerChannel(MUX_CHANNEL_CONFIG_STACK_NAME, id.toString());
+ if(useBlocking())
+ {
+ c.setOpt(Channel.BLOCK, Boolean.TRUE);
+ }
+ Address address = c.getLocalAddress();
+ String append = "[" + id + "]" + " using " + MUX_CHANNEL_CONFIG + ",stack " + MUX_CHANNEL_CONFIG_STACK_NAME;
+ if (address == null)
+ {
+ log.info("Created unconnected mux channel " + append);
+ }
+ else
+ {
+ log.info("Created mux channel "+ address + append);
+ }
+ return c;
}
}
/**
@@ -495,15 +532,57 @@
public Address getLocalAddress();
}
+
+ /**
+ * Returns true if JVM has been started with mux.on system property
+ * set to true, false otherwise.
+ *
+ * @return
+ */
+ protected static boolean isMuxChannelUsed()
+ {
+ return Boolean.valueOf(System.getProperty("mux.on", "false")).booleanValue();
+ }
+
+ /**
+ * Returns true if JVM has been started with threadcount system property
+ * set to true, false otherwise.
+ *
+ * @return
+ */
+ protected static boolean shouldCompareThreadCount()
+ {
+ return Boolean.valueOf(System.getProperty("threadcount", "false")).booleanValue();
+ }
+
+ /**
+ * Returns the value of the mux.factorycount system property if it has been set,
+ * otherwise returns DEFAULT_MUX_FACTORY_COUNT.
+ *
+ * @return
+ */
+ protected static int getMuxFactoryCount()
+ {
+ return Integer.parseInt(System.getProperty("mux.factorycount", DEFAULT_MUX_FACTORY_COUNT));
+ }
+
+ /**
+ * Returns true if JVM has been started with useBlocking system property
+ * set to true, false otherwise.
+ *
+ * @return
+ */
+ protected static boolean useBlocking()
+ {
+ return Boolean.valueOf(System.getProperty("useBlocking", "true")).booleanValue();
+ }
/**
* Checks each channel in the parameter array to see if it has the
* exact same view as other channels in an array.
*/
- public static boolean areViewsComplete(MemberRetrievable[] channels)
- {
- int memberCount = channels.length;
-
+ public static boolean areViewsComplete(MemberRetrievable[] channels,int memberCount)
+ {
for (int i = 0; i < memberCount; i++)
{
if (!isViewComplete(channels[i], memberCount))
@@ -524,14 +603,28 @@
* @throws RuntimeException if <code>timeout</code> ms have elapse without
* all channels having the same number of members.
*/
- public static void blockUntilViewsReceived(MemberRetrievable[] channels, long timeout)
+ public static void blockUntilViewsReceived(MemberRetrievable[] channels,long timeout)
+ {
+ blockUntilViewsReceived(channels,channels.length,timeout);
+ }
+
+ /**
+ * Loops, continually calling {@link #areViewsComplete(MemberRetrievable[], int)}
+ * until it either returns true or <code>timeout</code> ms have elapsed.
+ *
+ * @param channels channels which must all have consistent views
+ * @param count number of leading channels to check; passed to areViewsComplete as its memberCount
+ * @param timeout max number of ms to loop
+ * @throws RuntimeException if <code>timeout</code> ms elapse without all channels having the same number of members.
+ */
+ public static void blockUntilViewsReceived(MemberRetrievable[] channels, int count, long timeout)
{
long failTime = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < failTime)
{
sleepThread(100);
- if (areViewsComplete(channels))
+ if (areViewsComplete(channels,count))
{
return;
}
1.19 +46 -18 JGroups/tests/junit/org/jgroups/tests/FlushTest.java
Index: FlushTest.java
===================================================================
RCS file: /cvsroot/javagroups/JGroups/tests/junit/org/jgroups/tests/FlushTest.java,v
retrieving revision 1.18
retrieving revision 1.19
diff -u -r1.18 -r1.19
--- FlushTest.java 17 Nov 2006 22:26:24 -0000 1.18
+++ FlushTest.java 20 Nov 2006 22:22:28 -0000 1.19
@@ -34,7 +34,7 @@
/**
* Tests the FLUSH protocol, requires flush-udp.xml in ./conf to be present and configured to use FLUSH
* @author Bela Ban
- * @version $Id: FlushTest.java,v 1.18 2006/11/17 22:26:24 vlada Exp $
+ * @version $Id: FlushTest.java,v 1.19 2006/11/20 22:22:28 vlada Exp $
*/
public class FlushTest extends ChannelTestBase
{
@@ -156,26 +156,44 @@
public void testChannelAfterConnect()
{
- testChannels(false);
+ String[] names = null;
+ if(isMuxChannelUsed())
+ {
+ names = createMuxApplicationNames(1);
+ }
+ else
+ {
+ names = new String[]{"A", "B", "C", "D"};
+ }
+ testChannels(names,false);
}
public void testChannelsWithStateTransfer()
{
- testChannels(true);
- }
-
- public void testChannels(boolean useTransfer)
- {
String[] names = null;
if(isMuxChannelUsed())
{
- names = new String[]{"A", "A", "A", "A"};
+ names = createMuxApplicationNames(1);
}
else
{
names = new String[]{"A", "B", "C", "D"};
}
-
+ testChannels(names,true);
+ }
+
+ public void testMultipleServiceMuxChannelWithStateTransfer()
+ {
+ String[] names = null;
+ if(isMuxChannelUsed())
+ {
+ names = createMuxApplicationNames(2);
+ testChannels(names,true);
+ }
+ }
+
+ public void testChannels(String names[], boolean useTransfer)
+ {
int count = names.length;
MyReceiver[] channels = new MyReceiver[count];
@@ -190,7 +208,7 @@
{
if(isMuxChannelUsed())
{
- channels[i] = new MyReceiver(names[i],muxFactory[i], semaphore, useTransfer);
+ channels[i] = new MyReceiver(names[i],muxFactory[i%getMuxFactoryCount()], semaphore, useTransfer);
}
else
{
@@ -205,11 +223,19 @@
{
semaphore.release(1);
}
- sleepRandom(1500);
+ sleepThread(2000);
}
- // Make sure everyone is in sync
- blockUntilViewsReceived(channels, 60000);
+
+ if(isMuxChannelUsed())
+ {
+ blockUntilViewsReceived(channels,getMuxFactoryCount(), 60000);
+ }
+ else
+ {
+ blockUntilViewsReceived(channels, 60000);
+ }
+
//if state transfer is used release all at once
//clear all channels of view events
@@ -230,13 +256,14 @@
acquireSemaphore(semaphore, 60000, count);
//Sleep to ensure async message arrive
- sleepThread(2000);
+ sleepThread(3000);
for (int i = 0; i < count; i++)
{
MyReceiver receiver = channels[i];
+ log.info("Events for " + channels[i].getLocalAddress()+channels[i].getName() + " are " + channels[i].getEvents());
if (useTransfer)
- {
+ {
checkEventStateTransferSequence(receiver);
}
else
@@ -310,15 +337,16 @@
{
Object event = events.get(i);
if (event instanceof BlockEvent)
- {
- Object o = events.get(i + 1);
+ {
if (i + 1 < size)
{
+ Object o = events.get(i + 1);
assertTrue("After Block should be state or unblock " + eventString, o instanceof SetStateEvent
|| o instanceof GetStateEvent || o instanceof UnblockEvent);
}
- if (i != 0)
+ else if (i != 0)
{
+ Object o = events.get(i + 1);
assertTrue("Before Block should be state or Unblock " + eventString, o instanceof SetStateEvent
|| o instanceof GetStateEvent || o instanceof UnblockEvent);
}
1.18 +25 -11 JGroups/tests/junit/org/jgroups/tests/ConcurrentStartupTest.java
Index: ConcurrentStartupTest.java
===================================================================
RCS file: /cvsroot/javagroups/JGroups/tests/junit/org/jgroups/tests/ConcurrentStartupTest.java,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -r1.17 -r1.18
--- ConcurrentStartupTest.java 16 Nov 2006 18:31:20 -0000 1.17
+++ ConcurrentStartupTest.java 20 Nov 2006 22:22:28 -0000 1.18
@@ -24,7 +24,7 @@
/**
* Tests concurrent startup with state transfer and concurrent state tranfer.
* @author bela
- * @version $Id: ConcurrentStartupTest.java,v 1.17 2006/11/16 18:31:20 vlada Exp $
+ * @version $Id: ConcurrentStartupTest.java,v 1.18 2006/11/20 22:22:28 vlada Exp $
*/
public class ConcurrentStartupTest extends ChannelTestBase
{
@@ -64,10 +64,10 @@
{
String[] names = null;
- //for mux all names are used as app ids and need to be the same
+ //mux applications on top of the same channel must have unique names
if(isMuxChannelUsed())
{
- names = new String[]{"A", "A", "A", "A"};
+ names = createMuxApplicationNames(1);
}
else
{
@@ -90,7 +90,7 @@
{
if(isMuxChannelUsed())
{
- channels[i] = new ConcurrentStartupChannelWithLargeState(names[i],muxFactory[i],semaphore);
+ channels[i] = new ConcurrentStartupChannelWithLargeState(names[i],muxFactory[i%getMuxFactoryCount()],semaphore);
}
else
{
@@ -102,7 +102,7 @@
if(isMuxChannelUsed())
{
- channels[i] = new ConcurrentStartupChannel(names[i],muxFactory[i],semaphore);
+ channels[i] = new ConcurrentStartupChannel(names[i],muxFactory[i%getMuxFactoryCount()],semaphore);
}
else
{
@@ -117,7 +117,14 @@
}
// Make sure everyone is in sync
- blockUntilViewsReceived(channels, 60000);
+ if(isMuxChannelUsed())
+ {
+ blockUntilViewsReceived(channels,getMuxFactoryCount(), 60000);
+ }
+ else
+ {
+ blockUntilViewsReceived(channels, 60000);
+ }
// Sleep to ensure the threads get all the semaphore tickets
sleepThread(1000);
@@ -190,10 +197,10 @@
{
String[] names = null;
- //for mux all names are used as app ids and need to be the same
+ //mux applications on top of the same channel must have unique names
if(isMuxChannelUsed())
{
- names = new String[]{"A", "A", "A", "A"};
+ names = createMuxApplicationNames(1);
}
else
{
@@ -217,7 +224,7 @@
{
if(isMuxChannelUsed())
{
- channels[i] = new ConcurrentLargeStateTransfer(names[i],muxFactory[i],semaphore);
+ channels[i] = new ConcurrentLargeStateTransfer(names[i],muxFactory[i%getMuxFactoryCount()],semaphore);
}
else
{
@@ -228,7 +235,7 @@
{
if(isMuxChannelUsed())
{
- channels[i] = new ConcurrentStateTransfer(names[i],muxFactory[i],semaphore);
+ channels[i] = new ConcurrentStateTransfer(names[i],muxFactory[i%getMuxFactoryCount()],semaphore);
}
else
{
@@ -242,7 +249,14 @@
}
// Make sure everyone is in sync
- //blockUntilViewsReceived(channels, 60000);
+ if(isMuxChannelUsed())
+ {
+ blockUntilViewsReceived(channels,getMuxFactoryCount(), 60000);
+ }
+ else
+ {
+ blockUntilViewsReceived(channels, 60000);
+ }
sleepThread(2000);
//Unleash hell !
|