From: <tho...@us...> - 2010-10-20 18:30:33
|
Revision: 3830 http://bigdata.svn.sourceforge.net/bigdata/?rev=3830&view=rev Author: thompsonbry Date: 2010-10-20 18:30:25 +0000 (Wed, 20 Oct 2010) Log Message: ----------- Added a MultiplexBlockingBuffer. This is a factory pattern which may be used to share the same backing BlockingBuffer among many producers. Each producer receives a skin for the backing buffer. The backing buffer is only closed once each producer closes their skin. Added an IMultiSourceAsynchronousIterator interface for an IAsynchronousIterator which can consume multiple sources. There is one implementation in this commit, which allows the producer to attach another source. This is used to assign a chunk to a task which is already running. There is another version which handles multiple concurrent producers, but it's implementation is not yet finished. Modified Paths: -------------- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/relation/accesspath/TestAll.java Added Paths: ----------- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/IMultiSourceAsynchronousIterator.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/MultiSourceSequentialAsynchronousIterator.java branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/MultiplexBlockingBuffer.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/relation/accesspath/TestMultiSourceSequentialAsynchronousIterator.java branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/relation/accesspath/TestMultiplexBlockingBuffer.java Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/IMultiSourceAsynchronousIterator.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/IMultiSourceAsynchronousIterator.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/IMultiSourceAsynchronousIterator.java 2010-10-20 18:30:25 UTC (rev 3830) @@ -0,0 +1,53 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Oct 19, 2010 + */ + +package com.bigdata.relation.accesspath; + +/** + * An interface which permits new sources to be attached dynamically. The + * decision to accept a new source via {@link #add(IAsynchronousIterator)} or to + * {@link IMultiSourceAsynchronousIterator#close()} the iterator must be atomic. + * In particular, it is illegal for a source to be accepted after the iterator + * has been closed. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public interface IMultiSourceAsynchronousIterator<E> extends + IAsynchronousIterator<E> { + + /** + * Add a source. If the iterator already reports that it is closed then the + * new source can not be added and this method will return false. + * + * @param src + * The source. + * @return <code>true</code> iff the source could be added. + */ + boolean add(IAsynchronousIterator<E> src); + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/IMultiSourceAsynchronousIterator.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/MultiSourceSequentialAsynchronousIterator.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/MultiSourceSequentialAsynchronousIterator.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/MultiSourceSequentialAsynchronousIterator.java 2010-10-20 18:30:25 UTC (rev 3830) @@ -0,0 +1,186 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Oct 19, 2010 + */ + +package com.bigdata.relation.accesspath; + +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + + +/** + * Class allows new sources to be attached dynamically. If the existing sources + * are drained then the iterator will {@link #close()} itself so that new + * sources can no longer be attached. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class MultiSourceSequentialAsynchronousIterator<E> implements + IMultiSourceAsynchronousIterator<E> { + + private final ReentrantLock lock = new ReentrantLock(); + + private final Queue<IAsynchronousIterator<E>> sources = new LinkedBlockingQueue<IAsynchronousIterator<E>>(); + + /** + * The current inner iterator. When <code>null</code> the outer iterator has + * been closed and will not deliver any more results and will not accept any + * new sources. + * <p> + * Note: This can be asynchronously closed if the application invokes + * {@link #close()}. Methods which test on this can not assume that it will + * be non-<code>null</code> the next time they check unless they are holding + * the {@link #lock}. Methods which do not obtain the lock can offer a + * weaker atomicity by copying the reference to a local variable and then + * testing that variable. + */ + private volatile IAsynchronousIterator<E> current; + + public MultiSourceSequentialAsynchronousIterator(final IAsynchronousIterator<E> src) { + current = src; + } + + public void close() { + lock.lock(); + try { + current = null; + sources.clear(); + } finally { + lock.unlock(); + } + } + + public boolean add(final IAsynchronousIterator<E> src) { + if (src == null) + throw new IllegalArgumentException(); + lock.lock(); + try { + if (current == null) + return false; + sources.add(src); + return true; + } finally { + lock.unlock(); + } + } + + /** + * If the current source is not exhausted, then return it immediately. + * Otherwise, return the next source which is not exhausted. If no such + * sources are available, then {@link #close()} the iterator. The decision + * to accept another source or to close the iterator is made atomic by the + * use of the {@link #lock} in this method and in {@link #close()}. + * + * @return The next source -or- <code>null</code> if there are no sources + * available. + */ + private IAsynchronousIterator<E> nextSource() { + final IAsynchronousIterator<E> tmp = current; + if (tmp == null) + return null; + if (!tmp.isExhausted()) + return current; // Note: MAY be asynchronously cleared! + // current is known to be [null]. + lock.lock(); + try { + // remove the head of the queue (non-blocking) + while ((current = sources.poll()) != null) { + if (!current.isExhausted()) + return current; + } + // no more sources with data, close while holding lock. + close(); + return null; + } finally { + lock.unlock(); + } + } + + public boolean hasNext() { + while (true) { + final IAsynchronousIterator<E> tmp = nextSource(); + if (tmp == null) + return false; + if (tmp.hasNext()) + return true; + } + } + + public E next() { + while (true) { + final IAsynchronousIterator<E> tmp = nextSource(); + if (tmp == null) + throw new NoSuchElementException(); + if (tmp.hasNext()) + return tmp.next(); + } + } + + public void remove() { + throw new UnsupportedOperationException(); + } + + public boolean isExhausted() { + return nextSource() == null; + } + + public boolean hasNext(final long timeout, final TimeUnit unit) + throws InterruptedException { + final long begin = System.nanoTime(); + final long nanos = unit.toNanos(timeout); + long remaining = nanos; + while (remaining > 0) { + final IAsynchronousIterator<E> tmp = nextSource(); + if (tmp == null) + return false; + if (tmp.hasNext(remaining, TimeUnit.NANOSECONDS)) + return true; + remaining = nanos - (System.nanoTime() - begin); + } + // timeout. + return false; + } + + public E next(final long timeout, final TimeUnit unit) + throws InterruptedException { + final long begin = System.nanoTime(); + final long nanos = unit.toNanos(timeout); + long remaining = nanos; + while (true) { + final IAsynchronousIterator<E> tmp = nextSource(); + if (tmp == null) + return null; + if (tmp.hasNext(remaining, TimeUnit.NANOSECONDS)) + return tmp.next(); + remaining = nanos - (System.nanoTime() - begin); + } + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/MultiSourceSequentialAsynchronousIterator.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Added: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/MultiplexBlockingBuffer.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/MultiplexBlockingBuffer.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/MultiplexBlockingBuffer.java 2010-10-20 18:30:25 UTC (rev 3830) @@ -0,0 +1,212 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Oct 7, 2010 + */ + +package com.bigdata.relation.accesspath; + +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.concurrent.Future; +import java.util.concurrent.locks.ReentrantLock; + + +/** + * A factory for skins which may be used to multiplex writes against a + * {@link BlockingBuffer}. Each skin writes through to the backing + * {@link BlockingBuffer} but may be closed independently of the backing + * {@link BlockingBuffer}. This allows multiple produces to share a single + * {@link BlockingBuffer} as long as they use a subset of the + * {@link IBlockingBuffer} API (they can not set the {@link Future} on the + * objects returned by this factory or obtain its + * {@link IBlockingBuffer#iterator()}. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + * + * @todo Does this need to close automatically when the last open inner buffer + * is closed or should it be closed explicitly and close all inner buffers + * when it is closed? + */ +public class MultiplexBlockingBuffer<E> { + + /** The delegate. */ + private final IBlockingBuffer<E> b; + + /** Lock guarding internal state. */ + private final ReentrantLock lock = new ReentrantLock(); + + /** The set of opened buffered which have not yet been closed. */ + private final LinkedHashSet<IBlockingBuffer<E>> set = new LinkedHashSet<IBlockingBuffer<E>>(); + + /** The #of currently open buffers. */ + private int counter = 0; + + public MultiplexBlockingBuffer(final IBlockingBuffer<E> b) { + if (b == null) + throw new IllegalArgumentException(); + this.b = b; + } + + public boolean isOpen() { + return b.isOpen(); + } + + public IBlockingBuffer<E> newInstance() { + lock.lock(); + try { + if(!isOpen())// ??? + throw new BufferClosedException(); + final IBlockingBuffer<E> n = new InnerBlockingBuffer(); + if (!set.add(n)) + throw new AssertionError(); + counter++; + return n; + } finally { + lock.unlock(); + } + } + + public void flushAndCloseAll() { + lock.lock(); + try { + final Iterator<IBlockingBuffer<E>> itr = set.iterator(); + while(itr.hasNext()) { + final IBlockingBuffer<E> n = itr.next(); + n.close(); + } + assert counter == 0; + b.flush(); + b.close(); + } finally { + lock.unlock(); + } + } + + /** + * The {@link IBlockingBuffer} reference provided to the constructor. + */ + public IBlockingBuffer<E> getBackingBuffer() { + return b; + } + + /** + * Inner "skin" writes through to the backing buffer shared by all skins. + * <p> + * Note: This inner class does not support several of the + * {@link IBlockingBuffer} methods whose semantics are likely to cause + * problems when interpreted in the light of a skin over a shared buffer. + * The only way these methods could be given clear semantics is if the skin + * were actually a full {@link BlockingBuffer} which was coupled to the + * shared buffer. However, that involves double buffering and double copying + * and I do not think that this is worth it. + */ + private class InnerBlockingBuffer implements IBlockingBuffer<E> { + + public InnerBlockingBuffer() { + } + + private boolean innerBufferOpen = true; + + public IAsynchronousIterator<E> iterator() { + throw new UnsupportedOperationException(); + } + + public void setFuture(Future future) { + throw new UnsupportedOperationException(); + } + + public void abort(final Throwable cause) { + lock.lock(); + try { + if (!innerBufferOpen) + throw new BufferClosedException(); + b.abort(cause); + } finally { + lock.unlock(); + } + } + + public void close() { + lock.lock(); + try { + if (!innerBufferOpen) + return; + innerBufferOpen = false; + if (!set.remove(this)) + throw new AssertionError(); + counter--; + if (counter == 0) { + /* + * Note: We flush the backing buffer before we close it in + * case it has anything buffered. This covers the normal, + * which is where the caller has already invoked flush() on + * this skin and should not create any harm otherwise. + */ + b.flush(); + b.close(); + } + } finally { + lock.unlock(); + } + } + + public Future getFuture() { + return b.getFuture(); + } + + public boolean isOpen() { + return innerBufferOpen && b.isOpen(); + } + + public long flush() { + /* + * Nothing to flush. The target is flushed when the outer class is + * closed. + */ + return 0; + } + + public void add(E e) { + if (!innerBufferOpen) + throw new BufferClosedException(); + b.add(e); + } + + public boolean isEmpty() { + return b.isEmpty(); + } + + public void reset() { + throw new UnsupportedOperationException(); + } + + public int size() { + return b.size(); + } + + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/java/com/bigdata/relation/accesspath/MultiplexBlockingBuffer.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Modified: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/relation/accesspath/TestAll.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/relation/accesspath/TestAll.java 2010-10-20 18:27:22 UTC (rev 3829) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/relation/accesspath/TestAll.java 2010-10-20 18:30:25 UTC (rev 3830) @@ -77,11 +77,17 @@ suite.addTestSuite(TestBlockingBufferWithChunks.class); suite.addTestSuite(TestBlockingBufferWithChunksDeque.class); - + suite.addTestSuite(TestUnsynchronizedArrayBuffer.class); suite.addTestSuite(TestUnsynchronizedUnboundedChunkBuffer.class); + suite.addTestSuite(TestMultiplexBlockingBuffer.class); + + suite.addTestSuite(TestMultiSourceSequentialAsynchronousIterator.class); + + //suite.addTestSuite(TestMultiSourceParallelAsynchronousIterator.class); + return suite; } Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/relation/accesspath/TestMultiSourceSequentialAsynchronousIterator.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/relation/accesspath/TestMultiSourceSequentialAsynchronousIterator.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/relation/accesspath/TestMultiSourceSequentialAsynchronousIterator.java 2010-10-20 18:30:25 UTC (rev 3830) @@ -0,0 +1,171 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Oct 6, 2010 + */ + +package com.bigdata.relation.accesspath; + +import java.util.concurrent.TimeUnit; + +import com.bigdata.relation.accesspath.IAsynchronousIterator; +import com.bigdata.relation.accesspath.ThickAsynchronousIterator; + +import junit.framework.TestCase2; + +/** + * Test suite for the {@link MultiSourceSequentialAsynchronousIterator}. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class TestMultiSourceSequentialAsynchronousIterator extends TestCase2 { + + public TestMultiSourceSequentialAsynchronousIterator() { + + } + + public TestMultiSourceSequentialAsynchronousIterator(String name) { + super(name); + } + + private final IAsynchronousIterator<String> emptyIterator() { + return new ThickAsynchronousIterator<String>(new String[]{}); + } + + public void test1() throws InterruptedException { + + // empty iterator. + final MultiSourceSequentialAsynchronousIterator<String> itr = new MultiSourceSequentialAsynchronousIterator<String>( + emptyIterator()); + +// // nothing available yet. +// assertFalse(itr.hasNext(1, TimeUnit.MILLISECONDS)); +// assertNull(itr.next(1, TimeUnit.MILLISECONDS)); + + // add an empty chunk. + assertTrue(itr.add(new ThickAsynchronousIterator<String>( + new String[] {}))); + +// // still nothing available yet. +// assertFalse(itr.hasNext(1, TimeUnit.MILLISECONDS)); +// assertNull(itr.next(1, TimeUnit.MILLISECONDS)); + + // add a non-empty chunk. + assertTrue(itr.add(new ThickAsynchronousIterator<String>( + new String[] { "a" }))); + + // reports data available and visits data. + assertTrue(itr.hasNext(1, TimeUnit.MILLISECONDS)); + assertEquals("a", itr.next(1, TimeUnit.MILLISECONDS)); + + // add a non-empty chunk. + assertTrue(itr.add(new ThickAsynchronousIterator<String>( + new String[] { "b" }))); + + // reports data available and visits data. + assertTrue(itr.hasNext()); + assertEquals("b", itr.next()); + + // close the iterator. + itr.close(); + + // iterator reports nothing available. + assertFalse(itr.hasNext()); + assertFalse(itr.hasNext(1, TimeUnit.MILLISECONDS)); + assertNull(itr.next(1, TimeUnit.MILLISECONDS)); + + // can not add more sources. + assertFalse(itr.add(new ThickAsynchronousIterator<String>( + new String[] { "b" }))); + + } + + public void test2() throws InterruptedException { + + // empty iterator. + final MultiSourceSequentialAsynchronousIterator<String> itr = new MultiSourceSequentialAsynchronousIterator<String>( + emptyIterator()); + + // add a non-empty chunk. + assertTrue(itr.add(new ThickAsynchronousIterator<String>( + new String[] { "a" }))); + + // add a non-empty chunk. + assertTrue(itr.add(new ThickAsynchronousIterator<String>( + new String[] { "b" }))); + + // reports data available and visits data. + assertTrue(itr.hasNext()); + assertEquals("a", itr.next()); + assertTrue(itr.hasNext()); + assertEquals("b", itr.next()); + + // another read on the iterator causes it to be closed. + assertFalse(itr.hasNext()); + + // can not add more sources. + assertFalse(itr.add(new ThickAsynchronousIterator<String>( + new String[] { "b" }))); + + } + + /** + * Verify that the iterator notices if it is asynchronously closed. + * + * @throws InterruptedException + */ + public void test3() throws InterruptedException { + + // empty iterator. + final MultiSourceSequentialAsynchronousIterator<String> itr = new MultiSourceSequentialAsynchronousIterator<String>( + emptyIterator()); + + new Thread() { + + public void run() { + try { + log.info("Will wait on iterator."); + if (itr.hasNext(2000, TimeUnit.MILLISECONDS)) + fail("Iterator should not visit anything."); + } catch (Throwable t) { + log.error(t, t); + } + } + + }.start(); + + log.info("Sleeping..."); + Thread.sleep(500/*milliseconds.*/); + + log.info("Will close iterator."); + itr.close(); + + // can not add more sources. + assertFalse(itr.add(new ThickAsynchronousIterator<String>( + new String[] { "b" }))); + + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/relation/accesspath/TestMultiSourceSequentialAsynchronousIterator.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL Added: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/relation/accesspath/TestMultiplexBlockingBuffer.java =================================================================== --- branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/relation/accesspath/TestMultiplexBlockingBuffer.java (rev 0) +++ branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/relation/accesspath/TestMultiplexBlockingBuffer.java 2010-10-20 18:30:25 UTC (rev 3830) @@ -0,0 +1,127 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2010. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +/* + * Created on Oct 8, 2010 + */ + +package com.bigdata.relation.accesspath; + +import junit.framework.TestCase2; + +import com.bigdata.relation.accesspath.BlockingBuffer; +import com.bigdata.relation.accesspath.BufferClosedException; +import com.bigdata.relation.accesspath.IBlockingBuffer; + +/** + * Test suite for {@link MultiplexBlockingBuffer}. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class TestMultiplexBlockingBuffer extends TestCase2 { + + /** + * + */ + public TestMultiplexBlockingBuffer() { + + } + + /** + * @param name + */ + public TestMultiplexBlockingBuffer(String name) { + super(name); + + } + + public void test_multiplex() { + + final IBlockingBuffer<String> buffer = new BlockingBuffer<String>(); + + final MultiplexBlockingBuffer<String> multiplex = new MultiplexBlockingBuffer<String>(buffer); + + // buffer is open and empty. + assertTrue(buffer.isOpen()); + assertTrue(buffer.isEmpty()); + + // multiplex is open. + assertTrue(multiplex.isOpen()); + + final IBlockingBuffer<String> skin1 = multiplex.newInstance(); + + final IBlockingBuffer<String> skin2 = multiplex.newInstance(); + + // buffer is open and empty. + assertTrue(buffer.isOpen()); + assertTrue(buffer.isEmpty()); + + // multiplex is open. + assertTrue(multiplex.isOpen()); + + skin1.add("a"); + skin1.flush(); + skin1.close(); + try { + skin1.add("a2"); + fail("Expecting: " + BufferClosedException.class); + } catch (BufferClosedException ex) { + if (log.isInfoEnabled()) + log.info("Ignoring expected exception: " + ex); + } + + // buffer is open but no longer empty. + assertTrue(buffer.isOpen()); + assertFalse(buffer.isEmpty()); + + // multiplex is open. + assertTrue(multiplex.isOpen()); + + skin2.add("b"); + skin2.add("c"); + skin2.flush(); + + // buffer is open but not empty. + assertTrue(buffer.isOpen()); + assertFalse(buffer.isEmpty()); + + // multiplex is open. + assertTrue(multiplex.isOpen()); + + // close the last open skin. + skin2.close(); + + // buffer is closed but not empty. + assertFalse(buffer.isOpen()); + assertFalse(buffer.isEmpty()); + + // multiplex closed. + assertFalse(multiplex.isOpen()); + + // verify the data. + assertSameIterator(new String[]{"a","b","c"}, buffer.iterator()); + + } + +} Property changes on: branches/QUADS_QUERY_BRANCH/bigdata/src/test/com/bigdata/relation/accesspath/TestMultiplexBlockingBuffer.java ___________________________________________________________________ Added: svn:keywords + Id Date Revision Author HeadURL This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |