From: <tho...@us...> - 2013-12-10 21:37:05
Revision: 7630
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7630&view=rev
Author:   thompsonbry
Date:     2013-12-10 21:36:50 +0000 (Tue, 10 Dec 2013)

Log Message:
-----------
Sync to Martyn on #779. I have added a listener protocol for the incremental write replication, with sufficient hooks to let us force a failure of the write replication protocol by intercepting it at some desired point between the receive and the replicate of the bytes in a payload (an illustrative sketch of such an interception follows the HASendService diff below). I added a sure kill for A+B+C where B is killed during a large load, once it has received two chunks of data for that load. This test passes. I did a similar test where C is killed. This test fails: it runs to the end of the test and only then discovers that the LOAD failed.

Modified Paths:
--------------
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java
    branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java
    branches/MGC_1_3_0/bigdata/src/test/com/bigdata/TestAll.java
    branches/MGC_1_3_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java
    branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/TestWORMStrategyNoCache.java
    branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/HABranch.txt
    branches/MGC_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java
    branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java
    branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java
    branches/MGC_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JustKills.java

Removed Paths:
-------------
    branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/AbstractHAJournalTestCase.java
    branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestAll.java
    branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestHAWORMStrategy.java
    branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestHAWritePipeline.java
    branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestJournalHA.java

Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java
===================================================================
--- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-09 20:53:38 UTC (rev 7629)
+++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumPipelineImpl.java 2013-12-10 21:36:50 UTC (rev 7630)
@@ -847,11 +847,20 @@ // Setup the receive service. receiveService = new HAReceiveService<HAMessageWrapper>(addrSelf, addrNext, new IHAReceiveCallback<HAMessageWrapper>() { + @Override public void callback(final HAMessageWrapper msg, final ByteBuffer data) throws Exception { // delegate handling of write cache blocks. handleReplicatedWrite(msg.req, msg.msg, data); } + @Override + public void incReceive(final HAMessageWrapper msg, + final int nreads, final int rdlen, + final int rem) throws Exception { + // delegate handling of incremental receive notify. + QuorumPipelineImpl.this.incReceive(msg.req, + msg.msg, nreads, rdlen, rem); + } }); // Start the receive service - will not return until service is // running @@ -2056,6 +2065,27 @@ final IHAWriteMessage msg, final ByteBuffer data) throws Exception; /** + * Notify that some payload bytes have been incrementally received for an + * {@link IHAMessage}. + * + * @param msg + * The message.
+ * @param nreads + * The number of reads performed against the upstream socket for + * this message. + * @param rdlen + * The number of bytes read from the socket in this read. + * @param rem + * The number of bytes remaining before the payload has been + * fully read. + * + * @throws Exception + */ + abstract protected void incReceive(final IHASyncRequest req, + final IHAWriteMessage msg, final int nreads, final int rdlen, + final int rem) throws Exception; + + /** * A utility class that bundles together the Internet address and port at which * the downstream service will accept and relay cache blocks for the write * pipeline and the remote interface which is used to communicate with that Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2013-12-09 20:53:38 UTC (rev 7629) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/QuorumServiceBase.java 2013-12-10 21:36:50 UTC (rev 7630) @@ -109,6 +109,15 @@ } @Override + protected void incReceive(final IHASyncRequest req, + final IHAWriteMessage msg, final int nreads, + final int rdlen, final int rem) throws Exception { + + QuorumServiceBase.this.incReceive(req, msg, nreads, rdlen, rem); + + } + + @Override protected long getRetrySendTimeoutNanos() { return QuorumServiceBase.this.getRetrySendTimeoutNanos(); @@ -262,8 +271,31 @@ */ abstract protected void handleReplicatedWrite(IHASyncRequest req, IHAWriteMessage msg, ByteBuffer data) throws Exception; - + /** + * Core implementation of callback for monitoring progress of replicated + * writes. + * + * @param req + * The synchronization request (optional). When non- + * <code>null</code> the message and payload are historical data. + * When <code>null</code> they are live data. + * @param msg + * Metadata about a buffer containing data replicated to this + * node. + * @param rdlen + * The number of bytes read from the socket in this read. + * @param rem + * The number of bytes remaining before the payload has been + * fully read. + * + * @throws Exception + */ + abstract protected void incReceive(final IHASyncRequest req, + final IHAWriteMessage msg, final int nreads, final int rdlen, + final int rem) throws Exception; + + /** * {@inheritDoc} * <p> * Note: The default implementation is a NOP. Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-09 20:53:38 UTC (rev 7629) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HAReceiveService.java 2013-12-10 21:36:50 UTC (rev 7630) @@ -52,6 +52,7 @@ import org.apache.log4j.Logger; import com.bigdata.ha.QuorumPipelineImpl; +import com.bigdata.ha.msg.IHAMessage; import com.bigdata.ha.msg.IHASyncRequest; import com.bigdata.ha.msg.IHAWriteMessage; import com.bigdata.ha.msg.IHAWriteMessageBase; @@ -1079,6 +1080,11 @@ rem -= rdlen; + if (callback != null) { + // notify of incremental read. + callback.incReceive(message, reads, rdlen, rem); + } + /* * Now forward the most recent transfer bytes downstream * @@ -1349,6 +1355,25 @@ */ void callback(M msg, ByteBuffer data) throws Exception; + /** + * Notify that some payload bytes have been incrementally received for + * an {@link IHAMessage}. This is invoked each time some data has been + * read from the upstream socket. 
+ * + * @param msg + * The message. + * @param nreads + * The number of reads performed against the upstream socket + * for this message. + * @param rdlen + * The number of bytes read from the socket in this read. + * @param rem + * The number of bytes remaining before the payload has been + * fully read. + * + * @throws Exception + */ + void incReceive(M msg, int nreads, int rdlen, int rem) throws Exception; } /** Modified: branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-09 20:53:38 UTC (rev 7629) +++ branches/MGC_1_3_0/bigdata/src/java/com/bigdata/ha/pipeline/HASendService.java 2013-12-10 21:36:50 UTC (rev 7630) @@ -131,10 +131,30 @@ * @see #start(InetSocketAddress) */ public HASendService() { + + this(true/* blocking */); + + } + + /** + * Note: This constructor is not exposed yet. We need to figure out whether + * to allow the configuration of the socket options and how to support that. + * + * @param blocking + */ + private HASendService(final boolean blocking) { + + this.blocking = blocking; - } + } /** + * <code>true</code> iff the client socket will be setup in a blocking mode. + * This is the historical behavior until at least Dec 10, 2013. + */ + private final boolean blocking; + + /** * Extended to ensure that the private executor service is always * terminated. */ @@ -422,22 +442,29 @@ * * @throws IOException */ - static private SocketChannel openChannel(final InetSocketAddress addr) + private SocketChannel openChannel(final InetSocketAddress addr) throws IOException { final SocketChannel socketChannel = SocketChannel.open(); try { - socketChannel.configureBlocking(true); + socketChannel.configureBlocking(blocking); if (log.isTraceEnabled()) log.trace("Connecting to " + addr); - socketChannel.connect(addr); + if (!socketChannel.connect(addr)) { + while (!socketChannel.finishConnect()) { + try { + Thread.sleep(10/* millis */); + } catch (InterruptedException e) { + // Propagate interrupt. + Thread.currentThread().interrupt(); + } + } + } - socketChannel.finishConnect(); - } catch (IOException ex) { log.error(ex); @@ -520,7 +547,7 @@ * socket buffer exchange and the send() Future will report * success even through the application code on the receiver * could fail once it gets control back from select(). This - * twist can be a bit suprising. Therefore it is useful to + * twist can be a bit surprising. Therefore it is useful to * write tests with both small payloads (the data transfer * will succeed at the socket level even if the application * logic then fails the transfer) and for large payloads. 
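For illustration only (this sketch is not part of the commit): the incReceive() hook added above is the interception point between the receive and the replicate of a payload, and the sketch below shows one way a test harness could use it to force a failure after a fixed number of chunks, in the shape of the sure-kill tests described in the log message. The IHAReceiveCallback interface and its two methods are taken from the HAReceiveService diff above; the bound on M (IHAWriteMessageBase), the nesting of the interface inside HAReceiveService, the ChunkCountingCallback name, the failAfterChunks threshold, and the failureAction hook are all assumptions, and counting socket reads is only a stand-in for whatever "chunk" means in the actual tests.

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

import com.bigdata.ha.msg.IHAWriteMessageBase;          // assumed location of the message base type
import com.bigdata.ha.pipeline.HAReceiveService.IHAReceiveCallback; // assumed nesting

/**
 * Hypothetical test helper: delegates to the real callback, but fires a
 * failure action (e.g., a sure kill of this service) once a configured
 * number of incremental receives has been observed.
 */
public class ChunkCountingCallback<M extends IHAWriteMessageBase>
        implements IHAReceiveCallback<M> {

    private final IHAReceiveCallback<M> delegate;

    /** Fail once this many incremental receives have been seen (assumed knob). */
    private final int failAfterChunks;

    /** The forced failure, e.g., a sure kill of the process (assumed hook). */
    private final Runnable failureAction;

    private final AtomicInteger nchunks = new AtomicInteger();

    public ChunkCountingCallback(final IHAReceiveCallback<M> delegate,
            final int failAfterChunks, final Runnable failureAction) {
        this.delegate = delegate;
        this.failAfterChunks = failAfterChunks;
        this.failureAction = failureAction;
    }

    @Override
    public void callback(final M msg, final ByteBuffer data) throws Exception {
        // Full payload received; about to be handled / replicated downstream.
        delegate.callback(msg, data);
    }

    @Override
    public void incReceive(final M msg, final int nreads, final int rdlen,
            final int rem) throws Exception {
        // Invoked after each read against the upstream socket.
        if (nchunks.incrementAndGet() == failAfterChunks) {
            failureAction.run(); // force the failure mid-replication.
        }
        delegate.incReceive(msg, nreads, rdlen, rem);
    }

}

For the B-kill scenario the wrapper could be installed on service B with failAfterChunks set to 2 and a failureAction that kills the process; the C-kill case could install the same wrapper on the last service in the pipeline.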
Modified: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/TestAll.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/TestAll.java 2013-12-09 20:53:38 UTC (rev 7629) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/TestAll.java 2013-12-10 21:36:50 UTC (rev 7630) @@ -108,7 +108,6 @@ suite.addTest(com.bigdata.io.writecache.TestAll.suite()); suite.addTest( com.bigdata.journal.TestAll.suite() ); suite.addTest( com.bigdata.rwstore.TestAll.suite() ); - suite.addTest( com.bigdata.journal.ha.TestAll.suite() ); suite.addTest( com.bigdata.resources.TestAll.suite() ); suite.addTest( com.bigdata.relation.TestAll.suite() ); suite.addTest( com.bigdata.bop.TestAll.suite() ); Modified: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java 2013-12-09 20:53:38 UTC (rev 7629) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/io/writecache/TestWORMWriteCacheService.java 2013-12-10 21:36:50 UTC (rev 7630) @@ -339,6 +339,13 @@ } @Override + protected void incReceive(final IHASyncRequest req, + final IHAWriteMessage msg, final int nreads, + final int rdlen, final int rem) throws Exception { + // NOP + } + + @Override public UUID getStoreUUID() { return MyMockQuorumMember.this.getStoreUUID(); } @@ -380,7 +387,7 @@ MyMockQuorumMember.this.purgeHALogs(token); } - + }); } Modified: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/TestWORMStrategyNoCache.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/TestWORMStrategyNoCache.java 2013-12-09 20:53:38 UTC (rev 7629) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/TestWORMStrategyNoCache.java 2013-12-10 21:36:50 UTC (rev 7630) @@ -35,7 +35,6 @@ import junit.framework.Test; import com.bigdata.io.DirectBufferPool; -import com.bigdata.journal.ha.TestHAWORMStrategy; import com.bigdata.rawstore.IRawStore; /** @@ -44,7 +43,7 @@ * to operation when caching is disabled. * <p> * Note: The HA journal requires that cache be enabled. However, the HA journal - * is tested by a different test suite. See {@link TestHAWORMStrategy}. + * is tested by a different test suite. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ Deleted: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/AbstractHAJournalTestCase.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/AbstractHAJournalTestCase.java 2013-12-09 20:53:38 UTC (rev 7629) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/AbstractHAJournalTestCase.java 2013-12-10 21:36:50 UTC (rev 7630) @@ -1,626 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. 
- -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ -/* - * Created on Oct 14, 2006 - */ - -package com.bigdata.journal.ha; - -import java.io.File; -import java.io.IOException; -import java.net.BindException; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.rmi.Remote; -import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import junit.framework.TestCase; - -import com.bigdata.LRUNexus; -import com.bigdata.ha.HAGlue; -import com.bigdata.ha.QuorumService; -import com.bigdata.ha.QuorumServiceBase; -import com.bigdata.ha.msg.IHASyncRequest; -import com.bigdata.ha.msg.IHAWriteMessage; -import com.bigdata.journal.AbstractJournal; -import com.bigdata.journal.AbstractJournalTestCase; -import com.bigdata.journal.IRootBlockView; -import com.bigdata.journal.Journal; -import com.bigdata.journal.Options; -import com.bigdata.journal.ProxyTestCase; -import com.bigdata.quorum.MockQuorumFixture; -import com.bigdata.quorum.Quorum; -import com.bigdata.quorum.QuorumActor; - -/** - * <p> - * Abstract harness for testing under a variety of configurations. In order to - * test a specific configuration, create a concrete instance of this class. The - * configuration can be described using a mixture of a <code>.properties</code> - * file of the same name as the test class and custom code. - * </p> - * <p> - * When debugging from an IDE, it is very helpful to be able to run a single - * test case. You can do this, but you MUST define the property - * <code>testClass</code> as the name test class that has the logic required - * to instantiate and configure an appropriate object manager instance for the - * test. - * </p> - */ -abstract public class AbstractHAJournalTestCase - extends AbstractJournalTestCase -{ - - // - // Constructors. - // - - public AbstractHAJournalTestCase() {} - - public AbstractHAJournalTestCase(String name) {super(name);} - - //************************************************************ - //************************************************************ - //************************************************************ - - /** - * The replication factor for the quorum. This is initialized in - * {@link #setUp(ProxyTestCase)} so you can override it. The default is - * <code>3</code>, which is a highly available quorum. - */ - protected int k; - - /** - * The replication count (#of journals) for the quorum. This is initialized - * in {@link #setUp(ProxyTestCase)} so you can override it. The default is - * <code>3</code>, which is the same as the default replication factor - * {@link #k}. - */ - protected int replicationCount; - /** - * The fixture provides a mock of the distributed quorum state machine. - */ - protected MockQuorumFixture fixture = null; - /** - * The logical service identifier. - */ - protected String logicalServiceId = null; - /** - * The {@link Journal}s which are the members of the logical service. - */ - private Journal[] stores = null; - - /** - * Invoked from {@link TestCase#setUp()} for each test in the suite. 
- */ - public void setUp(ProxyTestCase testCase) throws Exception { - - super.setUp(testCase); - -// if(log.isInfoEnabled()) -// log.info("\n\n================:BEGIN:" + testCase.getName() -// + ":BEGIN:===================="); - - fixture = new MockQuorumFixture(); -// fixture.start(); - logicalServiceId = "logicalService_" + getName(); - - k = 3; - - replicationCount = 3; - - } - - /** - * Invoked from {@link TestCase#tearDown()} for each test in the suite. - */ - public void tearDown(ProxyTestCase testCase) throws Exception { - - if (stores != null) { - for (Journal store : stores) { - if (store != null) { - store.destroy(); - } - } - } - - if(fixture!=null) { - fixture.terminate(); - fixture = null; - } - - logicalServiceId = null; - - super.tearDown(testCase); - - } - - /** - * Note: Due to the manner in which the {@link MockQuorumManager} is - * initialized, the elements in {@link #stores} will be <code>null</code> - * initially. This should be Ok as long as the test gets fully setup before - * you start making requests of the {@link QuorumManager} or the - * {@link Quorum}. - */ - @Override - protected Journal getStore(final Properties properties) { - - stores = new Journal[replicationCount]; - - for (int i = 0; i < replicationCount; i++) { - - stores[i] = newJournal(properties); - - } - - /* - * FIXME It appears that it is necessary to start the QuorumFixture and - * then each Quorum *BEFORE* any quorum member takes an action, e.g., - * by doing a memberAdd(). That is likely a flaw in the QuorumFixture - * or the Quorum code. - */ - fixture.start(); - - for (int i = 0; i < replicationCount; i++) { - final Quorum<HAGlue, QuorumService<HAGlue>> quorum = stores[i] - .getQuorum(); - final HAJournal jnl = (HAJournal) stores[i]; - final UUID serviceId = jnl.getUUID(); - quorum.start(newQuorumService(logicalServiceId, serviceId, - jnl.newHAGlue(serviceId), jnl)); - } - - for (int i = 0; i < replicationCount; i++) { - final Quorum<HAGlue, QuorumService<HAGlue>> quorum = stores[i] - .getQuorum(); - final HAJournal jnl = (HAJournal) stores[i]; - /* - * Tell the actor to try and join the quorum. It will join iff our - * current root block can form a simple majority with the other - * services in the quorum. - */ - final QuorumActor<?, ?> actor = quorum.getActor(); - try { - - actor.memberAdd(); - fixture.awaitDeque(); - - actor.pipelineAdd(); - fixture.awaitDeque(); - - actor.castVote(jnl.getLastCommitTime()); - fixture.awaitDeque(); - - } catch (InterruptedException ex) { - - throw new RuntimeException(ex); - - } - } - - /* - * Initialize the master first. The followers will get their root blocks - * from the master. - */ -// fixture.join(stores[0]); -// fixture.join(stores[1]); -// fixture.join(stores[2]); -// -// stores[1].takeRootBlocksFromLeader(); -// stores[2].takeRootBlocksFromLeader(); - - /* - * @todo we probably have to return the service which joined as the - * leader from getStore(). 
- */ - final Quorum<HAGlue, QuorumService<HAGlue>> q = stores[0].getQuorum(); - - try { - - final long token = q.awaitQuorum(1L,TimeUnit.SECONDS); - assertEquals(token, stores[1].getQuorum().awaitQuorum(1L,TimeUnit.SECONDS)); - assertEquals(token, stores[2].getQuorum().awaitQuorum(1L,TimeUnit.SECONDS)); - - q.getClient().assertLeader(token); - - assertEquals(k, q.getMembers().length); - - } catch (TimeoutException ex) { -for(int i=0; i<3; i++)log.error("quorum["+i+"]:"+(stores[i].getQuorum()).toString()); - throw new RuntimeException(ex); - - } catch (InterruptedException ex) { - - throw new RuntimeException(ex); - - } - - // return the master. - return stores[0]; - - } - - protected Journal newJournal(final Properties properties) { - - /* - * Initialize the HA components. - */ - - final Quorum<HAGlue, QuorumService<HAGlue>> quorum = newQuorum(); - - final HAJournal jnl = new HAJournal(properties, quorum); - -// /* -// * FIXME This probably should be a constant across the life cycle of the -// * service, in which case it needs to be elevated outside of this method -// * which is used both to open and re-open the journal. -// */ -// final UUID serviceId = UUID.randomUUID(); - - /* - * Set the client on the quorum. - * - * FIXME The client needs to manage the quorumToken and various other - * things. - */ -// quorum.start(newQuorumService(logicalServiceId, serviceId, jnl -// .newHAGlue(serviceId), jnl)); - -// // discard the current write set. -// abort(); -// -// // set the quorum object. -// this.quorum.set(quorum); -// -// // save off the current token (typically NO_QUORUM unless standalone). -// quorumToken = quorum.token(); - -// /* -// * Tell the actor to try and join the quorum. It will join iff our -// * current root block can form a simple majority with the other services -// * in the quorum. -// */ -// final QuorumActor<?, ?> actor = quorum.getActor(); -// try { -// -// actor.memberAdd(); -// fixture.awaitDeque(); -// -// actor.pipelineAdd(); -// fixture.awaitDeque(); -// -// actor.castVote(jnl.getLastCommitTime()); -// fixture.awaitDeque(); -// -// } catch (InterruptedException ex) { -// -// throw new RuntimeException(ex); -// -// } - - return jnl; - -// return new Journal(properties) { -// -// protected Quorum<HAGlue, QuorumService<HAGlue>> newQuorum() { -// -// return AbstractHAJournalTestCase.this.newQuorum(); -// -// }; -// -// }; - - } - - private static class HAJournal extends Journal { - - /** - * @param properties - */ - public HAJournal(Properties properties, - Quorum<HAGlue, QuorumService<HAGlue>> quorum) { - super(properties, quorum); - } - - /** - * {@inheritDoc} - * <p> - * Note: This uses a random port on the loopback address. - */ - @Override - public HAGlue newHAGlue(final UUID serviceId) { - - final InetSocketAddress writePipelineAddr; - try { - writePipelineAddr = new InetSocketAddress(getPort(0)); - } catch (UnknownHostException e) { - throw new RuntimeException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - - return new HAGlueService(serviceId, writePipelineAddr); - - } - - /** - * Return an unused port. - * - * @param suggestedPort - * The suggested port. - * - * @return The suggested port, unless it is zero or already in use, in which - * case an unused port is returned. 
- * - * @throws IOException - */ - static protected int getPort(int suggestedPort) throws IOException { - ServerSocket openSocket; - try { - openSocket = new ServerSocket(suggestedPort); - } catch (BindException ex) { - // the port is busy, so look for a random open port - openSocket = new ServerSocket(0); - } - final int port = openSocket.getLocalPort(); - openSocket.close(); - return port; - } - - /** - * Extended implementation supports RMI. - */ - protected class HAGlueService extends BasicHA { - - protected HAGlueService(final UUID serviceId, - final InetSocketAddress writePipelineAddr) { - - super(serviceId, writePipelineAddr); - - } - - } - - } - - protected Quorum<HAGlue, QuorumService<HAGlue>> newQuorum() { - - return new MockQuorumFixture.MockQuorum<HAGlue, QuorumService<HAGlue>>( - k, fixture); - - }; - - /** - * Factory for the {@link QuorumService} implementation. - * - * @param remoteServiceImpl - * The object that implements the {@link Remote} interfaces - * supporting HA operations. - */ - protected QuorumServiceBase<HAGlue, AbstractJournal> newQuorumService( - final String logicalServiceId, - final UUID serviceId, final HAGlue remoteServiceImpl, - final AbstractJournal store) { - - return new QuorumServiceBase<HAGlue, AbstractJournal>(logicalServiceId, - serviceId, remoteServiceImpl, store) { - - /** - * Only the local service implementation object can be resolved. - */ - @Override - public HAGlue getService(final UUID serviceId) { - - return (HAGlue) fixture.getService(serviceId); - - } - - /** - * FIXME handle replicated writes. Probably just dump it on the - * jnl's WriteCacheService. Or maybe wrap it back up using a - * WriteCache and let that lay it down onto the disk. - */ - @Override - protected void handleReplicatedWrite(IHASyncRequest req, - IHAWriteMessage msg, ByteBuffer data) throws Exception { - - -// new WriteCache() { -// -// @Override -// protected boolean writeOnChannel(ByteBuffer buf, long firstOffset, -// Map<Long, RecordMetadata> recordMap, long nanos) -// throws InterruptedException, TimeoutException, IOException { -// // TODO Auto-generated method stub -// return false; -// } -// }; - - throw new UnsupportedOperationException(); - } - - @Override - public void installRootBlocks(IRootBlockView rootBlock0, - final IRootBlockView rootBlock1) { - throw new UnsupportedOperationException(); - } - -// @Override -// public void didMeet(final long token, final long commitCounter, -// final boolean isLeader) { -// throw new UnsupportedOperationException(); -// } - - @Override - public File getServiceDir() { - throw new UnsupportedOperationException(); - } - - @Override - public int getPID() { - throw new UnsupportedOperationException(); - } - - @Override - public void enterErrorState() { - // TODO Auto-generated method stub - } - - @Override - public void discardWriteSet() { - // TODO Auto-generated method stub - } - - @Override - protected long getRetrySendTimeoutNanos() { - return TimeUnit.MILLISECONDS.toNanos(100); // 100ms by default - } - - }; - - } - - /** - * Re-open the same backing store. - * - * @param store - * the existing store. - * - * @return A new store. - * - * @exception Throwable - * if the existing store is closed or if the store can not be - * re-opened, e.g., from failure to obtain a file lock, etc. 
- */ - @Override - protected Journal reopenStore(final Journal store) { - - if (stores[0] != store) - throw new AssertionError(); - - for (int i = 0; i < stores.length; i++) { - - Journal aStore = stores[i]; - - if (LRUNexus.INSTANCE != null) { - /* - * Drop the record cache for this store on reopen. This makes it - * easier to find errors related to a difference in the bytes on - * the disk versus the bytes in the record cache. - */ - LRUNexus.INSTANCE.deleteCache(aStore.getUUID()); - } - - // close the store. - aStore.close(); - - if (!aStore.isStable()) { - - throw new UnsupportedOperationException( - "The backing store is not stable"); - - } - - // Note: clone to avoid modifying!!! - final Properties properties = (Properties) getProperties().clone(); - - // Turn this off now since we want to re-open the same store. - properties.setProperty(Options.CREATE_TEMP_FILE, "false"); - - // The backing file that we need to re-open. - final File file = aStore.getFile(); - - if (file == null) - throw new AssertionError(); - - // Set the file property explicitly. - properties.setProperty(Options.FILE, file.toString()); - - stores[i] = newJournal(properties); - - } - - return stores[0]; - - } - -// /** -// * Begin to run as part of a highly available {@link Quorum}. -// * -// * @param newQuorum -// * The {@link Quorum}. -// */ -// public void joinQuorum(final Quorum<HAGlue, QuorumService<HAGlue>> quorum) { -// -// if (quorum == null) -// throw new IllegalArgumentException(); -// -// final WriteLock lock = _fieldReadWriteLock.writeLock(); -// -// lock.lock(); -// -// try { -// -// if (this.quorum.get() != null) { -// // Already running with some quorum. -// throw new IllegalStateException(); -// } -// -// // discard the current write set. -// abort(); -// -// // set the quorum object. -// this.quorum.set(quorum); -// -// // save off the current token (typically NO_QUORUM unless standalone). -// quorumToken = quorum.token(); -// -// /* -// * Tell the actor to try and join the quorum. It will join iff our -// * current root block can form a simple majority with the other -// * services in the quorum. -// */ -// final QuorumActor<?,?> actor = quorum.getActor(); -// actor.memberAdd(); -// actor.pipelineAdd(); -// actor.castVote(getRootBlockView().getLastCommitTime()); -// -// } catch (Throwable e) { -// -// throw new RuntimeException(e); -// -// } finally { -// -// lock.unlock(); -// -// } -// -// } - -} \ No newline at end of file Modified: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/HABranch.txt =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/HABranch.txt 2013-12-09 20:53:38 UTC (rev 7629) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/HABranch.txt 2013-12-10 21:36:50 UTC (rev 7630) @@ -388,57 +388,12 @@ TODO: - - RWStore, write cache, write replication, standalone HA in - JOURNAL_HA_BRANCH. - - - The RW store needs to record delete blocks and needs a thread, - periodic task, or commit time task for releasing old commit - points. (Will be done with allocation block bitmap deltas, but - must retain session for resynchronization to be possible.) - - - WORM checksums. Historical WORM stores will not support this so - it needs to be captured by the WORM version number. - - - WORM integration with the write cache service. - - - WriteCacheService lock, sleep and possible gap and deadlock - issues. - - - Interrupted during sleep logs error. 
- - - WARN : 6938 main - com.bigdata.io.WriteCache.resetWith(WriteCache.java:1368): - Written WriteCache but with no records - - RW is always using 6 buffers. This must be a configuration option so we can stress test the WriteCacheService under heavy write loads and mixed write/read loads with lots of concurrency and only a few buffers. We need to do this to look for deadlocks. - - AbstractJournal: Modify to log the root block to be overwritten - during the commit protocol so we can potentially restore it - from the file. This is easier to do for the WORM and would - require a search of the appropriate allocation block's records - for the RW looking for anything which has the right magic value - and can also be interpreted as a RootBlockView (passes the - checksum, etc). - - - API to accept a pipeline update (watch on the children of a - znode for the logical journal) and to notify if you are no - longer the master (SessionExpiredException when you try some - zookeeper operation). Internally, the code has to handle the - join / leave. - - - API for replication and resynchronization writes. Slaves - should verify checksums as calculated by the master. - Differentiate between replication (ascending writes for the - WORM), resynchronization (delta to the end of the file for the - WORM), and write replacement (random write for both). - - - Journal must be aware of master/slave state and whether it is - caught up and can therefore support reads. - - Handle read errors by reading on a peer. Note that some file systems will retry MANY times, which could hang the caller (timed reads? how?). Consider doing replacement writes over @@ -448,96 +403,3 @@ HA. - Report read errors in support of decision making about failure. - -============================================================ - -During BSBM 2785 data load on laptop. 4/20/2010 - -java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.AssertionError: There are 2 outstanding permits (should be just one). 
- at com.bigdata.rdf.spo.SPORelation.insert(SPORelation.java:1826) - at com.bigdata.rdf.store.AbstractTripleStore.addStatements(AbstractTripleStore.java:3261) - at com.bigdata.rdf.rio.StatementBuffer.writeSPOs(StatementBuffer.java:989) - at com.bigdata.rdf.rio.StatementBuffer.addStatements(StatementBuffer.java:869) - at com.bigdata.rdf.rio.StatementBuffer.incrementalWrite(StatementBuffer.java:708) - at com.bigdata.rdf.rio.StatementBuffer.add(StatementBuffer.java:784) - at com.bigdata.rdf.rio.StatementBuffer.add(StatementBuffer.java:766) - at com.bigdata.rdf.sail.BigdataSail$BigdataSailConnection.addStatement(BigdataSail.java:1918) - at com.bigdata.rdf.sail.BigdataSail$BigdataSailConnection.addStatement(BigdataSail.java:1879) - at org.openrdf.repository.sail.SailRepositoryConnection.addWithoutCommit(SailRepositoryConnection.java:228) - at org.openrdf.repository.base.RepositoryConnectionBase.add(RepositoryConnectionBase.java:455) - at org.openrdf.repository.util.RDFInserter.handleStatement(RDFInserter.java:196) - at org.openrdf.rio.ntriples.NTriplesParser.parseTriple(NTriplesParser.java:260) - at org.openrdf.rio.ntriples.NTriplesParser.parse(NTriplesParser.java:170) - at org.openrdf.rio.ntriples.NTriplesParser.parse(NTriplesParser.java:112) - at org.openrdf.repository.base.RepositoryConnectionBase.addInputStreamOrReader(RepositoryConnectionBase.java:353) - at org.openrdf.repository.base.RepositoryConnectionBase.add(RepositoryConnectionBase.java:242) - at org.openrdf.repository.base.RepositoryConnectionBase.add(RepositoryConnectionBase.java:239) - at org.openrdf.repository.base.RepositoryConnectionBase.add(RepositoryConnectionBase.java:202) - at benchmark.bigdata.BigdataLoader.loadData(BigdataLoader.java:159) - at benchmark.bigdata.BigdataLoader.main(BigdataLoader.java:109) -Caused by: java.util.concurrent.ExecutionException: java.lang.AssertionError: There are 2 outstanding permits (should be just one). - at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222) - at java.util.concurrent.FutureTask.get(FutureTask.java:83) - at com.bigdata.rdf.spo.SPORelation.insert(SPORelation.java:1807) - ... 20 more -Caused by: java.lang.AssertionError: There are 2 outstanding permits (should be just one). 
- at com.bigdata.io.WriteCacheService.writeChk(WriteCacheService.java:834) - at com.bigdata.rwstore.RWStore.alloc(RWStore.java:854) - at com.bigdata.journal.RWStrategy.write(RWStrategy.java:201) - at com.bigdata.journal.AbstractJournal.write(AbstractJournal.java:2498) - at com.bigdata.btree.AbstractBTree.writeNodeOrLeaf(AbstractBTree.java:3664) - at com.bigdata.btree.AbstractBTree.writeNodeRecursive(AbstractBTree.java:3477) - at com.bigdata.btree.DefaultEvictionListener.evicted(DefaultEvictionListener.java:102) - at com.bigdata.btree.DefaultEvictionListener.evicted(DefaultEvictionListener.java:1) - at com.bigdata.cache.HardReferenceQueue.evict(HardReferenceQueue.java:226) - at com.bigdata.cache.HardReferenceQueue.beforeOffer(HardReferenceQueue.java:199) - at com.bigdata.cache.RingBuffer.add(RingBuffer.java:159) - at com.bigdata.cache.HardReferenceQueue.add(HardReferenceQueue.java:176) - at com.bigdata.btree.AbstractBTree.doTouch(AbstractBTree.java:3365) - at com.bigdata.btree.AbstractBTree.touch(AbstractBTree.java:3331) - at com.bigdata.btree.AbstractNode.<init>(AbstractNode.java:297) - at com.bigdata.btree.AbstractNode.<init>(AbstractNode.java:333) - at com.bigdata.btree.Leaf.<init>(Leaf.java:345) - at com.bigdata.btree.AbstractNode.copyOnWrite(AbstractNode.java:492) - at com.bigdata.btree.AbstractNode.copyOnWrite(AbstractNode.java:417) - at com.bigdata.btree.Leaf.insert(Leaf.java:490) - at com.bigdata.btree.Node.insert(Node.java:900) - at com.bigdata.btree.Node.insert(Node.java:900) - at com.bigdata.btree.Node.insert(Node.java:900) - at com.bigdata.btree.AbstractBTree.insert(AbstractBTree.java:2006) - at com.bigdata.btree.AbstractBTree.insert(AbstractBTree.java:1950) - at com.bigdata.rdf.spo.SPOIndexWriteProc.apply(SPOIndexWriteProc.java:247) - at com.bigdata.btree.UnisolatedReadWriteIndex.submit(UnisolatedReadWriteIndex.java:796) - at com.bigdata.rdf.spo.SPOIndexWriter.call(SPOIndexWriter.java:329) - at com.bigdata.rdf.spo.SPOIndexWriter.call(SPOIndexWriter.java:1) - at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) - at java.util.concurrent.FutureTask.run(FutureTask.java:138) - at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) - at java.lang.Thread.run(Thread.java:619) -ERROR: 37844 com.bigdata.rwstore.RWWriteCacheService1 com.bigdata.io.WriteCacheService$WriteTask.call(WriteCacheService.java:307): java.nio.channels.ClosedByInterruptException -java.nio.channels.ClosedByInterruptException - at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184) - at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:653) - at com.bigdata.io.FileChannelUtility.writeAll(FileChannelUtility.java:402) - at com.bigdata.io.WriteCache$FileChannelScatteredWriteCache.writeOnChannel(WriteCache.java:1313) - at com.bigdata.io.WriteCache.flushAndReset(WriteCache.java:745) - at com.bigdata.io.WriteCache.flush(WriteCache.java:658) - at com.bigdata.io.WriteCache.flush(WriteCache.java:604) - at com.bigdata.io.WriteCacheService$WriteTask.call(WriteCacheService.java:285) - at com.bigdata.io.WriteCacheService$WriteTask.call(WriteCacheService.java:1) - at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) - at java.util.concurrent.FutureTask.run(FutureTask.java:138) - at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) - at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) - at java.lang.Thread.run(Thread.java:619) -ERROR: 37844 com.bigdata.rwstore.RWWriteCacheService1 com.bigdata.io.WriteCacheService$WriteTask.call(WriteCacheService.java:307): java.lang.InterruptedException: sleep interrupted -java.lang.InterruptedException: sleep interrupted - at java.lang.Thread.sleep(Native Method) - at com.bigdata.io.WriteCacheService$WriteTask.call(WriteCacheService.java:271) - at com.bigdata.io.WriteCacheService$WriteTask.call(WriteCacheService.java:1) - at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) - at java.util.concurrent.FutureTask.run(FutureTask.java:138) - at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) - at java.lang.Thread.run(Thread.java:619) Deleted: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestAll.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestAll.java 2013-12-09 20:53:38 UTC (rev 7629) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestAll.java 2013-12-10 21:36:50 UTC (rev 7630) @@ -1,74 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ -/* - * Created on Oct 14, 2006 - */ - -package com.bigdata.journal.ha; - -import junit.framework.Test; -import junit.framework.TestCase; -import junit.framework.TestSuite; - -/** - * Runs all tests for all journal implementations. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ -public class TestAll extends TestCase { - - /** - * - */ - public TestAll() { - } - - /** - * @param arg0 - */ - public TestAll(String arg0) { - super(arg0); - } - - /** - * Returns a test that will run each of the implementation specific test - * suites in turn. - */ - public static Test suite() - { - - final TestSuite suite = new TestSuite("journal/HA"); - - // HA test suite for the WORM strategy. - suite.addTest(TestHAWORMStrategy.suite()); - - // @todo HA test suite for the RW strategy. -// suite.addTest(TestHARWStrategy.suite()); - - return suite; - - } - -} Deleted: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestHAWORMStrategy.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestHAWORMStrategy.java 2013-12-09 20:53:38 UTC (rev 7629) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestHAWORMStrategy.java 2013-12-10 21:36:50 UTC (rev 7630) @@ -1,378 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2007. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... 
- -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -*/ -/* - * Created on Oct 14, 2006 - */ - -package com.bigdata.journal.ha; - -import java.io.File; -import java.io.IOException; -import java.util.Properties; - -import junit.extensions.proxy.ProxyTestSuite; -import junit.framework.Test; - -import com.bigdata.io.DirectBufferPool; -import com.bigdata.io.writecache.WriteCacheService; -import com.bigdata.journal.AbstractInterruptsTestCase; -import com.bigdata.journal.AbstractMRMWTestCase; -import com.bigdata.journal.AbstractMROWTestCase; -import com.bigdata.journal.AbstractRestartSafeTestCase; -import com.bigdata.journal.BufferMode; -import com.bigdata.journal.Journal; -import com.bigdata.journal.Options; -import com.bigdata.journal.TestJournalBasics; -import com.bigdata.journal.WORMStrategy; -import com.bigdata.rawstore.IRawStore; - -/** - * Test suite for highly available {@link WORMStrategy} journals. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ -public class TestHAWORMStrategy extends AbstractHAJournalTestCase { - - public TestHAWORMStrategy() { - super(); - } - - public TestHAWORMStrategy(String name) { - super(name); - } - - public static Test suite() { - - final TestHAWORMStrategy delegate = new TestHAWORMStrategy(); // !!!! THIS CLASS !!!! - - /* - * Use a proxy test suite and specify the delegate. - */ - - final ProxyTestSuite suite = new ProxyTestSuite(delegate, - "WORM HA Journal Test Suite"); - - /* - * List any non-proxied tests (typically bootstrapping tests). - */ - -// /* -// * HA bootstrap test (non-proxied). -// */ -// suite.addTestSuite(TestHAJournalBootstrap.class); - - /* - * Proxied test suites. - */ - - // tests defined by this class. - suite.addTestSuite(TestHAWORMStrategy.class); - - // test suite for the IRawStore api. - suite.addTestSuite(TestRawStore.class); - - // test suite for handling asynchronous close of the file channel. - suite.addTestSuite(TestInterrupts.class); - - // test suite for MROW correctness. - suite.addTestSuite(TestMROW.class); - - // test suite for MRMW correctness. - suite.addTestSuite(TestMRMW.class); - - /* - * Pickup the basic journal test suite. This is a proxied test suite, so - * all the tests will run with the configuration specified in this test - * class and its optional .properties file. - */ - suite.addTest(TestJournalBasics.suite()); - - /* - * Pickup the HA journal test suite. 
- */ - suite.addTest(TestJournalHA.suite()); - - return suite; - - } - - public Properties getProperties() { - - final Properties properties = super.getProperties(); - - properties.setProperty(Journal.Options.COLLECT_PLATFORM_STATISTICS, - "false"); - - properties.setProperty(Journal.Options.COLLECT_QUEUE_STATISTICS, - "false"); - - properties.setProperty(Journal.Options.HTTPD_PORT, "-1"/* none */); - - properties.setProperty(Options.BUFFER_MODE, BufferMode.DiskWORM.toString()); - - properties.setProperty(Options.CREATE_TEMP_FILE, "true"); - - properties.setProperty(Options.DELETE_ON_EXIT, "true"); - - properties.setProperty(Options.WRITE_CACHE_ENABLED, "" - + writeCacheEnabled); - - return properties; - - } - - /** - * Verify normal operation and basic assumptions when creating a new journal - * using {@link BufferMode#DiskWORM}. - * - * @throws IOException - */ - public void test_create_disk01() throws IOException { - - final Properties properties = getProperties(); - - final Journal journal = new Journal(properties); - - try { - - final WORMStrategy bufferStrategy = (WORMStrategy) journal - .getBufferStrategy(); - - assertTrue("isStable", bufferStrategy.isStable()); - assertFalse("isFullyBuffered", bufferStrategy.isFullyBuffered()); - // assertEquals(Options.FILE, properties.getProperty(Options.FILE), - // bufferStrategy.file.toString()); - assertEquals(Options.INITIAL_EXTENT, Long - .parseLong(Options.DEFAULT_INITIAL_EXTENT), bufferStrategy - .getInitialExtent()); - assertEquals(Options.MAXIMUM_EXTENT, - 0L/* soft limit for disk mode */, bufferStrategy - .getMaximumExtent()); - assertNotNull("raf", bufferStrategy.getRandomAccessFile()); - assertEquals(Options.BUFFER_MODE, BufferMode.DiskWORM, bufferStrategy - .getBufferMode()); - - } finally { - - journal.destroy(); - - } - - } - - /** - * Unit test verifies that {@link Options#CREATE} may be used to initialize - * a journal on a newly created empty file. - * - * @throws IOException - */ - public void test_create_emptyFile() throws IOException { - - final File file = File.createTempFile(getName(), Options.JNL); - - final Properties properties = new Properties(); - - properties.setProperty(Options.BUFFER_MODE, BufferMode.DiskWORM.toString()); - - properties.setProperty(Options.FILE, file.toString()); - - properties.setProperty(Options.WRITE_CACHE_ENABLED, "" - + writeCacheEnabled); - - final Journal journal = new Journal(properties); - - try { - - assertEquals(file, journal.getFile()); - - } finally { - - journal.destroy(); - - } - - } - - /** - * Test suite integration for {@link AbstractRestartSafeTestCase}. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ - public static class TestRawStore extends AbstractRestartSafeTestCase { - - public TestRawStore() { - super(); - } - - public TestRawStore(String name) { - super(name); - } - - protected BufferMode getBufferMode() { - - return BufferMode.DiskWORM; - - } - - } - - /** - * Test suite integration for {@link AbstractInterruptsTestCase}. 
- * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ - public static class TestInterrupts extends AbstractInterruptsTestCase { - - public TestInterrupts() { - super(); - } - - public TestInterrupts(String name) { - super(name); - } - - protected IRawStore getStore() { - - final Properties properties = getProperties(); - - properties.setProperty(Options.DELETE_ON_EXIT, "true"); - - properties.setProperty(Options.CREATE_TEMP_FILE, "true"); - - properties.setProperty(Options.BUFFER_MODE, BufferMode.DiskWORM - .toString()); - - properties.setProperty(Options.WRITE_CACHE_ENABLED, "" - + writeCacheEnabled); - - return new Journal(properties);//.getBufferStrategy(); - - } - - } - - /** - * Test suite integration for {@link AbstractMROWTestCase}. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ - public static class TestMROW extends AbstractMROWTestCase { - - public TestMROW() { - super(); - } - - public TestMROW(String name) { - super(name); - } - - protected IRawStore getStore() { - - final Properties properties = getProperties(); - - properties.setProperty(Options.CREATE_TEMP_FILE, "true"); - - properties.setProperty(Options.DELETE_ON_EXIT, "true"); - - properties.setProperty(Options.BUFFER_MODE, BufferMode.DiskWORM - .toString()); - - properties.setProperty(Options.WRITE_CACHE_ENABLED, "" - + writeCacheEnabled); - - return new Journal(properties);//.getBufferStrategy(); - - } - - } - - /** - * Test suite integration for {@link AbstractMRMWTestCase}. - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ - public static class TestMRMW extends AbstractMRMWTestCase { - - public TestMRMW() { - super(); - } - - public TestMRMW(String name) { - super(name); - } - - protected IRawStore getStore() { - - final Properties properties = getProperties(); - - properties.setProperty(Options.CREATE_TEMP_FILE, "true"); - - properties.setProperty(Options.DELETE_ON_EXIT, "true"); - - properties.setProperty(Options.BUFFER_MODE, BufferMode.DiskWORM - .toString()); - - properties.setProperty(Options.WRITE_CACHE_ENABLED, "" - + writeCacheEnabled); - - /* - * The following two properties are dialed way down in order to - * raise the probability that we will observe the following error - * during this test. - * - * http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6371642 - * - * FIXME We should make the MRMW test harder and focus on - * interleaving concurrent extensions of the backing store for both - * WORM and R/W stores. - */ - - // Note: Use a relatively small initial extent. - properties.setProperty(Options.INITIAL_EXTENT, "" - + DirectBufferPool.INSTANCE.getBufferCapacity() * 1); - - // Note: Use a relatively small extension each time. - properties.setProperty(Options.MINIMUM_EXTENSION, - "" + (long) (DirectBufferPool.INSTANCE - .getBufferCapacity() * 1.1)); - - return new Journal(properties);//.getBufferStrategy(); - - } - - } - - /** - * Note: HA requires the use of the write cache. It is the - * {@link WriteCacheService} which provides the write replication mechanism - * for HA. 
- */ - private static final boolean writeCacheEnabled = true; -} Deleted: branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestHAWritePipeline.java =================================================================== --- branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestHAWritePipeline.java 2013-12-09 20:53:38 UTC (rev 7629) +++ branches/MGC_1_3_0/bigdata/src/test/com/bigdata/journal/ha/TestHAWritePipeline.java 2013-12-10 21:36:50 UTC (rev 7630) @@ -1,370 +0,0 @@ -/** - -Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a cop... [truncated message content]