From: Timothy F. <ti...@jb...> - 2006-03-05 16:17:28
|
User: timfox Date: 06/03/05 11:17:26 Modified: tests/src/org/jboss/test/messaging/core/plugin JDBCPersistenceManagerTest.java MessageStoreWithPersistenceManagerTest.java Log: JBMESSAGING-237 Paging channels (Lazy loading queues) Revision Changes Path 1.8 +984 -128 jboss-jms/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java (In the diff below, changes in quantity of whitespace are not shown.) Index: JDBCPersistenceManagerTest.java =================================================================== RCS file: /cvsroot/jboss/jboss-jms/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java,v retrieving revision 1.7 retrieving revision 1.8 diff -u -b -r1.7 -r1.8 --- JDBCPersistenceManagerTest.java 4 Mar 2006 02:43:57 -0000 1.7 +++ JDBCPersistenceManagerTest.java 5 Mar 2006 16:17:26 -0000 1.8 @@ -22,27 +22,36 @@ package org.jboss.test.messaging.core.plugin; import java.io.Serializable; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; +import javax.naming.InitialContext; +import javax.sql.DataSource; +import javax.transaction.TransactionManager; import javax.transaction.xa.Xid; import org.jboss.messaging.core.Channel; import org.jboss.messaging.core.Message; import org.jboss.messaging.core.MessageReference; +import org.jboss.messaging.core.message.MessageFactory; import org.jboss.messaging.core.plugin.JDBCPersistenceManager; import org.jboss.messaging.core.plugin.PagingMessageStore; import org.jboss.messaging.core.plugin.contract.MessageStore; +import org.jboss.messaging.core.plugin.contract.PersistenceManager; import org.jboss.messaging.core.tx.Transaction; import org.jboss.messaging.core.tx.TransactionRepository; import org.jboss.test.messaging.MessagingTestCase; import org.jboss.test.messaging.core.SimpleChannel; import org.jboss.test.messaging.tools.ServerManagement; import org.jboss.test.messaging.tools.jmx.ServiceContainer; -import org.jboss.messaging.core.message.MessageFactory; +import org.jboss.tm.TransactionManagerService; import org.jboss.util.id.GUID; @@ -58,7 +67,7 @@ protected ServiceContainer sc; - protected JDBCPersistenceManager tl; + protected JDBCPersistenceManager pm; protected MessageStore ms; @@ -86,23 +95,22 @@ protected void doSetup(boolean guid, boolean batch) throws Exception { - tl = createPM(); + pm = createPM(); if (guid) { - tl.setSqlProperties(this.getConfigTablesForGUID()); + pm.setSqlProperties(this.getConfigTablesForGUID()); } - tl.setTxIdGuid(guid); - tl.setUsingBatchUpdates(batch); + pm.setTxIdGuid(guid); + pm.setUsingBatchUpdates(batch); - ms = new PagingMessageStore("s0", tl); + ms = new PagingMessageStore("s0"); - tl.start(); + pm.start(); } protected JDBCPersistenceManager createPM() throws Exception { - log.info(this + " creatpm"); return new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager()); } @@ -113,118 +121,848 @@ sc.stop(); sc = null; } - tl.stop(); + pm.stop(); super.tearDown(); } - public void testAddReference() throws Exception + public void testGetMaxOrdering() throws Exception { doSetup(false, false); Channel channel = new SimpleChannel(0, ms); - Message[] messages = createMessages(); + Message[] m = createMessages(10); - for (int i = 0; i < messages.length; i++) - { - Message m = messages[i]; + MessageReference ref1 = ms.reference(m[0]); + ref1.setOrdering(1); + MessageReference ref2 = ms.reference(m[1]); + ref1.setOrdering(3); + MessageReference ref3 = ms.reference(m[2]); + ref1.setOrdering(6); + MessageReference ref4 = ms.reference(m[3]); + ref1.setOrdering(13); + MessageReference ref5 = ms.reference(m[4]); + ref1.setOrdering(15); + MessageReference ref6 = ms.reference(m[5]); + ref1.setOrdering(8); + MessageReference ref7 = ms.reference(m[6]); + ref1.setOrdering(23); + MessageReference ref8 = ms.reference(m[7]); + ref1.setOrdering(45); + MessageReference ref9 = ms.reference(m[8]); + ref1.setOrdering(10); + MessageReference ref10 = ms.reference(m[9]); + ref1.setOrdering(111); + + pm.addReference(channel.getChannelID(), ref1, null); + pm.addReference(channel.getChannelID(), ref2, null); + pm.addReference(channel.getChannelID(), ref3, null); + pm.addReference(channel.getChannelID(), ref4, null); + pm.addReference(channel.getChannelID(), ref5, null); + pm.addReference(channel.getChannelID(), ref6, null); + pm.addReference(channel.getChannelID(), ref7, null); + pm.addReference(channel.getChannelID(), ref8, null); + pm.addReference(channel.getChannelID(), ref9, null); + pm.addReference(channel.getChannelID(), ref10, null); + + List refIds = getReferenceIds(channel.getChannelID()); + assertNotNull(refIds); + assertEquals(10, refIds.size()); + assertTrue(refIds.contains(ref1.getMessageID())); + assertTrue(refIds.contains(ref2.getMessageID())); + assertTrue(refIds.contains(ref3.getMessageID())); + assertTrue(refIds.contains(ref4.getMessageID())); + assertTrue(refIds.contains(ref5.getMessageID())); + assertTrue(refIds.contains(ref6.getMessageID())); + assertTrue(refIds.contains(ref7.getMessageID())); + assertTrue(refIds.contains(ref8.getMessageID())); + assertTrue(refIds.contains(ref9.getMessageID())); + assertTrue(refIds.contains(ref10.getMessageID())); + + List msgs = getMessageIds(); + assertNotNull(msgs); + assertEquals(10, msgs.size()); + assertTrue(msgs.contains(ref1.getMessageID())); + assertTrue(msgs.contains(ref2.getMessageID())); + assertTrue(msgs.contains(ref3.getMessageID())); + assertTrue(msgs.contains(ref4.getMessageID())); + assertTrue(msgs.contains(ref5.getMessageID())); + assertTrue(msgs.contains(ref6.getMessageID())); + assertTrue(msgs.contains(ref7.getMessageID())); + assertTrue(msgs.contains(ref8.getMessageID())); + assertTrue(msgs.contains(ref9.getMessageID())); + assertTrue(msgs.contains(ref10.getMessageID())); + + long maxOrdering = pm.getMaxOrdering(channel.getChannelID()); - MessageReference ref = ms.reference(m); + assertEquals(111, maxOrdering); - tl.addReference(channel.getChannelID(), ref, null); + pm.removeAllChannelData(channel.getChannelID()); - List refs = tl.messageRefs(channel.getChannelID()); + } - assertNotNull(refs); - assertEquals(1, refs.size()); - String messageID = (String)refs.get(0); + public void testGetNumberOfReferences() throws Exception + { + doSetup(false, false); - assertEquals(ref.getMessageID(), messageID); + Channel channel = new SimpleChannel(0, ms); - tl.removeAllMessageData(channel.getChannelID()); + Message[] m = createMessages(10); + + MessageReference ref1 = ms.reference(m[0]); + MessageReference ref2 = ms.reference(m[1]); + MessageReference ref3 = ms.reference(m[2]); + MessageReference ref4 = ms.reference(m[3]); + MessageReference ref5 = ms.reference(m[4]); + MessageReference ref6 = ms.reference(m[5]); + MessageReference ref7 = ms.reference(m[6]); + MessageReference ref8 = ms.reference(m[7]); + MessageReference ref9 = ms.reference(m[8]); + MessageReference ref10 = ms.reference(m[9]); + + pm.addReference(channel.getChannelID(), ref1, null); + pm.addReference(channel.getChannelID(), ref2, null); + pm.addReference(channel.getChannelID(), ref3, null); + pm.addReference(channel.getChannelID(), ref4, null); + pm.addReference(channel.getChannelID(), ref5, null); + pm.addReference(channel.getChannelID(), ref6, null); + pm.addReference(channel.getChannelID(), ref7, null); + pm.addReference(channel.getChannelID(), ref8, null); + pm.addReference(channel.getChannelID(), ref9, null); + pm.addReference(channel.getChannelID(), ref10, null); + + List refIds = getReferenceIds(channel.getChannelID()); + assertNotNull(refIds); + assertEquals(10, refIds.size()); + assertTrue(refIds.contains(ref1.getMessageID())); + assertTrue(refIds.contains(ref2.getMessageID())); + assertTrue(refIds.contains(ref3.getMessageID())); + assertTrue(refIds.contains(ref4.getMessageID())); + assertTrue(refIds.contains(ref5.getMessageID())); + assertTrue(refIds.contains(ref6.getMessageID())); + assertTrue(refIds.contains(ref7.getMessageID())); + assertTrue(refIds.contains(ref8.getMessageID())); + assertTrue(refIds.contains(ref9.getMessageID())); + assertTrue(refIds.contains(ref10.getMessageID())); + + List msgs = getMessageIds(); + assertNotNull(msgs); + assertEquals(10, msgs.size()); + assertTrue(msgs.contains(ref1.getMessageID())); + assertTrue(msgs.contains(ref2.getMessageID())); + assertTrue(msgs.contains(ref3.getMessageID())); + assertTrue(msgs.contains(ref4.getMessageID())); + assertTrue(msgs.contains(ref5.getMessageID())); + assertTrue(msgs.contains(ref6.getMessageID())); + assertTrue(msgs.contains(ref7.getMessageID())); + assertTrue(msgs.contains(ref8.getMessageID())); + assertTrue(msgs.contains(ref9.getMessageID())); + assertTrue(msgs.contains(ref10.getMessageID())); + + int numberOfReferences = pm.getNumberOfReferences(channel.getChannelID()); + + assertEquals(10, numberOfReferences); + + pm.removeReference(channel.getChannelID(), ref1, null); + + numberOfReferences = pm.getNumberOfReferences(channel.getChannelID()); + + assertEquals(9, numberOfReferences); + + pm.removeAllChannelData(channel.getChannelID()); - refs = tl.messageRefs(channel.getChannelID()); - assertTrue(refs.isEmpty()); } + + public void testGetReferenceInfos() throws Exception + { + doSetup(false, false); + + Channel channel = new SimpleChannel(0, ms); + + Message[] m = createMessages(10); + + MessageReference ref1 = ms.reference(m[0]); + ref1.setOrdering(0); + MessageReference ref2 = ms.reference(m[1]); + ref2.setOrdering(1); + MessageReference ref3 = ms.reference(m[2]); + ref3.setOrdering(2); + MessageReference ref4 = ms.reference(m[3]); + ref4.setOrdering(11); + MessageReference ref5 = ms.reference(m[4]); + ref5.setOrdering(22); + MessageReference ref6 = ms.reference(m[5]); + ref6.setOrdering(100); + MessageReference ref7 = ms.reference(m[6]); + ref7.setOrdering(303); + MessageReference ref8 = ms.reference(m[7]); + ref8.setOrdering(1000); + MessageReference ref9 = ms.reference(m[8]); + ref9.setOrdering(1001); + MessageReference ref10 = ms.reference(m[9]); + ref10.setOrdering(1002); + + pm.addReference(channel.getChannelID(), ref1, null); + pm.addReference(channel.getChannelID(), ref2, null); + pm.addReference(channel.getChannelID(), ref3, null); + pm.addReference(channel.getChannelID(), ref4, null); + pm.addReference(channel.getChannelID(), ref5, null); + pm.addReference(channel.getChannelID(), ref6, null); + pm.addReference(channel.getChannelID(), ref7, null); + pm.addReference(channel.getChannelID(), ref8, null); + pm.addReference(channel.getChannelID(), ref9, null); + pm.addReference(channel.getChannelID(), ref10, null); + + List refIds = getReferenceIds(channel.getChannelID()); + assertNotNull(refIds); + assertEquals(10, refIds.size()); + assertTrue(refIds.contains(ref1.getMessageID())); + assertTrue(refIds.contains(ref2.getMessageID())); + assertTrue(refIds.contains(ref3.getMessageID())); + assertTrue(refIds.contains(ref4.getMessageID())); + assertTrue(refIds.contains(ref5.getMessageID())); + assertTrue(refIds.contains(ref6.getMessageID())); + assertTrue(refIds.contains(ref7.getMessageID())); + assertTrue(refIds.contains(ref8.getMessageID())); + assertTrue(refIds.contains(ref9.getMessageID())); + assertTrue(refIds.contains(ref10.getMessageID())); + + List msgs = getMessageIds(); + assertNotNull(msgs); + assertEquals(10, msgs.size()); + assertTrue(msgs.contains(ref1.getMessageID())); + assertTrue(msgs.contains(ref2.getMessageID())); + assertTrue(msgs.contains(ref3.getMessageID())); + assertTrue(msgs.contains(ref4.getMessageID())); + assertTrue(msgs.contains(ref5.getMessageID())); + assertTrue(msgs.contains(ref6.getMessageID())); + assertTrue(msgs.contains(ref7.getMessageID())); + assertTrue(msgs.contains(ref8.getMessageID())); + assertTrue(msgs.contains(ref9.getMessageID())); + assertTrue(msgs.contains(ref10.getMessageID())); + + List refInfos = pm.getReferenceInfos(channel.getChannelID(), 0); + + assertNotNull(refInfos); + assertEquals(0, refInfos.size()); + + refInfos = pm.getReferenceInfos(channel.getChannelID(), 3); + assertNotNull(refInfos); + assertEquals(3, refInfos.size()); + assertEquals(ref1.getMessageID(), ((PersistenceManager.ReferenceInfo)refInfos.get(0)).getMessageId()); + assertEquals(ref2.getMessageID(), ((PersistenceManager.ReferenceInfo)refInfos.get(1)).getMessageId()); + assertEquals(ref3.getMessageID(), ((PersistenceManager.ReferenceInfo)refInfos.get(2)).getMessageId()); + + refInfos = pm.getReferenceInfos(channel.getChannelID(), 10); + assertNotNull(refInfos); + assertEquals(10, refInfos.size()); + assertEquals(ref1.getMessageID(), ((PersistenceManager.ReferenceInfo)refInfos.get(0)).getMessageId()); + assertEquals(ref2.getMessageID(), ((PersistenceManager.ReferenceInfo)refInfos.get(1)).getMessageId()); + assertEquals(ref3.getMessageID(), ((PersistenceManager.ReferenceInfo)refInfos.get(2)).getMessageId()); + assertEquals(ref4.getMessageID(), ((PersistenceManager.ReferenceInfo)refInfos.get(3)).getMessageId()); + assertEquals(ref5.getMessageID(), ((PersistenceManager.ReferenceInfo)refInfos.get(4)).getMessageId()); + assertEquals(ref6.getMessageID(), ((PersistenceManager.ReferenceInfo)refInfos.get(5)).getMessageId()); + assertEquals(ref7.getMessageID(), ((PersistenceManager.ReferenceInfo)refInfos.get(6)).getMessageId()); + assertEquals(ref8.getMessageID(), ((PersistenceManager.ReferenceInfo)refInfos.get(7)).getMessageId()); + assertEquals(ref9.getMessageID(), ((PersistenceManager.ReferenceInfo)refInfos.get(8)).getMessageId()); + assertEquals(ref10.getMessageID(), ((PersistenceManager.ReferenceInfo)refInfos.get(9)).getMessageId()); + + refInfos = pm.getReferenceInfos(channel.getChannelID(), 19); + assertNotNull(refInfos); + assertEquals(10, refInfos.size()); + assertEquals(ref1.getMessageID(), ((PersistenceManager.ReferenceInfo)refInfos.get(0)).getMessageId()); + assertEquals(ref2.getMessageID(), ((PersistenceManager.ReferenceInfo)refInfos.get(1)).getMessageId()); + assertEquals(ref3.getMessageID(), ((PersistenceManager.ReferenceInfo)refInfos.get(2)).getMessageId()); + assertEquals(ref4.getMessageID(), ((PersistenceManager.ReferenceInfo)refInfos.get(3)).getMessageId()); + assertEquals(ref5.getMessageID(), ((PersistenceManager.ReferenceInfo)refInfos.get(4)).getMessageId()); + assertEquals(ref6.getMessageID(), ((PersistenceManager.ReferenceInfo)refInfos.get(5)).getMessageId()); + assertEquals(ref7.getMessageID(), ((PersistenceManager.ReferenceInfo)refInfos.get(6)).getMessageId()); + assertEquals(ref8.getMessageID(), ((PersistenceManager.ReferenceInfo)refInfos.get(7)).getMessageId()); + assertEquals(ref9.getMessageID(), ((PersistenceManager.ReferenceInfo)refInfos.get(8)).getMessageId()); + assertEquals(ref10.getMessageID(), ((PersistenceManager.ReferenceInfo)refInfos.get(9)).getMessageId()); + + pm.removeAllChannelData(channel.getChannelID()); + } - public void testRemoveReference() throws Exception + public void testGetMessages() throws Exception { doSetup(false, false); Channel channel = new SimpleChannel(0, ms); - Message[] messages = createMessages(); + Message[] m = createMessages(10); - for (int i = 0; i < messages.length; i++) + MessageReference ref1 = ms.reference(m[0]); + MessageReference ref2 = ms.reference(m[1]); + MessageReference ref3 = ms.reference(m[2]); + MessageReference ref4 = ms.reference(m[3]); + MessageReference ref5 = ms.reference(m[4]); + MessageReference ref6 = ms.reference(m[5]); + MessageReference ref7 = ms.reference(m[6]); + MessageReference ref8 = ms.reference(m[7]); + MessageReference ref9 = ms.reference(m[8]); + MessageReference ref10 = ms.reference(m[9]); + + pm.addReference(channel.getChannelID(), ref1, null); + pm.addReference(channel.getChannelID(), ref2, null); + pm.addReference(channel.getChannelID(), ref3, null); + pm.addReference(channel.getChannelID(), ref4, null); + pm.addReference(channel.getChannelID(), ref5, null); + pm.addReference(channel.getChannelID(), ref6, null); + pm.addReference(channel.getChannelID(), ref7, null); + pm.addReference(channel.getChannelID(), ref8, null); + pm.addReference(channel.getChannelID(), ref9, null); + pm.addReference(channel.getChannelID(), ref10, null); + + List refIds = getReferenceIds(channel.getChannelID()); + assertNotNull(refIds); + assertEquals(10, refIds.size()); + assertTrue(refIds.contains(ref1.getMessageID())); + assertTrue(refIds.contains(ref2.getMessageID())); + assertTrue(refIds.contains(ref3.getMessageID())); + assertTrue(refIds.contains(ref4.getMessageID())); + assertTrue(refIds.contains(ref5.getMessageID())); + assertTrue(refIds.contains(ref6.getMessageID())); + assertTrue(refIds.contains(ref7.getMessageID())); + assertTrue(refIds.contains(ref8.getMessageID())); + assertTrue(refIds.contains(ref9.getMessageID())); + assertTrue(refIds.contains(ref10.getMessageID())); + + List msgs = getMessageIds(); + assertNotNull(msgs); + assertEquals(10, msgs.size()); + assertTrue(msgs.contains(ref1.getMessageID())); + assertTrue(msgs.contains(ref2.getMessageID())); + assertTrue(msgs.contains(ref3.getMessageID())); + assertTrue(msgs.contains(ref4.getMessageID())); + assertTrue(msgs.contains(ref5.getMessageID())); + assertTrue(msgs.contains(ref6.getMessageID())); + assertTrue(msgs.contains(ref7.getMessageID())); + assertTrue(msgs.contains(ref8.getMessageID())); + assertTrue(msgs.contains(ref9.getMessageID())); + assertTrue(msgs.contains(ref10.getMessageID())); + + List msgIds = new ArrayList(); + msgIds.add(ref3.getMessageID()); + msgIds.add(ref4.getMessageID()); + msgIds.add(ref7.getMessageID()); + msgIds.add(ref9.getMessageID()); + msgIds.add(ref1.getMessageID()); + + List ms = pm.getMessages(msgIds); + assertNotNull(ms); + assertEquals(5, ms.size()); + + assertTrue(containsMessage(ms, ref3.getMessageID())); + assertTrue(containsMessage(ms, ref4.getMessageID())); + assertTrue(containsMessage(ms, ref7.getMessageID())); + assertTrue(containsMessage(ms, ref9.getMessageID())); + assertTrue(containsMessage(ms, ref1.getMessageID())); + + pm.removeAllChannelData(channel.getChannelID()); + } + + protected boolean containsMessage(List msgs, Serializable msgId) { - Message m = messages[i]; + Iterator iter = msgs.iterator(); + while (iter.hasNext()) + { + Message m = (Message)iter.next(); + if (m.getMessageID().equals(msgId)) + { + return true; + } + } + return false; + } - MessageReference ref = ms.reference(m); + public void testGetMessagesMaxParams() throws Exception + { + doSetup(false, false); - tl.addReference(channel.getChannelID(), ref, null); + pm.setMaxParams(5); - List refs = tl.messageRefs(channel.getChannelID()); + Channel channel = new SimpleChannel(0, ms); - assertNotNull(refs); - assertEquals(1, refs.size()); - String messageID = (String)refs.get(0); + Message[] m = createMessages(10); - assertEquals(ref.getMessageID(), messageID); + MessageReference ref1 = ms.reference(m[0]); + MessageReference ref2 = ms.reference(m[1]); + MessageReference ref3 = ms.reference(m[2]); + MessageReference ref4 = ms.reference(m[3]); + MessageReference ref5 = ms.reference(m[4]); + MessageReference ref6 = ms.reference(m[5]); + MessageReference ref7 = ms.reference(m[6]); + MessageReference ref8 = ms.reference(m[7]); + MessageReference ref9 = ms.reference(m[8]); + MessageReference ref10 = ms.reference(m[9]); + + pm.addReference(channel.getChannelID(), ref1, null); + pm.addReference(channel.getChannelID(), ref2, null); + pm.addReference(channel.getChannelID(), ref3, null); + pm.addReference(channel.getChannelID(), ref4, null); + pm.addReference(channel.getChannelID(), ref5, null); + pm.addReference(channel.getChannelID(), ref6, null); + pm.addReference(channel.getChannelID(), ref7, null); + pm.addReference(channel.getChannelID(), ref8, null); + pm.addReference(channel.getChannelID(), ref9, null); + pm.addReference(channel.getChannelID(), ref10, null); + + List refIds = getReferenceIds(channel.getChannelID()); + assertNotNull(refIds); + assertEquals(10, refIds.size()); + assertTrue(refIds.contains(ref1.getMessageID())); + assertTrue(refIds.contains(ref2.getMessageID())); + assertTrue(refIds.contains(ref3.getMessageID())); + assertTrue(refIds.contains(ref4.getMessageID())); + assertTrue(refIds.contains(ref5.getMessageID())); + assertTrue(refIds.contains(ref6.getMessageID())); + assertTrue(refIds.contains(ref7.getMessageID())); + assertTrue(refIds.contains(ref8.getMessageID())); + assertTrue(refIds.contains(ref9.getMessageID())); + assertTrue(refIds.contains(ref10.getMessageID())); + + List msgs = getMessageIds(); + assertNotNull(msgs); + assertEquals(10, msgs.size()); + assertTrue(msgs.contains(ref1.getMessageID())); + assertTrue(msgs.contains(ref2.getMessageID())); + assertTrue(msgs.contains(ref3.getMessageID())); + assertTrue(msgs.contains(ref4.getMessageID())); + assertTrue(msgs.contains(ref5.getMessageID())); + assertTrue(msgs.contains(ref6.getMessageID())); + assertTrue(msgs.contains(ref7.getMessageID())); + assertTrue(msgs.contains(ref8.getMessageID())); + assertTrue(msgs.contains(ref9.getMessageID())); + assertTrue(msgs.contains(ref10.getMessageID())); + + List msgIds = new ArrayList(); + msgIds.add(ref3.getMessageID()); + msgIds.add(ref4.getMessageID()); + msgIds.add(ref7.getMessageID()); + msgIds.add(ref9.getMessageID()); + msgIds.add(ref1.getMessageID()); + + List ms = pm.getMessages(msgIds); + assertNotNull(ms); + assertEquals(5, ms.size()); + assertTrue(containsMessage(ms, ref3.getMessageID())); + assertTrue(containsMessage(ms, ref4.getMessageID())); + assertTrue(containsMessage(ms, ref7.getMessageID())); + assertTrue(containsMessage(ms, ref9.getMessageID())); + assertTrue(containsMessage(ms, ref1.getMessageID())); - tl.removeReference(channel.getChannelID(), ref, null); + pm.removeAllChannelData(channel.getChannelID()); + } - refs = tl.messageRefs(channel.getChannelID()); + public void testAddReferences() throws Exception + { + doSetup(false, false); - assertTrue(refs.isEmpty()); + Channel channel = new SimpleChannel(0, ms); - } + Message[] m = createMessages(10); + + MessageReference ref1 = ms.reference(m[0]); + MessageReference ref2 = ms.reference(m[1]); + MessageReference ref3 = ms.reference(m[2]); + MessageReference ref4 = ms.reference(m[3]); + MessageReference ref5 = ms.reference(m[4]); + MessageReference ref6 = ms.reference(m[5]); + MessageReference ref7 = ms.reference(m[6]); + MessageReference ref8 = ms.reference(m[7]); + MessageReference ref9 = ms.reference(m[8]); + MessageReference ref10 = ms.reference(m[9]); + + List refs = new ArrayList(); + refs.add(ref1); + refs.add(ref2); + refs.add(ref3); + refs.add(ref4); + refs.add(ref5); + refs.add(ref6); + refs.add(ref7); + refs.add(ref8); + refs.add(ref9); + refs.add(ref10); + + pm.addReferences(channel.getChannelID(), refs); + + List refIds = getReferenceIds(channel.getChannelID()); + assertNotNull(refIds); + assertEquals(10, refIds.size()); + assertTrue(refIds.contains(ref1.getMessageID())); + assertTrue(refIds.contains(ref2.getMessageID())); + assertTrue(refIds.contains(ref3.getMessageID())); + assertTrue(refIds.contains(ref4.getMessageID())); + assertTrue(refIds.contains(ref5.getMessageID())); + assertTrue(refIds.contains(ref6.getMessageID())); + assertTrue(refIds.contains(ref7.getMessageID())); + assertTrue(refIds.contains(ref8.getMessageID())); + assertTrue(refIds.contains(ref9.getMessageID())); + assertTrue(refIds.contains(ref10.getMessageID())); + + List msgs = getMessageIds(); + assertNotNull(msgs); + assertEquals(10, msgs.size()); + assertTrue(msgs.contains(ref1.getMessageID())); + assertTrue(msgs.contains(ref2.getMessageID())); + assertTrue(msgs.contains(ref3.getMessageID())); + assertTrue(msgs.contains(ref4.getMessageID())); + assertTrue(msgs.contains(ref5.getMessageID())); + assertTrue(msgs.contains(ref6.getMessageID())); + assertTrue(msgs.contains(ref7.getMessageID())); + assertTrue(msgs.contains(ref8.getMessageID())); + assertTrue(msgs.contains(ref9.getMessageID())); + assertTrue(msgs.contains(ref10.getMessageID())); + + List msgIds = new ArrayList(); + msgIds.add(ref3.getMessageID()); + msgIds.add(ref4.getMessageID()); + msgIds.add(ref7.getMessageID()); + msgIds.add(ref9.getMessageID()); + msgIds.add(ref1.getMessageID()); + + List ms = pm.getMessages(msgIds); + assertNotNull(ms); + assertEquals(5, ms.size()); + assertTrue(containsMessage(ms, ref3.getMessageID())); + assertTrue(containsMessage(ms, ref4.getMessageID())); + assertTrue(containsMessage(ms, ref7.getMessageID())); + assertTrue(containsMessage(ms, ref9.getMessageID())); + assertTrue(containsMessage(ms, ref1.getMessageID())); + + pm.removeAllChannelData(channel.getChannelID()); } - public void testGetMessageReferences() throws Exception + public void testRemoveNonPersistentMessages() throws Exception { doSetup(false, false); Channel channel = new SimpleChannel(0, ms); - Message[] messages = createMessages(); + Message m1 = createMessage((byte)0, false); + Message m2 = createMessage((byte)0, true); + Message m3 = createMessage((byte)0, true); + Message m4 = createMessage((byte)0, false); + Message m5 = createMessage((byte)0, false); + Message m6 = createMessage((byte)0, false); + Message m7 = createMessage((byte)0, true); + Message m8 = createMessage((byte)0, true); + Message m9 = createMessage((byte)0, false); + Message m10 = createMessage((byte)0, true); + Message m11 = createMessage((byte)0, false); + Message m12 = createMessage((byte)0, false); + Message m13 = createMessage((byte)0, false); + Message m14 = createMessage((byte)0, false); + Message m15 = createMessage((byte)0, true); + Message m16 = createMessage((byte)0, true); + Message m17 = createMessage((byte)0, false); + Message m18 = createMessage((byte)0, false); + Message m19 = createMessage((byte)0, false); + Message m20 = createMessage((byte)0, true); - for (int i = 0; i < messages.length; i++) - { - Message m = messages[i]; - MessageReference ref = ms.reference(m); + MessageReference ref1 = ms.reference(m1); + ref1.setOrdering(1); + MessageReference ref2 = ms.reference(m2); + ref2.setOrdering(2); + MessageReference ref3 = ms.reference(m3); + ref3.setOrdering(3); + MessageReference ref4 = ms.reference(m4); + ref4.setOrdering(4); + MessageReference ref5 = ms.reference(m5); + ref5.setOrdering(5); + MessageReference ref6 = ms.reference(m6); + ref6.setOrdering(6); + MessageReference ref7 = ms.reference(m7); + ref7.setOrdering(7); + MessageReference ref8 = ms.reference(m8); + ref8.setOrdering(8); + MessageReference ref9 = ms.reference(m9); + ref9.setOrdering(9); + MessageReference ref10 = ms.reference(m10); + ref10.setOrdering(10); + MessageReference ref11 = ms.reference(m11); + ref11.setOrdering(11); + MessageReference ref12 = ms.reference(m12); + ref12.setOrdering(12); + MessageReference ref13 = ms.reference(m13); + ref13.setOrdering(13); + MessageReference ref14 = ms.reference(m14); + ref14.setOrdering(14); + MessageReference ref15 = ms.reference(m15); + ref15.setOrdering(15); + MessageReference ref16 = ms.reference(m16); + ref16.setOrdering(16); + MessageReference ref17 = ms.reference(m17); + ref17.setOrdering(17); + MessageReference ref18 = ms.reference(m18); + ref18.setOrdering(18); + MessageReference ref19 = ms.reference(m19); + ref19.setOrdering(19); + MessageReference ref20 = ms.reference(m20); + ref20.setOrdering(20); + + pm.addReference(channel.getChannelID(), ref1, null); + pm.addReference(channel.getChannelID(), ref2, null); + pm.addReference(channel.getChannelID(), ref3, null); + pm.addReference(channel.getChannelID(), ref4, null); + pm.addReference(channel.getChannelID(), ref5, null); + pm.addReference(channel.getChannelID(), ref6, null); + pm.addReference(channel.getChannelID(), ref7, null); + pm.addReference(channel.getChannelID(), ref8, null); + pm.addReference(channel.getChannelID(), ref9, null); + pm.addReference(channel.getChannelID(), ref10, null); + pm.addReference(channel.getChannelID(), ref11, null); + pm.addReference(channel.getChannelID(), ref12, null); + pm.addReference(channel.getChannelID(), ref13, null); + pm.addReference(channel.getChannelID(), ref14, null); + pm.addReference(channel.getChannelID(), ref15, null); + pm.addReference(channel.getChannelID(), ref16, null); + pm.addReference(channel.getChannelID(), ref17, null); + pm.addReference(channel.getChannelID(), ref18, null); + pm.addReference(channel.getChannelID(), ref19, null); + pm.addReference(channel.getChannelID(), ref20, null); + + List refIds = getReferenceIds(channel.getChannelID()); + assertNotNull(refIds); + assertEquals(20, refIds.size()); + assertTrue(refIds.contains(ref1.getMessageID())); + assertTrue(refIds.contains(ref2.getMessageID())); + assertTrue(refIds.contains(ref3.getMessageID())); + assertTrue(refIds.contains(ref4.getMessageID())); + assertTrue(refIds.contains(ref5.getMessageID())); + assertTrue(refIds.contains(ref6.getMessageID())); + assertTrue(refIds.contains(ref7.getMessageID())); + assertTrue(refIds.contains(ref8.getMessageID())); + assertTrue(refIds.contains(ref9.getMessageID())); + assertTrue(refIds.contains(ref10.getMessageID())); + assertTrue(refIds.contains(ref11.getMessageID())); + assertTrue(refIds.contains(ref12.getMessageID())); + assertTrue(refIds.contains(ref13.getMessageID())); + assertTrue(refIds.contains(ref14.getMessageID())); + assertTrue(refIds.contains(ref15.getMessageID())); + assertTrue(refIds.contains(ref16.getMessageID())); + assertTrue(refIds.contains(ref17.getMessageID())); + assertTrue(refIds.contains(ref18.getMessageID())); + assertTrue(refIds.contains(ref19.getMessageID())); + assertTrue(refIds.contains(ref20.getMessageID())); + + List msgs = getMessageIds(); + assertNotNull(msgs); + assertEquals(20, msgs.size()); + assertTrue(msgs.contains(ref1.getMessageID())); + assertTrue(msgs.contains(ref2.getMessageID())); + assertTrue(msgs.contains(ref3.getMessageID())); + assertTrue(msgs.contains(ref4.getMessageID())); + assertTrue(msgs.contains(ref5.getMessageID())); + assertTrue(msgs.contains(ref6.getMessageID())); + assertTrue(msgs.contains(ref7.getMessageID())); + assertTrue(msgs.contains(ref8.getMessageID())); + assertTrue(msgs.contains(ref9.getMessageID())); + assertTrue(msgs.contains(ref10.getMessageID())); + assertTrue(msgs.contains(ref11.getMessageID())); + assertTrue(msgs.contains(ref12.getMessageID())); + assertTrue(msgs.contains(ref13.getMessageID())); + assertTrue(msgs.contains(ref14.getMessageID())); + assertTrue(msgs.contains(ref15.getMessageID())); + assertTrue(msgs.contains(ref16.getMessageID())); + assertTrue(msgs.contains(ref17.getMessageID())); + assertTrue(msgs.contains(ref18.getMessageID())); + assertTrue(msgs.contains(ref19.getMessageID())); + assertTrue(msgs.contains(ref20.getMessageID())); + + pm.removeNonPersistentMessageReferences(channel.getChannelID(), 1, 4); + + refIds = getReferenceIds(channel.getChannelID()); + assertNotNull(refIds); + assertEquals(18, refIds.size()); + + assertFalse(refIds.contains(ref1.getMessageID())); + assertTrue(refIds.contains(ref2.getMessageID())); + assertTrue(refIds.contains(ref3.getMessageID())); + assertFalse(refIds.contains(ref4.getMessageID())); + assertTrue(refIds.contains(ref5.getMessageID())); + assertTrue(refIds.contains(ref6.getMessageID())); + assertTrue(refIds.contains(ref7.getMessageID())); + assertTrue(refIds.contains(ref8.getMessageID())); + assertTrue(refIds.contains(ref9.getMessageID())); + assertTrue(refIds.contains(ref10.getMessageID())); + assertTrue(refIds.contains(ref11.getMessageID())); + assertTrue(refIds.contains(ref12.getMessageID())); + assertTrue(refIds.contains(ref13.getMessageID())); + assertTrue(refIds.contains(ref14.getMessageID())); + assertTrue(refIds.contains(ref15.getMessageID())); + assertTrue(refIds.contains(ref16.getMessageID())); + assertTrue(refIds.contains(ref17.getMessageID())); + assertTrue(refIds.contains(ref18.getMessageID())); + assertTrue(refIds.contains(ref19.getMessageID())); + assertTrue(refIds.contains(ref20.getMessageID())); + + pm.removeNonPersistentMessageReferences(channel.getChannelID(), 4, 14); + + refIds = getReferenceIds(channel.getChannelID()); + assertNotNull(refIds); + assertEquals(11, refIds.size()); + + assertFalse(refIds.contains(ref1.getMessageID())); + assertTrue(refIds.contains(ref2.getMessageID())); + assertTrue(refIds.contains(ref3.getMessageID())); + assertFalse(refIds.contains(ref4.getMessageID())); + assertFalse(refIds.contains(ref5.getMessageID())); + assertFalse(refIds.contains(ref6.getMessageID())); + assertTrue(refIds.contains(ref7.getMessageID())); + assertTrue(refIds.contains(ref8.getMessageID())); + assertFalse(refIds.contains(ref9.getMessageID())); + assertTrue(refIds.contains(ref10.getMessageID())); + assertFalse(refIds.contains(ref11.getMessageID())); + assertFalse(refIds.contains(ref12.getMessageID())); + assertFalse(refIds.contains(ref13.getMessageID())); + assertFalse(refIds.contains(ref14.getMessageID())); + assertTrue(refIds.contains(ref15.getMessageID())); + assertTrue(refIds.contains(ref16.getMessageID())); + assertTrue(refIds.contains(ref17.getMessageID())); + assertTrue(refIds.contains(ref18.getMessageID())); + assertTrue(refIds.contains(ref19.getMessageID())); + assertTrue(refIds.contains(ref20.getMessageID())); - tl.addReference(channel.getChannelID(), ref, null); + pm.removeAllChannelData(channel.getChannelID()); } - List refs = tl.messageRefs(channel.getChannelID()); - assertNotNull(refs); - assertEquals(messages.length, refs.size()); + public void testAddRemoveReference() throws Exception + { + doSetup(false, false); - for (int i = 0; i < messages.length; i++) + Channel channel1 = new SimpleChannel(0, ms); + Channel channel2 = new SimpleChannel(1, ms); + + Message[] messages = createMessages(10); + + for (int i = 0; i < 5; i++) { - Message m = messages[i]; - assertTrue(refs.contains(m.getMessageID())); - } + Message m1 = messages[i * 2]; + Message m2 = messages[i * 2 + 1]; + + MessageReference ref1_1 = ms.reference(m1); + MessageReference ref1_2 = ms.reference(m1); + + MessageReference ref2_1 = ms.reference(m2); + MessageReference ref2_2 = ms.reference(m2); + + ref1_1.incrementChannelCount(); + ref1_2.incrementChannelCount(); + ref2_1.incrementChannelCount(); + ref2_2.incrementChannelCount(); + pm.addReference(channel1.getChannelID(), ref1_1, null); + pm.addReference(channel1.getChannelID(), ref2_1, null); + + pm.addReference(channel2.getChannelID(), ref1_2, null); + pm.addReference(channel2.getChannelID(), ref2_2, null); + + List refs = getReferenceIds(channel1.getChannelID()); - tl.removeAllMessageData(channel.getChannelID()); - refs = tl.messageRefs(channel.getChannelID()); + assertNotNull(refs); + assertEquals(2, refs.size()); + assertTrue(refs.contains(m1.getMessageID())); + assertTrue(refs.contains(m2.getMessageID())); + + refs = getReferenceIds(channel2.getChannelID()); + + assertNotNull(refs); + assertEquals(2, refs.size()); + assertTrue(refs.contains(m1.getMessageID())); + assertTrue(refs.contains(m2.getMessageID())); + + List msgs = getMessageIds(); + assertNotNull(msgs); + assertEquals(2, msgs.size()); + assertTrue(msgs.contains(m1.getMessageID())); + assertTrue(msgs.contains(m2.getMessageID())); + + ref1_1.decrementChannelCount(); + pm.removeReference(channel1.getChannelID(), ref1_1, null); + + refs = getReferenceIds(channel1.getChannelID()); + assertNotNull(refs); + assertEquals(1, refs.size()); + assertTrue(refs.contains(m2.getMessageID())); + + refs = getReferenceIds(channel2.getChannelID()); + assertNotNull(refs); + assertEquals(2, refs.size()); + assertTrue(refs.contains(m1.getMessageID())); + assertTrue(refs.contains(m2.getMessageID())); + + msgs = getMessageIds(); + assertNotNull(msgs); + assertEquals(2, msgs.size()); + assertTrue(msgs.contains(m1.getMessageID())); + assertTrue(msgs.contains(m2.getMessageID())); + + ref1_2.decrementChannelCount(); + pm.removeReference(channel2.getChannelID(), ref1_2, null); + + refs = getReferenceIds(channel1.getChannelID()); + assertNotNull(refs); + assertEquals(1, refs.size()); + assertTrue(refs.contains(m2.getMessageID())); + + refs = getReferenceIds(channel2.getChannelID()); + assertNotNull(refs); + assertEquals(1, refs.size()); + assertTrue(refs.contains(m2.getMessageID())); + + msgs = getMessageIds(); + assertNotNull(msgs); + assertEquals(1, msgs.size()); + assertTrue(msgs.contains(m2.getMessageID())); + + ref2_1.decrementChannelCount(); + pm.removeReference(channel1.getChannelID(), ref2_1, null); + + refs = getReferenceIds(channel1.getChannelID()); assertNotNull(refs); assertTrue(refs.isEmpty()); - } + refs = getReferenceIds(channel2.getChannelID()); + assertNotNull(refs); + assertEquals(1, refs.size()); + assertTrue(refs.contains(m2.getMessageID())); + msgs = getMessageIds(); + assertNotNull(msgs); + assertEquals(1, msgs.size()); + assertTrue(msgs.contains(m2.getMessageID())); - public void testRemoveAllMessageData() throws Exception + ref2_2.decrementChannelCount(); + pm.removeReference(channel2.getChannelID(), ref2_2, null); + + refs = getReferenceIds(channel1.getChannelID()); + assertNotNull(refs); + assertTrue(refs.isEmpty()); + + refs = getReferenceIds(channel2.getChannelID()); + assertNotNull(refs); + assertTrue(refs.isEmpty()); + + msgs = getMessageIds(); + assertNotNull(msgs); + assertTrue(msgs.isEmpty()); + + } + } + + public void testRemoveAllChannelData() throws Exception { doSetup(false, false); Channel channel = new SimpleChannel(0, ms); - Message[] messages = createMessages(); + Message[] messages = createMessages(10); for (int i = 0; i < messages.length; i++) { @@ -232,23 +970,27 @@ MessageReference ref = ms.reference(m); - tl.addReference(channel.getChannelID(), ref, null); + pm.addReference(channel.getChannelID(), ref, null); } - List refs = tl.messageRefs(channel.getChannelID()); + List refs = getReferenceIds(channel.getChannelID()); assertNotNull(refs); assertEquals(messages.length, refs.size()); + + List msgs = getMessageIds(); + assertNotNull(msgs); + assertEquals(messages.length, msgs.size()); + for (int i = 0; i < messages.length; i++) { Message m = messages[i]; assertTrue(refs.contains(m.getMessageID())); } - tl.removeAllMessageData(channel.getChannelID()); + pm.removeAllChannelData(channel.getChannelID()); - refs = tl.messageRefs(channel.getChannelID()); - assertNotNull(refs); + refs = getReferenceIds(channel.getChannelID()); assertTrue(refs.isEmpty()); } @@ -360,9 +1102,9 @@ Channel channel = new SimpleChannel(0, ms); TransactionRepository txRep = new TransactionRepository(); - txRep.start(tl); + txRep.start(pm); - Message[] messages = createMessages(); + Message[] messages = createMessages(10); Xid[] xids = new Xid[messages.length]; Transaction[] txs = new Transaction[messages.length]; @@ -372,11 +1114,11 @@ xids[i] = new MockXid(); txs[i] = txRep.createTransaction(xids[i]); MessageReference ref = ms.reference(messages[i]); - tl.addReference(channel.getChannelID(), ref, txs[i]); + pm.addReference(channel.getChannelID(), ref, txs[i]); txs[i].prepare(); } - List txList = tl.retrievePreparedTransactions(); + List txList = pm.retrievePreparedTransactions(); assertNotNull(txList); assertEquals(messages.length, txList.size()); @@ -393,32 +1135,32 @@ } - tl.removeAllMessageData(channel.getChannelID()); + pm.removeAllChannelData(channel.getChannelID()); } - protected Message createMessage(byte i) throws Exception + protected Message createMessage(byte i, boolean reliable) throws Exception { Map headers = generateFilledMap(true); Message m = MessageFactory.createCoreMessage(new GUID().toString(), - true, + reliable, System.currentTimeMillis() + 1000 * 60 * 60, System.currentTimeMillis(), - i, + (byte)(i % 10), headers, i % 2 == 0 ? new WibblishObject() : null); return m; } - protected Message[] createMessages() throws Exception + protected Message[] createMessages(int num) throws Exception { //Generate some messages with a good range of attribute values - Message[] messages = new Message[10]; - for (int i = 0; i < 10; i++) + Message[] messages = new Message[num]; + for (int i = 0; i < num; i++) { - messages[i] = createMessage((byte)i); + messages[i] = createMessage((byte)i, true); } return messages; } @@ -719,11 +1461,11 @@ Channel channel = new SimpleChannel(0, ms); TransactionRepository txRep = new TransactionRepository(); - txRep.start(tl); + txRep.start(pm); log.debug("transaction log started"); - Message[] messages = createMessages(); + Message[] messages = createMessages(10); Message m1 = messages[0]; Message m2 = messages[1]; @@ -750,46 +1492,65 @@ log.debug("adding references non-transactionally"); // Add first two refs non transactionally - tl.addReference(channel.getChannelID(), ref1, null); - tl.addReference(channel.getChannelID(), ref2, null); + pm.addReference(channel.getChannelID(), ref1, null); + pm.addReference(channel.getChannelID(), ref2, null); //check they're there - List refs = tl.messageRefs(channel.getChannelID()); + List refs = getReferenceIds(channel.getChannelID()); assertNotNull(refs); assertEquals(2, refs.size()); assertTrue(refs.contains(ref1.getMessageID())); assertTrue(refs.contains(ref2.getMessageID())); + List msgs = getMessageIds(); + assertNotNull(msgs); + assertEquals(2, msgs.size()); + assertTrue(msgs.contains(ref1.getMessageID())); + assertTrue(msgs.contains(ref2.getMessageID())); + log.debug("ref1 and ref2 are there"); //Add the next 3 refs transactionally - tl.addReference(channel.getChannelID(), ref3, tx); - tl.addReference(channel.getChannelID(), ref4, tx); - tl.addReference(channel.getChannelID(), ref5, tx); + pm.addReference(channel.getChannelID(), ref3, tx); + pm.addReference(channel.getChannelID(), ref4, tx); + pm.addReference(channel.getChannelID(), ref5, tx); //Remove the other 2 transactionally - tl.removeReference(channel.getChannelID(), ref1, tx); - tl.removeReference(channel.getChannelID(), ref2, tx); + pm.removeReference(channel.getChannelID(), ref1, tx); + pm.removeReference(channel.getChannelID(), ref2, tx); //Check the changes aren't visible - refs = tl.messageRefs(channel.getChannelID()); + refs = getReferenceIds(channel.getChannelID()); assertNotNull(refs); assertEquals(2, refs.size()); assertTrue(refs.contains(ref1.getMessageID())); assertTrue(refs.contains(ref2.getMessageID())); + msgs = getMessageIds(); + assertNotNull(msgs); + assertEquals(2, msgs.size()); + assertTrue(msgs.contains(ref1.getMessageID())); + assertTrue(msgs.contains(ref2.getMessageID())); + //commit transaction tx.commit(); //check we can see only the last 3 refs - refs = tl.messageRefs(channel.getChannelID()); + refs = getReferenceIds(channel.getChannelID()); assertNotNull(refs); assertEquals(3, refs.size()); assertTrue(refs.contains(ref3.getMessageID())); assertTrue(refs.contains(ref4.getMessageID())); assertTrue(refs.contains(ref5.getMessageID())); - tl.removeAllMessageData(channel.getChannelID()); + msgs = getMessageIds(); + assertNotNull(msgs); + assertEquals(3, msgs.size()); + assertTrue(msgs.contains(ref3.getMessageID())); + assertTrue(msgs.contains(ref4.getMessageID())); + assertTrue(msgs.contains(ref5.getMessageID())); + + pm.removeAllChannelData(channel.getChannelID()); } @@ -799,9 +1560,9 @@ Channel channel = new SimpleChannel(0, ms); TransactionRepository txRep = new TransactionRepository(); - txRep.start(tl); + txRep.start(pm); - Message[] messages = createMessages(); + Message[] messages = createMessages(10); Message m1 = messages[0]; Message m2 = messages[1]; @@ -827,43 +1588,62 @@ MessageReference ref5 = ms.reference(m5); //Add first two refs non transactionally - tl.addReference(channel.getChannelID(), ref1, null); - tl.addReference(channel.getChannelID(), ref2, null); + pm.addReference(channel.getChannelID(), ref1, null); + pm.addReference(channel.getChannelID(), ref2, null); //check they're there - List refs = tl.messageRefs(channel.getChannelID()); + List refs = getReferenceIds(channel.getChannelID()); assertNotNull(refs); assertEquals(2, refs.size()); assertTrue(refs.contains(ref1.getMessageID())); assertTrue(refs.contains(ref2.getMessageID())); + List msgs = getMessageIds(); + assertNotNull(msgs); + assertEquals(2, msgs.size()); + assertTrue(msgs.contains(ref1.getMessageID())); + assertTrue(msgs.contains(ref2.getMessageID())); + + + //Add the next 3 refs transactionally - tl.addReference(channel.getChannelID(), ref3, tx); - tl.addReference(channel.getChannelID(), ref4, tx); - tl.addReference(channel.getChannelID(), ref5, tx); + pm.addReference(channel.getChannelID(), ref3, tx); + pm.addReference(channel.getChannelID(), ref4, tx); + pm.addReference(channel.getChannelID(), ref5, tx); //Remove the other 2 transactionally - tl.removeReference(channel.getChannelID(), ref1, tx); - tl.removeReference(channel.getChannelID(), ref2, tx); + pm.removeReference(channel.getChannelID(), ref1, tx); + pm.removeReference(channel.getChannelID(), ref2, tx); //Check the changes aren't visible - refs = tl.messageRefs(channel.getChannelID()); + refs = getReferenceIds(channel.getChannelID()); assertNotNull(refs); assertEquals(2, refs.size()); assertTrue(refs.contains(ref1.getMessageID())); assertTrue(refs.contains(ref2.getMessageID())); + msgs = getMessageIds(); + assertNotNull(msgs); + assertEquals(2, msgs.size()); + assertTrue(msgs.contains(ref1.getMessageID())); + assertTrue(msgs.contains(ref2.getMessageID())); + //rollback transaction tx.rollback(); - refs = tl.messageRefs(channel.getChannelID()); + refs = getReferenceIds(channel.getChannelID()); assertNotNull(refs); assertEquals(2, refs.size()); assertTrue(refs.contains(ref1.getMessageID())); assertTrue(refs.contains(ref2.getMessageID())); - tl.removeAllMessageData(channel.getChannelID()); + msgs = getMessageIds(); + assertNotNull(msgs); + assertEquals(2, msgs.size()); + assertTrue(msgs.contains(ref1.getMessageID())); + assertTrue(msgs.contains(ref2.getMessageID())); + pm.removeAllChannelData(channel.getChannelID()); } @@ -882,18 +1662,94 @@ props.put("CREATE_MESSAGE_REF", "CREATE TABLE MESSAGE_REFERENCE (" + - "CHANNELID VARCHAR(256), " + + "CHANNELID BIGINT, " + "MESSAGEID VARCHAR(256), " + "TRANSACTIONID VARCHAR(255), " + "STATE CHAR(1), " + "ORD BIGINT, " + "DELIVERYCOUNT INTEGER, " + + "PERSISTENT CHAR(1), " + "PRIMARY KEY(CHANNELID, MESSAGEID))"); return props; } + protected List getReferenceIds(long channelId) throws Exception + { + InitialContext ctx = new InitialContext(); + + TransactionManager mgr = (TransactionManager)ctx.lookup(TransactionManagerService.JNDI_NAME); + DataSource ds = (DataSource)ctx.lookup("java:/DefaultDS"); + + javax.transaction.Transaction txOld = mgr.suspend(); + mgr.begin(); + + Connection conn = ds.getConnection(); + String sql = "SELECT MESSAGEID FROM MESSAGE_REFERENCE WHERE CHANNELID=? ORDER BY ORD"; + PreparedStatement ps = conn.prepareStatement(sql); + ps.setLong(1, channelId); + + ResultSet rs = ps.executeQuery(); + + List msgIds = new ArrayList(); + + while (rs.next()) + { + String msgId = rs.getString(1); + msgIds.add(msgId); + } + rs.close(); + ps.close(); + conn.close(); + + mgr.commit(); + + if (txOld != null) + { + mgr.resume(txOld); + } + + return msgIds; + } + + protected List getMessageIds() throws Exception + { + InitialContext ctx = new InitialContext(); + + TransactionManager mgr = (TransactionManager)ctx.lookup(TransactionManagerService.JNDI_NAME); + DataSource ds = (DataSource)ctx.lookup("java:/DefaultDS"); + + javax.transaction.Transaction txOld = mgr.suspend(); + mgr.begin(); + + Connection conn = ds.getConnection(); + String sql = "SELECT MESSAGEID FROM MESSAGE ORDER BY MESSAGEID"; + PreparedStatement ps = conn.prepareStatement(sql); + + ResultSet rs = ps.executeQuery(); + + List msgIds = new ArrayList(); + + while (rs.next()) + { + String msgId = rs.getString(1); + msgIds.add(msgId); + } + rs.close(); + ps.close(); + conn.close(); + + mgr.commit(); + + if (txOld != null) + { + mgr.resume(txOld); + } + + return msgIds; + } + } 1.2 +4 -4 jboss-jms/tests/src/org/jboss/test/messaging/core/plugin/MessageStoreWithPersistenceManagerTest.java (In the diff below, changes in quantity of whitespace are not shown.) Index: MessageStoreWithPersistenceManagerTest.java =================================================================== RCS file: /cvsroot/jboss/jboss-jms/tests/src/org/jboss/test/messaging/core/plugin/MessageStoreWithPersistenceManagerTest.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -b -r1.1 -r1.2 --- MessageStoreWithPersistenceManagerTest.java 4 Mar 2006 02:43:57 -0000 1.1 +++ MessageStoreWithPersistenceManagerTest.java 5 Mar 2006 16:17:26 -0000 1.2 @@ -30,9 +30,9 @@ /** * @author <a href="mailto:ov...@jb...">Ovidiu Feodorov</a> - * @version <tt>$Revision: 1.1 $</tt> + * @version <tt>$Revision: 1.2 $</tt> * - * $Id: MessageStoreWithPersistenceManagerTest.java,v 1.1 2006/03/04 02:43:57 ovidiu Exp $ + * $Id: MessageStoreWithPersistenceManagerTest.java,v 1.2 2006/03/05 16:17:26 timfox Exp $ */ public class MessageStoreWithPersistenceManagerTest extends MessageStoreTestBase { @@ -64,9 +64,9 @@ pm = new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager()); pm.start(); - ms = new PagingMessageStore("s9", pm); + ms = new PagingMessageStore("s9"); - ms2 = new PagingMessageStore("s10", pm); + ms2 = new PagingMessageStore("s10"); log.debug("setup done"); } |