This list is closed; nobody may subscribe to it.
Archived messages per month:

| Year | Jan | Feb | Mar | Apr | May | Jun | Jul | Aug | Sep | Oct | Nov | Dec |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2010 | | | | | | | 139 | 94 | 232 | 143 | 138 | 55 |
| 2011 | 127 | 90 | 101 | 74 | 148 | 241 | 169 | 121 | 157 | 199 | 281 | 75 |
| 2012 | 107 | 122 | 184 | 73 | 14 | 49 | 26 | 103 | 133 | 61 | 51 | 55 |
| 2013 | 59 | 72 | 99 | 62 | 92 | 19 | 31 | 138 | 47 | 83 | 95 | 111 |
| 2014 | 125 | 60 | 119 | 136 | 270 | 83 | 88 | 30 | 47 | 27 | 23 | |
| 2015 | | | | | | | | | 3 | | | |
| 2016 | | | 4 | 1 | | | | | | | | |
From: <mar...@us...> - 2014-04-03 09:48:22
Revision: 8032 http://sourceforge.net/p/bigdata/code/8032 Author: martyncutcher Date: 2014-04-03 09:48:19 +0000 (Thu, 03 Apr 2014) Log Message: ----------- Commit to allow branch to be added to CI. Note that this includes a delay added to AbstractJournal.gatherPhase() to support HA1 and which must be removed once the cause of its necessity is identified. Modified Paths: -------------- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/RWStrategy.java branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/WORMStrategy.java branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWStore.java branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWWriteCacheService.java branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3BackupTestCase.java branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/SD.java branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2014-04-03 00:41:27 UTC (rev 8031) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2014-04-03 09:48:19 UTC (rev 8032) @@ -310,7 +310,7 @@ /** * When a record is used as a read cache then the readCount is - * maintained as a metric on its access. �This could be used to + * maintained as a metric on its access. ???This could be used to * determine eviction/compaction. * <p> * Note: volatile to guarantee visibility of updates. Might do better @@ -509,7 +509,8 @@ * @param isHighlyAvailable * when <code>true</code> the whole record checksum is maintained * for use when replicating the write cache along the write - * pipeline. + * pipeline. This needs to be <code>true</code> for HA1 as well + * since we need to write the HALog. * @param bufferHasData * when <code>true</code> the caller asserts that the buffer has * data (from a replicated write), in which case the position Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java 2014-04-03 00:41:27 UTC (rev 8031) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java 2014-04-03 09:48:19 UTC (rev 8032) @@ -1151,6 +1151,7 @@ done = WriteCache.transferTo(cache/* src */, curCompactingCache/* dst */, serviceMap, 0/*threshold*/); if (done) { + // Everything was compacted. Send just the address metadata (empty cache block). sendAddressMetadata(cache); if (log.isDebugEnabled()) @@ -1164,7 +1165,7 @@ */ if (flush) { /* - * Send out the full cache block. + * Send out the full cache block. FIXME Why are we not calling sendAddressMetadata() here? 
*/ writeCacheBlock(curCompactingCache); addClean(curCompactingCache, true/* addFirst */); @@ -1231,7 +1232,7 @@ * been allocated on the leader in the same order in which the leader * made those allocations. This information is used to infer the order * in which the allocators for the different allocation slot sizes are - * created. This method will synchronous send those address notices and + * created. This method will synchronously send those address notices and * and also makes sure that the followers see the recycled addresses * records so they can keep both their allocators and the actual * allocations synchronized with the leader. @@ -1249,8 +1250,9 @@ throws IllegalStateException, InterruptedException, ExecutionException, IOException { - if (quorum == null || !quorum.isHighlyAvailable() - || !quorum.getClient().isLeader(quorumToken)) { +// if (quorum == null || !quorum.isHighlyAvailable() +// || !quorum.getClient().isLeader(quorumToken)) { + if (quorum == null) { return; } @@ -1354,7 +1356,7 @@ * unit tests need to be updated to specify [isHighlyAvailable] for * ALL quorum based test runs. */ - final boolean isHA = quorum != null && quorum.isHighlyAvailable(); + final boolean isHA = quorum != null; // IFF HA and this is the quorum leader. final boolean isHALeader = isHA @@ -1441,10 +1443,12 @@ quorumMember.logWriteCacheBlock(pkg.getMessage(), pkg.getData().duplicate()); // ASYNC MSG RMI + NIO XFER. - remoteWriteFuture = quorumMember.replicate(null/* req */, pkg.getMessage(), - pkg.getData().duplicate()); - - counters.get().nsend++; + if (quorum.replicationFactor() > 1) { + remoteWriteFuture = quorumMember.replicate(null/* req */, pkg.getMessage(), + pkg.getData().duplicate()); + + counters.get().nsend++; + } /* * The quorum leader logs the write cache block here. For the Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2014-04-03 00:41:27 UTC (rev 8031) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2014-04-03 09:48:19 UTC (rev 8032) @@ -2473,18 +2473,18 @@ } - /** - * Return <code>true</code> if the journal is configured for high - * availability. - * - * @see QuorumManager#isHighlyAvailable() - */ - public boolean isHighlyAvailable() { +// /** +// * Return <code>true</code> if the journal is configured for high +// * availability. +// * +// * @see Quorum#isHighlyAvailable() +// */ +// public boolean isHighlyAvailable() { +// +// return quorum == null ? false : quorum.isHighlyAvailable(); +// +// } - return quorum == null ? false : quorum.isHighlyAvailable(); - - } - /** * {@inheritDoc} * <p> @@ -3428,8 +3428,16 @@ if (quorum == null) return; -// if (!quorum.isHighlyAvailable()) -// return; + if (!quorum.isHighlyAvailable()) { + // FIXME: Find the reason why this delay is needed and remove it! + // + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return; + } /** * CRITICAL SECTION. We need obtain a distributed consensus for the @@ -3542,6 +3550,19 @@ // reload the commit record from the new root block. store._commitRecord = store._getCommitRecord(); + if (quorum != null) { + /* + * Write the root block on the HALog file, closing out that + * file. 
+ */ + final QuorumService<HAGlue> localService = quorum.getClient(); + try { + localService.logRootBlock(newRootBlock); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + if (txLog.isInfoEnabled()) txLog.info("COMMIT: commitTime=" + commitTime); @@ -3792,7 +3813,7 @@ if (log.isInfoEnabled()) log.info("commitTime=" + commitTime); - final CommitState cs = new CommitState(this, commitTime); + final CommitState cs = new CommitState(this, commitTime); /* * Flush application data, decide whether or not the store is dirty, @@ -3808,6 +3829,7 @@ } // Do GATHER (iff HA). + cs.gatherPhase(); /* @@ -3846,12 +3868,12 @@ // Prepare the new root block. cs.newRootBlock(); - if (quorum == null) { + if (quorum == null || quorum.replicationFactor() == 1) { // Non-HA mode. cs.commitSimple(); - } else { + } else { // HA mode commit (2-phase commit). cs.commitHA(); Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/RWStrategy.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2014-04-03 00:41:27 UTC (rev 8031) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/RWStrategy.java 2014-04-03 09:48:19 UTC (rev 8032) @@ -146,6 +146,7 @@ } + @Override public ByteBuffer read(final long addr) { try { Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/WORMStrategy.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2014-04-03 00:41:27 UTC (rev 8031) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/WORMStrategy.java 2014-04-03 09:48:19 UTC (rev 8032) @@ -272,7 +272,7 @@ * which use this flag to conditionally track the checksum of the entire * write cache buffer). */ - private final boolean isHighlyAvailable; + private final boolean isQuorumUsed; /** * The {@link UUID} which identifies the journal (this is the same for each @@ -970,11 +970,11 @@ com.bigdata.journal.Options.HALOG_COMPRESSOR, com.bigdata.journal.Options.DEFAULT_HALOG_COMPRESSOR); - isHighlyAvailable = quorum != null && quorum.isHighlyAvailable(); + isQuorumUsed = quorum != null; // && quorum.isHighlyAvailable(); final boolean useWriteCacheService = fileMetadata.writeCacheEnabled && !fileMetadata.readOnly && fileMetadata.closeTime == 0L - || isHighlyAvailable; + || isQuorumUsed; if (useWriteCacheService) { /* @@ -1049,7 +1049,7 @@ final long fileExtent) throws InterruptedException { - super(baseOffset, buf, useChecksum, isHighlyAvailable, + super(baseOffset, buf, useChecksum, isQuorumUsed, bufferHasData, opener, fileExtent); } @@ -1379,6 +1379,7 @@ * to get the data from another node based on past experience for that * record. 
*/ + @Override public ByteBuffer read(final long addr) { try { Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWStore.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2014-04-03 00:41:27 UTC (rev 8031) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWStore.java 2014-04-03 09:48:19 UTC (rev 8032) @@ -688,7 +688,7 @@ throws InterruptedException { super(buf, useChecksum, m_quorum != null - && m_quorum.isHighlyAvailable(), bufferHasData, opener, + /*&& m_quorum.isHighlyAvailable()*/, bufferHasData, opener, fileExtent, m_bufferedWrite); @@ -1083,7 +1083,7 @@ final boolean highlyAvailable = m_quorum != null && m_quorum.isHighlyAvailable(); - final boolean prefixWrites = highlyAvailable; + final boolean prefixWrites = m_quorum != null; // highlyAvailable return new RWWriteCacheService(m_writeCacheBufferCount, m_minCleanListSize, m_readCacheBufferCount, prefixWrites, m_compactionThreshold, m_hotCacheSize, m_hotCacheThreshold, Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWWriteCacheService.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWWriteCacheService.java 2014-04-03 00:41:27 UTC (rev 8031) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/rwstore/RWWriteCacheService.java 2014-04-03 09:48:19 UTC (rev 8032) @@ -80,8 +80,9 @@ final long fileExtent) throws InterruptedException { - final boolean highlyAvailable = getQuorum() != null - && getQuorum().isHighlyAvailable(); +// final boolean highlyAvailable = getQuorum() != null +// && getQuorum().isHighlyAvailable(); + final boolean highlyAvailable = getQuorum() != null; return new FileChannelScatteredWriteCache(buf, true/* useChecksum */, highlyAvailable, Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3BackupTestCase.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3BackupTestCase.java 2014-04-03 00:41:27 UTC (rev 8031) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3BackupTestCase.java 2014-04-03 09:48:19 UTC (rev 8032) @@ -184,7 +184,7 @@ // Verify journal can be dumped without error. dumpJournal(jnl); - + /* * Now roll that journal forward using the HALog directory. 
*/ Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java 2014-04-03 00:41:27 UTC (rev 8031) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java 2014-04-03 09:48:19 UTC (rev 8032) @@ -56,7 +56,7 @@ "com.bigdata.journal.jini.ha.HAJournalServer.restorePolicy=new com.bigdata.journal.jini.ha.DefaultRestorePolicy(0L,1,0)", "com.bigdata.journal.jini.ha.HAJournalServer.snapshotPolicy=new com.bigdata.journal.jini.ha.DefaultSnapshotPolicy("+neverRun+",0)", // "com.bigdata.journal.jini.ha.HAJournalServer.snapshotPolicy=new com.bigdata.journal.jini.ha.NoSnapshotPolicy()", - "com.bigdata.journal.jini.ha.HAJournalServer.onlineDisasterRecovery=true", + // "com.bigdata.journal.jini.ha.HAJournalServer.onlineDisasterRecovery=true", "com.bigdata.journal.jini.ha.HAJournalServer.replicationFactor=1", }; Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/SD.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/SD.java 2014-04-03 00:41:27 UTC (rev 8031) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/SD.java 2014-04-03 09:48:19 UTC (rev 8032) @@ -36,8 +36,11 @@ import org.openrdf.model.impl.URIImpl; import org.openrdf.model.vocabulary.RDF; +import com.bigdata.ha.HAGlue; +import com.bigdata.ha.QuorumService; import com.bigdata.journal.AbstractJournal; import com.bigdata.journal.IIndexManager; +import com.bigdata.quorum.Quorum; import com.bigdata.rdf.axioms.Axioms; import com.bigdata.rdf.axioms.NoAxioms; import com.bigdata.rdf.axioms.OwlAxioms; @@ -641,7 +644,10 @@ final AbstractJournal jnl = (AbstractJournal) indexManager; - if (jnl.isHighlyAvailable()) { + final Quorum<HAGlue, QuorumService<HAGlue>> quorum = jnl + .getQuorum(); + + if (quorum != null && quorum.isHighlyAvailable()) { g.add(aService, SD.feature, HighlyAvailable); Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java 2014-04-03 00:41:27 UTC (rev 8031) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java 2014-04-03 09:48:19 UTC (rev 8032) @@ -55,10 +55,13 @@ import com.bigdata.bop.engine.QueryLog; import com.bigdata.bop.fed.QueryEngineFactory; import com.bigdata.counters.CounterSet; +import com.bigdata.ha.HAGlue; +import com.bigdata.ha.QuorumService; import com.bigdata.journal.AbstractJournal; import com.bigdata.journal.DumpJournal; import com.bigdata.journal.IIndexManager; import com.bigdata.journal.Journal; +import com.bigdata.quorum.Quorum; import com.bigdata.rdf.sail.sparql.ast.SimpleNode; import com.bigdata.rdf.sail.webapp.BigdataRDFContext.AbstractQueryTask; import com.bigdata.rdf.sail.webapp.BigdataRDFContext.RunningQuery; @@ -497,13 +500,18 @@ // final boolean showQuorum = req.getParameter(SHOW_QUORUM) != null; - if (getIndexManager() instanceof AbstractJournal - && ((AbstractJournal) getIndexManager()) - .isHighlyAvailable()) { + if (getIndexManager() instanceof AbstractJournal) { - new 
HAStatusServletUtil(getIndexManager()). - doGet(req, resp, current); + final Quorum<HAGlue, QuorumService<HAGlue>> quorum = ((AbstractJournal) getIndexManager()) + .getQuorum(); + if (quorum != null && quorum.isHighlyAvailable()) { + + new HAStatusServletUtil(getIndexManager()).doGet(req, resp, + current); + + } + } current.node("br", "Accepted query count=" This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
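Taken together, the r8032 changes above replace guards on `quorum.isHighlyAvailable()` with guards on `quorum != null`, so that an HA1 deployment (replication factor 1) still maintains whole-record checksums and writes its HALog, while pipeline replication stays conditional on `replicationFactor() > 1`. A minimal sketch of that guard structure follows; the type and helper names (`Quorum`, `WriteCacheFlushSketch`, `logWriteCacheBlock`, `replicateToPipeline`) are placeholders for illustration, not the real bigdata API.

```java
// Illustrative only: mirrors the guard pattern used throughout the r8032 diff.
interface Quorum {
    int replicationFactor();
}

class WriteCacheFlushSketch {

    void flush(final Quorum quorum, final byte[] cacheBlock) {
        if (quorum == null) {
            return; // plain (non-HA) journal: nothing to log or replicate
        }
        // HA1 and above: always write the cache block to the local HALog.
        logWriteCacheBlock(cacheBlock);
        if (quorum.replicationFactor() > 1) {
            // Only HA3/HA5 style deployments have followers to replicate to.
            replicateToPipeline(cacheBlock);
        }
    }

    void logWriteCacheBlock(final byte[] block) { /* placeholder */ }

    void replicateToPipeline(final byte[] block) { /* placeholder */ }
}
```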
From: <tob...@us...> - 2014-04-03 00:41:29
Revision: 8031 http://sourceforge.net/p/bigdata/code/8031 Author: tobycraig Date: 2014-04-03 00:41:27 +0000 (Thu, 03 Apr 2014) Log Message: ----------- #858 - Workbench URL can now include namespace and URI to begin exploration at Modified Paths: -------------- branches/RDR/bigdata-war/src/html/js/workbench.js Modified: branches/RDR/bigdata-war/src/html/js/workbench.js =================================================================== --- branches/RDR/bigdata-war/src/html/js/workbench.js 2014-04-02 22:43:05 UTC (rev 8030) +++ branches/RDR/bigdata-war/src/html/js/workbench.js 2014-04-03 00:41:27 UTC (rev 8031) @@ -25,7 +25,9 @@ $('#' + tab + '-tab').show(); $('#tab-selector a').removeClass(); $('a[data-target=' + tab + ']').addClass('active'); - window.location.hash = tab; + if(window.location.hash.substring(1).indexOf(tab) != 0) { + window.location.hash = tab; + } } function moveTab(next) { @@ -84,9 +86,16 @@ $('.namespace-service-description').click(function(e) { return confirm('This can be an expensive operation. Proceed anyway?'); }); + + READY = true; }); } +function selectNamespace(name) { + // for programmatically selecting a namespace with just its name + $('#namespaces-list li[data-name=' + name + '] a.use-namespace').click(); +} + function useNamespace(name, url) { $('#current-namespace').html(name); NAMESPACE = name; @@ -157,7 +166,7 @@ useNamespace(DEFAULT_NAMESPACE, url); }); } -var DEFAULT_NAMESPACE, NAMESPACE, NAMESPACE_URL, fileContents; +var DEFAULT_NAMESPACE, NAMESPACE, NAMESPACE_URL, READY, fileContents; getDefaultNamespace(); @@ -918,8 +927,38 @@ return $('<div/>').text(text).html(); } +function initialExplore(namespace, uri) { + if(!READY) { + setTimeout(function() { initialExplore(namespace, uri); }, 10); + } else { + if(namespace != '') { + selectNamespace(namespace); + } + explore(uri); + } +} + if(window.location.hash) { - $('a[data-target=' + window.location.hash.substring(1) + ']').click(); + // remove # and see if there is some data to retrieve for this hash + var hash = window.location.hash.substring(1); + var i = hash.indexOf(':'); + if(i != -1) { + var data = hash.substring(i + 1); + hash = hash.substring(0, i); + // currently only the explore tab uses this + // data is in the form namespace:uri + // if no namespace is specified, use the default one + // TODO: this may need to be rethought if we start remembering the namespace the user selects + if(hash == 'explore') { + i = data.indexOf(':'); + var namespace = data.substring(0, i); + var uri = data.substring(i + 1); + + // wait for namespaces to be retrieved + initialExplore(namespace, uri); + } + } + $('a[data-target=' + hash + ']').click(); } else { $('#tab-selector a:first').click(); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
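For reference, the fragment format parsed by this change is `#<tab>` for a plain tab selection and `#explore:<namespace>:<uri>` for the Explore tab. For example, a hypothetical link such as `/bigdata/#explore:kb:http://example.org/Widget` would select a namespace named `kb` and start exploring that URI, while leaving the namespace empty (`#explore::http://example.org/Widget`) falls back to the default namespace. The `kb` name and the example URI are illustrative; only the `explore:namespace:uri` shape comes from the code above.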
From: <tob...@us...> - 2014-04-02 22:43:08
Revision: 8030 http://sourceforge.net/p/bigdata/code/8030 Author: tobycraig Date: 2014-04-02 22:43:05 +0000 (Wed, 02 Apr 2014) Log Message: ----------- #843 - Show query details upon request and update overall numbers too when a query's details are requested Modified Paths: -------------- branches/RDR/bigdata-war/src/html/css/style.css branches/RDR/bigdata-war/src/html/js/workbench.js Modified: branches/RDR/bigdata-war/src/html/css/style.css =================================================================== --- branches/RDR/bigdata-war/src/html/css/style.css 2014-04-02 20:53:52 UTC (rev 8029) +++ branches/RDR/bigdata-war/src/html/css/style.css 2014-04-02 22:43:05 UTC (rev 8030) @@ -209,5 +209,6 @@ #running-queries div.query-details { border-bottom: 1px solid; + overflow-x: scroll; } Modified: branches/RDR/bigdata-war/src/html/js/workbench.js =================================================================== --- branches/RDR/bigdata-war/src/html/js/workbench.js 2014-04-02 20:53:52 UTC (rev 8029) +++ branches/RDR/bigdata-war/src/html/js/workbench.js 2014-04-02 22:43:05 UTC (rev 8030) @@ -811,7 +811,20 @@ $('#show-queries').click(function(e) { e.preventDefault(); - $.get('/bigdata/status?showQueries', function(data) { + showQueries(false); +}); + +$('#show-query-details').click(function(e) { + e.preventDefault(); + showQueries(true); +}); + +function showQueries(details) { + var url = '/bigdata/status?showQueries'; + if(details) { + url += '=details'; + } + $.get(url, function(data) { // get data inside a jQuery object data = $('<div>').append(data); @@ -822,7 +835,6 @@ $('#running-queries').empty(); data.find('h1').each(function(i, e) { - // per running query, data is structured h1 form (with numbers/cancel data) h2 pre (with SPARQL) e = $(e); // get numbers string, which includes cancel link var form = e.next(); @@ -832,18 +844,25 @@ // get query id var queryId = form.find('input[type=hidden]').val(); // get SPARQL - var sparql = form.next().next().html(); + var sparqlContainer = form.next().next(); + var sparql = sparqlContainer.html(); + if(details) { + var queryDetails = $('<div>').append(sparqlContainer.nextUntil('h1')).html(); + } else { + var queryDetails = '<a href="#">Details</a>'; + } + // got all data, create a li for each query - var li = $('<li><div class="query"><pre>' + sparql + '</pre></div><div class="query-numbers">' + numbers + ', <a href="#" class="cancel-query">Cancel</a></div><div class="query-details"><a href="#" class="query-details collapsed">Details</a></div>'); + var li = $('<li><div class="query"><pre>' + sparql + '</pre></div><div class="query-numbers">' + numbers + ', <a href="#" class="cancel-query">Cancel</a></div><div class="query-details">' + queryDetails + '</div>'); li.find('a').data('queryId', queryId); $('#running-queries').append(li); }); $('.cancel-query').click(cancelQuery); - $('a.query-details').click(getQueryDetails); + $('.query-details a').click(getQueryDetails); }); -}); +} function cancelQuery(e) { e.preventDefault(); @@ -854,8 +873,27 @@ } } -function getQueryDetails(e) {} +function getQueryDetails(e) { + e.preventDefault(); + var id = $(this).data('queryId'); + $.ajax({url: '/bigdata/status?showQueries=details&queryId=' + id, + success: function(data) { + // get data inside a jQuery object + data = $('<div>').append(data); + // update status numbers + getStatusNumbers(data); + + // details begin after second pre + var details = $('<div>').append($(data.find('pre')[1]).nextAll()).html(); + + $(this).parent().html(details); + }, + context: 
this + }); +} + + /* Performance */ $('#tab-selector a[data-target=performance]').click(function(e) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
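For reference, the three request shapes the workbench issues against the status servlet in this change are `/bigdata/status?showQueries` (summary list), `/bigdata/status?showQueries=details` (all running queries with details expanded), and `/bigdata/status?showQueries=details&queryId=<id>` for the per-query details fetch, where `<id>` is the query identifier returned in the hidden form field.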
From: <tho...@us...> - 2014-04-02 20:53:55
Revision: 8029 http://sourceforge.net/p/bigdata/code/8029 Author: thompsonbry Date: 2014-04-02 20:53:52 +0000 (Wed, 02 Apr 2014) Log Message: ----------- Changed SSSP to use the push-style scatter pattern and added a predecessor that is tracked during the SSSP evaluation. Modified Paths: -------------- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java Modified: branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java =================================================================== --- branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2014-04-02 16:13:03 UTC (rev 8028) +++ branches/RDR/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2014-04-02 20:53:52 UTC (rev 8029) @@ -16,6 +16,7 @@ package com.bigdata.rdf.graph.analytics; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import org.apache.log4j.Logger; import org.openrdf.model.Statement; @@ -40,21 +41,6 @@ * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * - * TODO Add parameter for directed versus undirected SSSP. When - * undirected, the gather and scatter are for AllEdges. Otherwise, - * gather on in-edges and scatter on out-edges. Also, we need to use a - * getOtherVertex(e) method to figure out the other edge when using - * undirected scatter/gather. Add unit test for undirected. - * - * FIXME New SSSP (push style scatter abstraction with new test case - * based on graph example developed for this). Note: The push style - * scatter on the GPU is implemented by capturing each (src,edge) pair - * as a distint entry in the frontier. This gives us all of the - * necessary variety. We then reduce that variety, applying the binary - * operator to combine the intermediate results. Finally, an APPLY() - * phase is executed to update the state of the distinct vertices in the - * frontier. - * * FIXME Add a reducer to report the actual minimum length paths. This * is similar to a BFS tree, but the path lengths are not integer values * so we need a different data structure to collect them (we need to @@ -97,17 +83,23 @@ */ private Integer dist = Integer.MAX_VALUE; +// /** +// * Note: This flag is cleared by apply() and then conditionally set +// * iff the {@link #dist()} is replaced by the new value from the +// * gather. Thus, if the gather does not reduce the value, then the +// * propagation of the algorithm is halted. However, this causes the +// * algorithm to NOT scatter for round zero, which causes it to halt. +// * I plan to fix the algorithm by doing the "push" style update in +// * the scatter phase. That will completely remove the gather phase +// * of the algorithm. +// */ +// private boolean changed = false; + /** - * Note: This flag is cleared by apply() and then conditionally set - * iff the {@link #dist()} is replaced by the new value from the - * gather. Thus, if the gather does not reduce the value, then the - * propagation of the algorithm is halted. However, this causes the - * algorithm to NOT scatter for round zero, which causes it to halt. - * I plan to fix the algorithm by doing the "push" style update in - * the scatter phase. That will completely remove the gather phase - * of the algorithm. + * The predecessor is the source vertex to visit a given target vertex + * with the minimum observed distance. */ - private boolean changed = false; + private final AtomicReference<Value> predecessor = new AtomicReference<Value>(); // /** // * Set the distance for the vertex to ZERO. 
This is done for the @@ -120,15 +112,15 @@ // } // } - /** - * Return <code>true</code> if the {@link #dist()} was updated by the - * last APPLY. - */ - public boolean isChanged() { - synchronized (this) { - return changed; - } - } +// /** +// * Return <code>true</code> if the {@link #dist()} was updated by the +// * last APPLY. +// */ +// public boolean isChanged() { +// synchronized (this) { +// return changed; +// } +// } /** * The current estimate of the minimum distance from the starting vertex @@ -144,7 +136,9 @@ @Override public String toString() { - return "{dist=" + dist() + ", changed=" + isChanged() + "}"; + return "{dist=" + dist() + ", predecessor=" + predecessor.get() +// + ", changed=" + isChanged() + + "}"; } @@ -155,47 +149,48 @@ // Set distance to zero for starting vertex. dist = 0; + this.predecessor.set(null); - // Must be true to trigger scatter in the 1st round! - changed = true; +// // Must be true to trigger scatter in the 1st round! +// changed = true; } - /** - * Update the vertex state to the minimum of the combined sum and its - * current state. - * - * @param u - * The vertex that is the owner of this {@link VS vertex - * state} (used only for debug info). - * @param sum - * The combined sum from the gather phase. - * - * @return <code>this</code> iff the vertex state was modified. - * - * FIXME PREDECESSOR: We can not track the predecessor because - * the SSSP algorithm currently uses a GATHER phase and a - * SCATTER phase rather than doing all the work in a push-style - * SCATTER phase. - */ - synchronized private VS apply(final Value u, final Integer sum) { +// /** +// * Update the vertex state to the minimum of the combined sum and its +// * current state. +// * +// * @param u +// * The vertex that is the owner of this {@link VS vertex +// * state} (used only for debug info). +// * @param sum +// * The combined sum from the gather phase. +// * +// * @return <code>this</code> iff the vertex state was modified. +// * +// * FIXME PREDECESSOR: We can not track the predecessor because +// * the SSSP algorithm currently uses a GATHER phase and a +// * SCATTER phase rather than doing all the work in a push-style +// * SCATTER phase. +// */ +// synchronized private VS apply(final Value u, final Integer sum) { +// +// final int minDist = sum; +// +// changed = false; +// if (dist > minDist) { +// dist = minDist; +// changed = true; +// if (log.isDebugEnabled()) +// log.debug("u=" + u + ", us=" + this + ", minDist=" +// + minDist); +// return this; +// } +// +// return null; +// +// } - final int minDist = sum; - - changed = false; - if (dist > minDist) { - dist = minDist; - changed = true; - if (log.isDebugEnabled()) - log.debug("u=" + u + ", us=" + this + ", minDist=" - + minDist); - return this; - } - - return null; - - } - /** * Update the vertex state to the new (reduced) distance. 
* @@ -213,7 +208,8 @@ */ if (newDist < dist) { dist = newDist; - changed = true; + this.predecessor.set(predecessor); +// changed = true; return true; } return false; @@ -263,7 +259,8 @@ @Override public EdgesEnum getGatherEdges() { - return EdgesEnum.InEdges; +// return EdgesEnum.InEdges; + return EdgesEnum.NoEdges; } @@ -297,57 +294,57 @@ @Override public Integer gather(final IGASState<SSSP.VS, SSSP.ES, Integer> state, final Value u, final Statement e) { + throw new UnsupportedOperationException(); -// assert e.getObject().equals(u); +//// assert e.getObject().equals(u); +// +//// final VS src = state.getState(e.getSubject()); +// final VS src = state.getState(u); +// +// final int d = src.dist(); +// +// if (d == Integer.MAX_VALUE) { +// +// // Note: Avoids overflow (wrapping around to a negative value). +// return d; +// +// } +// +// return d + EDGE_LENGTH; -// final VS src = state.getState(e.getSubject()); - final VS src = state.getState(u); - - final int d = src.dist(); - - if (d == Integer.MAX_VALUE) { - - // Note: Avoids overflow (wrapping around to a negative value). - return d; - - } - - return d + EDGE_LENGTH; - } /** - * MIN + * UNUSED. */ @Override public Integer sum(final IGASState<SSSP.VS, SSSP.ES, Integer> state, final Integer left, final Integer right) { + throw new UnsupportedOperationException(); +// return Math.min(left, right); - return Math.min(left, right); - } - /** - * Update the {@link VS#dist()} and {@link VS#isChanged()} based on the new - * <i>sum</i>. - * <p> - * {@inheritDoc} - */ + /** NOP. */ +// * Update the {@link VS#dist()} and {@link VS#isChanged()} based on the new +// * <i>sum</i>. +// * <p> +// * {@inheritDoc} @Override public SSSP.VS apply(final IGASState<SSSP.VS, SSSP.ES, Integer> state, final Value u, final Integer sum) { - if (sum != null) { +// if (sum != null) { +// +//// log.error("u=" + u + ", us=" + us + ", sum=" + sum); +// +// // Get the state for that vertex. +// final SSSP.VS us = state.getState(u); +// +// return us.apply(u, sum); +// +// } -// log.error("u=" + u + ", us=" + us + ", sum=" + sum); - - // Get the state for that vertex. - final SSSP.VS us = state.getState(u); - - return us.apply(u, sum); - - } - // No change. return null; @@ -370,32 +367,9 @@ // } /** - * The remote vertex is scheduled if this vertex is changed. - * <p> - * Note: We are scattering to out-edges. Therefore, this vertex is - * {@link Statement#getSubect()}. The remote vertex is - * {@link Statement#getObject()}. - * <p> - * {@inheritDoc} - * - * FIXME OPTIMIZE: Test both variations on a variety of data sets and see - * which is better (actually, just replace with a push style Scatter of the - * updates): - * - * <p> - * Zhisong wrote: In the original GASengine, the scatter operator only need - * to access the status of the source: src.changes. - * - * To check the status of destination, it needs to load destination data: - * dst.dist and edge data: e. And then check if new dist is different from - * the old value. - * - * Bryan wrote: I will have to think about this more. It sounds like it - * depends on the fan-out of the scatter at time t versus the fan-in of the - * gather at time t+1. The optimization might only benefit if a reasonable - * fraction of the destination vertices wind up NOT being retriggered. I - * will try on these variations in the Java code as well. - * </p> + * The remote vertex is scheduled the weighted edge from this vertex to the + * remote vertex plus the weight on this vertex is less than the weight on + * the remote vertex. 
*/ @Override public void scatter(final IGASState<SSSP.VS, SSSP.ES, Integer> state, @@ -413,8 +387,10 @@ // last observed distance for the remote vertex. final int otherDist = otherState.dist(); + // Note: test first without lock. if (newDist < otherDist) { + // Tested again inside VS while holding lock. if (otherState.scatter(u/* predecessor */, newDist)) { if (log.isDebugEnabled()) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
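With r8029 the gather/MIN-apply phases of SSSP are effectively disabled (the gather edges become `NoEdges`) and the work moves into a push-style scatter: each frontier vertex u offers `dist(u) + EDGE_LENGTH` along its out-edges, and the target vertex keeps the smaller distance together with the predecessor that supplied it. Below is a self-contained sketch of that relaxation step using toy types, not the real `IGASState`/openrdf `Value` machinery from the diff.

```java
// Illustrative sketch of the push-style scatter/relaxation introduced in r8029.
final class SsspScatterSketch {

    static final int EDGE_LENGTH = 1; // all edges have weight 1, as in SSSP.java

    static final class VS {           // per-vertex state (toy version of SSSP.VS)
        int dist = Integer.MAX_VALUE; // current shortest-distance estimate
        Object predecessor = null;    // vertex that supplied the best distance so far
    }

    /**
     * Relax the edge (u -> v). Returns true iff v's state changed and v must be
     * scheduled into the next frontier.
     */
    static boolean scatter(final Object u, final VS uState, final VS vState) {
        // Assumes u is in the frontier, so uState.dist is finite (no overflow).
        final int newDist = uState.dist + EDGE_LENGTH;
        if (newDist >= vState.dist) {
            return false;               // cheap test without the lock, as in the diff
        }
        synchronized (vState) {         // re-test while holding the vertex lock
            if (newDist < vState.dist) {
                vState.dist = newDist;
                vState.predecessor = u; // predecessor tracking added by this commit
                return true;
            }
            return false;
        }
    }
}
```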
From: <mrp...@us...> - 2014-04-02 16:13:06
Revision: 8028 http://sourceforge.net/p/bigdata/code/8028 Author: mrpersonick Date: 2014-04-02 16:13:03 +0000 (Wed, 02 Apr 2014) Log Message: ----------- fixing ticket 872 - added a magic predicate to full text search for range count Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/search/FullTextIndex.java branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/ASTSearchOptimizer.java branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/SearchServiceFactory.java branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/store/BDS.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/search/FullTextIndex.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/search/FullTextIndex.java 2014-04-02 13:14:09 UTC (rev 8027) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/search/FullTextIndex.java 2014-04-02 16:13:03 UTC (rev 8028) @@ -955,35 +955,137 @@ } + /** + * Perform a range count on a full text query. + */ public int count(final FullTextQuery query) { - final Hit[] a = _search(query); + if (cache.containsKey(query)) { + + if (log.isInfoEnabled()) + log.info("found hits in cache"); + + return cache.get(query).length; + + } else { + + if (log.isInfoEnabled()) + log.info("did not find hits in cache"); + + } + + // tokenize the query. + final TermFrequencyData<V> qdata = tokenize(query); + + // No terms after stopword extraction + if (qdata == null) { + + cache.put(query, new Hit[] {}); + + return 0; + + } + + /* + * We can run an optimized version of this (just a quick range count) + * but only if the caller does not care about exact match and has + * not specified a regex. + */ + if (qdata.distinctTermCount() == 1 && + !query.isMatchExact() && query.getMatchRegex() == null) { + + final boolean prefixMatch = query.isPrefixMatch(); + + final Map.Entry<String, ITermMetadata> e = qdata.getSingletonEntry(); + + final String termText = e.getKey(); + + final ITermMetadata md = e.getValue(); + + final CountIndexTask<V> task1 = new CountIndexTask<V>(termText, 0, 1, + prefixMatch, md.getLocalTermWeight(), this); + + return (int) task1.getRangeCount(); + + } else { + + final Hit<V>[] a = _search(query); + + return a.length; + + } - return a.length; - } - public Hit<V>[] _search(final FullTextQuery q) { + protected TermFrequencyData<V> tokenize(final FullTextQuery query) { - final String query = q.getQuery(); - final String languageCode = q.getLanguageCode(); - final boolean prefixMatch = q.isPrefixMatch(); - final double minCosine = q.getMinCosine(); - final double maxCosine = q.getMaxCosine(); - final int minRank = q.getMinRank(); - final int maxRank = q.getMaxRank(); - final boolean matchAllTerms = q.isMatchAllTerms(); - final boolean matchExact = q.isMatchExact(); - final String regex = q.getMatchRegex(); - long timeout = q.getTimeout(); - final TimeUnit unit = q.getTimeUnit(); + final String q = query.getQuery(); + final String languageCode = query.getLanguageCode(); + final boolean prefixMatch = query.isPrefixMatch(); + // tokenize the query. + final TermFrequencyData<V> qdata; + { + + final TokenBuffer<V> buffer = new TokenBuffer<V>(1, this); + + /* + * If we are using prefix match ('*' operator) then we don't want to + * filter stopwords from the search query. 
+ */ + final boolean filterStopwords = !prefixMatch; + + index(buffer, // + null, // docId // was Long.MIN_VALUE + Integer.MIN_VALUE, // fieldId + languageCode,// + new StringReader(q), // + filterStopwords// + ); + + if (buffer.size() == 0) { + + /* + * There were no terms after stopword extration. + */ + + log.warn("No terms after stopword extraction: query=" + query); + + return null; + + } + + qdata = buffer.get(0); + + qdata.normalize(); + + } + + return qdata; + + } + + public Hit<V>[] _search(final FullTextQuery query) { + + final String queryStr = query.getQuery(); + final String languageCode = query.getLanguageCode(); + final boolean prefixMatch = query.isPrefixMatch(); + final double minCosine = query.getMinCosine(); + final double maxCosine = query.getMaxCosine(); + final int minRank = query.getMinRank(); + final int maxRank = query.getMaxRank(); + final boolean matchAllTerms = query.isMatchAllTerms(); + final boolean matchExact = query.isMatchExact(); + final String regex = query.getMatchRegex(); + long timeout = query.getTimeout(); + final TimeUnit unit = query.getTimeUnit(); + final long begin = System.currentTimeMillis(); // if (languageCode == null) // throw new IllegalArgumentException(); - if (query == null) + if (queryStr == null) throw new IllegalArgumentException(); if (minCosine < 0d || minCosine > 1d) @@ -1002,7 +1104,7 @@ throw new IllegalArgumentException(); if (log.isInfoEnabled()) - log.info("languageCode=[" + languageCode + "], text=[" + query + log.info("languageCode=[" + languageCode + "], text=[" + queryStr + "], minCosine=" + minCosine + ", maxCosine=" + maxCosine + ", minRank=" + minRank @@ -1018,7 +1120,7 @@ } - final FullTextQuery cacheKey = q; + final FullTextQuery cacheKey = query; Hit<V>[] a; @@ -1034,145 +1136,24 @@ if (log.isInfoEnabled()) log.info("did not find hits in cache"); - // tokenize the query. - final TermFrequencyData<V> qdata; - { - - final TokenBuffer<V> buffer = new TokenBuffer<V>(1, this); - - /* - * If we are using prefix match ('*' operator) then we don't want to - * filter stopwords from the search query. - */ - final boolean filterStopwords = !prefixMatch; - - index(buffer, // - null, // docId // was Long.MIN_VALUE - Integer.MIN_VALUE, // fieldId - languageCode,// - new StringReader(query), // - filterStopwords// - ); - - if (buffer.size() == 0) { - - /* - * There were no terms after stopword extration. - */ - - log.warn("No terms after stopword extraction: query=" + query); - - a = new Hit[] {}; - - cache.put(cacheKey, a); - - return a; - - } - - qdata = buffer.get(0); - - qdata.normalize(); - - } - - final IHitCollector<V> hits; - - if (qdata.distinctTermCount() == 1) { - - final Map.Entry<String, ITermMetadata> e = qdata.getSingletonEntry(); - - final String termText = e.getKey(); + // tokenize the query. 
+ final TermFrequencyData<V> qdata = tokenize(query); + + // No terms after stopword extraction + if (qdata == null) { - final ITermMetadata md = e.getValue(); - - final CountIndexTask<V> task1 = new CountIndexTask<V>(termText, 0, 1, prefixMatch, md - .getLocalTermWeight(), this); - - hits = new SingleTokenHitCollector<V>(task1); - - } else { - - final List<CountIndexTask<V>> tasks = new ArrayList<CountIndexTask<V>>( - qdata.distinctTermCount()); - - int i = 0; - for (Map.Entry<String, ITermMetadata> e : qdata.terms.entrySet()) { - - final String termText = e.getKey(); - - final ITermMetadata md = e.getValue(); - - tasks.add(new CountIndexTask<V>(termText, i++, qdata.terms.size(), prefixMatch, md - .getLocalTermWeight(), this)); - - } - - hits = new MultiTokenHitCollector<V>(tasks); - - } - - // run the queries. - { - - final List<Callable<Object>> tasks = new ArrayList<Callable<Object>>( - qdata.distinctTermCount()); - - int i = 0; - for (Map.Entry<String, ITermMetadata> e : qdata.terms.entrySet()) { - - final String termText = e.getKey(); - - final ITermMetadata md = e.getValue(); - - tasks.add(new ReadIndexTask<V>(termText, i++, qdata.terms.size(), - prefixMatch, md.getLocalTermWeight(), this, hits)); - - } - - final ExecutionHelper<Object> executionHelper = new ExecutionHelper<Object>( - getExecutorService(), timeout, unit); - - try { - - final long start = System.currentTimeMillis(); - - executionHelper.submitTasks(tasks); - - if (log.isInfoEnabled()) { - final long readTime = System.currentTimeMillis() - start; - log.info("read time: " + readTime); - } - - } catch (InterruptedException ex) { - - if (log.isInfoEnabled()) { - // TODO Should we wrap and toss this interrupt instead? - log.info("Interrupted - only partial results will be returned."); - } - - /* - * Yes, let's toss it. We were getting into a situation - * where the ExecutionHelper above received an interrupt - * but we still went through the heavy-weight filtering - * operations below (matchExact or matchRegex). - */ - throw new RuntimeException(ex); - - } catch (ExecutionException ex) { - - throw new RuntimeException(ex); - - } - - } - - a = hits.getHits(); - + cache.put(cacheKey, a = new Hit[] {}); + + return a; + + } + + a = executeQuery(qdata, prefixMatch, timeout, unit); + if (a.length == 0) { log.info("No hits: languageCode=[" + languageCode + "], query=[" - + query + "]"); + + queryStr + "]"); cache.put(cacheKey, a); @@ -1223,14 +1204,14 @@ */ if (matchExact) { - a = matchExact(a, query); + a = matchExact(a, queryStr); } if (a.length == 0) { log.warn("No hits after matchAllTerms pruning: languageCode=[" + languageCode + "], query=[" - + query + "]"); + + queryStr + "]"); cache.put(cacheKey, a); @@ -1260,7 +1241,7 @@ if (a.length == 0) { log.warn("No hits after regex pruning: languageCode=[" + languageCode + "], query=[" - + query + "], regex=[" + regex + "]"); + + queryStr + "], regex=[" + regex + "]"); cache.put(cacheKey, a); @@ -1299,6 +1280,27 @@ } + /* + * Take a slice of the hits based on min/max cosine and min/max rank. 
+ */ + a = slice(query, a); + + final long elapsed = System.currentTimeMillis() - begin; + + if (log.isInfoEnabled()) + log.info("Done: " + a.length + " hits in " + elapsed + "ms"); + + return a; + + } + + protected Hit<V>[] slice(final FullTextQuery query, Hit<V>[] a) { + + final double minCosine = query.getMinCosine(); + final double maxCosine = query.getMaxCosine(); + final int minRank = query.getMinRank(); + final int maxRank = query.getMaxRank(); + // if (log.isDebugEnabled()) { // log.debug("before min/max cosine/rank pruning:"); // for (Hit<V> h : a) @@ -1422,13 +1424,106 @@ } - final long elapsed = System.currentTimeMillis() - begin; + return a; - if (log.isInfoEnabled()) - log.info("Done: " + a.length + " hits in " + elapsed + "ms"); + } + + protected Hit<V>[] executeQuery(final TermFrequencyData<V> qdata, + final boolean prefixMatch, final long timeout, final TimeUnit unit) { + + final IHitCollector<V> hits; + + if (qdata.distinctTermCount() == 1) { + + final Map.Entry<String, ITermMetadata> e = qdata.getSingletonEntry(); + + final String termText = e.getKey(); + + final ITermMetadata md = e.getValue(); - return a; + final CountIndexTask<V> task1 = new CountIndexTask<V>(termText, 0, 1, + prefixMatch, md.getLocalTermWeight(), this); + + hits = new SingleTokenHitCollector<V>(task1); + + } else { + + final List<CountIndexTask<V>> tasks = new ArrayList<CountIndexTask<V>>( + qdata.distinctTermCount()); + + int i = 0; + for (Map.Entry<String, ITermMetadata> e : qdata.terms.entrySet()) { + + final String termText = e.getKey(); + + final ITermMetadata md = e.getValue(); + + tasks.add(new CountIndexTask<V>(termText, i++, qdata.terms.size(), + prefixMatch, md.getLocalTermWeight(), this)); + + } + + hits = new MultiTokenHitCollector<V>(tasks); + + } + // run the queries. + { + + final List<Callable<Object>> tasks = new ArrayList<Callable<Object>>( + qdata.distinctTermCount()); + + int i = 0; + for (Map.Entry<String, ITermMetadata> e : qdata.terms.entrySet()) { + + final String termText = e.getKey(); + + final ITermMetadata md = e.getValue(); + + tasks.add(new ReadIndexTask<V>(termText, i++, qdata.terms.size(), + prefixMatch, md.getLocalTermWeight(), this, hits)); + + } + + final ExecutionHelper<Object> executionHelper = new ExecutionHelper<Object>( + getExecutorService(), timeout, unit); + + try { + + final long start = System.currentTimeMillis(); + + executionHelper.submitTasks(tasks); + + if (log.isInfoEnabled()) { + final long readTime = System.currentTimeMillis() - start; + log.info("read time: " + readTime); + } + + } catch (InterruptedException ex) { + + if (log.isInfoEnabled()) { + // TODO Should we wrap and toss this interrupt instead? + log.info("Interrupted - only partial results will be returned."); + } + + /* + * Yes, let's toss it. We were getting into a situation + * where the ExecutionHelper above received an interrupt + * but we still went through the heavy-weight filtering + * operations below (matchExact or matchRegex). 
+ */ + throw new RuntimeException(ex); + + } catch (ExecutionException ex) { + + throw new RuntimeException(ex); + + } + + } + + return hits.getHits(); + } /** Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/ASTSearchOptimizer.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/ASTSearchOptimizer.java 2014-04-02 13:14:09 UTC (rev 8027) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/ASTSearchOptimizer.java 2014-04-02 16:13:03 UTC (rev 8028) @@ -108,6 +108,7 @@ set.add(BDS.SUBJECT_SEARCH); set.add(BDS.SEARCH_TIMEOUT); set.add(BDS.MATCH_REGEX); + set.add(BDS.RANGE_COUNT); searchUris = Collections.unmodifiableSet(set); Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/SearchServiceFactory.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/SearchServiceFactory.java 2014-04-02 13:14:09 UTC (rev 8027) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/SearchServiceFactory.java 2014-04-02 16:13:03 UTC (rev 8028) @@ -69,6 +69,7 @@ import com.bigdata.rdf.store.BDS; import com.bigdata.search.Hiterator; import com.bigdata.search.IHit; +import com.bigdata.striterator.ChunkedArrayIterator; import cutthecrap.utils.striterators.ICloseableIterator; @@ -300,6 +301,10 @@ assertObjectIsLiteral(sp); + } else if (uri.equals(BDS.RANGE_COUNT)) { + + assertObjectIsVariable(sp); + } else if(uri.equals(BDS.MATCH_REGEX)) { // a variable for the object is equivalent to regex = null @@ -367,6 +372,7 @@ private final boolean subjectSearch; private final Literal searchTimeout; private final Literal matchRegex; + private final IVariable<?> rangeCountVar; public SearchCall( final AbstractTripleStore store, @@ -415,6 +421,7 @@ IVariable<?> relVar = null; IVariable<?> rankVar = null; + IVariable<?> rangeCountVar = null; Literal minRank = null; Literal maxRank = null; Literal minRelevance = null; @@ -439,6 +446,8 @@ relVar = oVar; } else if (BDS.RANK.equals(p)) { rankVar = oVar; + } else if (BDS.RANGE_COUNT.equals(p)) { + rangeCountVar = oVar; } else if (BDS.MIN_RANK.equals(p)) { minRank = (Literal) oVal; } else if (BDS.MAX_RANK.equals(p)) { @@ -484,6 +493,7 @@ this.subjectSearch = subjectSearch; this.searchTimeout = searchTimeout; this.matchRegex = matchRegex; + this.rangeCountVar = rangeCountVar; } @@ -527,6 +537,46 @@ } + @SuppressWarnings({ "rawtypes", "unchecked" }) + private int getRangeCount() { + +// final IValueCentricTextIndexer<IHit> textIndex = (IValueCentricTextIndexer) store +// .getLexiconRelation().getSearchEngine(); + + final ITextIndexer<IHit> textIndex = (ITextIndexer) + (this.subjectSearch ? + store.getLexiconRelation().getSubjectCentricSearchEngine() : + store.getLexiconRelation().getSearchEngine()); + + if (textIndex == null) + throw new UnsupportedOperationException("No free text index?"); + + String s = query.getLabel(); + final boolean prefixMatch; + if (s.indexOf('*') >= 0) { + prefixMatch = true; + s = s.replaceAll("\\*", ""); + } else { + prefixMatch = false; + } + + return textIndex.count(new FullTextQuery( + s,// + query.getLanguage(),// + prefixMatch,// + matchRegex == null ? null : matchRegex.stringValue(), + matchAllTerms, + matchExact, + minRelevance == null ? 
BDS.DEFAULT_MIN_RELEVANCE : minRelevance.doubleValue()/* minCosine */, + maxRelevance == null ? BDS.DEFAULT_MAX_RELEVANCE : maxRelevance.doubleValue()/* maxCosine */, + minRank == null ? BDS.DEFAULT_MIN_RANK/*1*/ : minRank.intValue()/* minRank */, + maxRank == null ? BDS.DEFAULT_MAX_RANK/*Integer.MAX_VALUE*/ : maxRank.intValue()/* maxRank */, + searchTimeout == null ? BDS.DEFAULT_TIMEOUT/*0L*/ : searchTimeout.longValue()/* timeout */, + TimeUnit.MILLISECONDS + )); + + } + /** * {@inheritDoc} * @@ -561,7 +611,24 @@ } - return new HitConverter(getHiterator()); + if (rangeCountVar != null) { + + final int i = getRangeCount(); + + @SuppressWarnings({ "rawtypes", "unchecked" }) + final ListBindingSet bs = new ListBindingSet( + new IVariable[] { rangeCountVar }, + new IConstant[] { new Constant(new XSDNumericIV(i)) }); + + return new ChunkedArrayIterator<IBindingSet>(new IBindingSet[] { + bs + }); + + } else { + + return new HitConverter(getHiterator()); + + } } @@ -631,11 +698,11 @@ final ListBindingSet bs = new ListBindingSet(vars, vals); - if (log.isInfoEnabled()) { - log.info(bs); - log.info(query.getClass()); - log.info(((BigdataLiteral) query).getIV()); - log.info(((BigdataLiteral) query).getIV().getClass()); + if (log.isTraceEnabled()) { + log.trace(bs); + log.trace(query.getClass()); + log.trace(((BigdataLiteral) query).getIV()); + log.trace(((BigdataLiteral) query).getIV().getClass()); } return bs; Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/store/BDS.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/store/BDS.java 2014-04-02 13:14:09 UTC (rev 8027) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/store/BDS.java 2014-04-02 16:13:03 UTC (rev 8028) @@ -420,5 +420,14 @@ * The default timeout for a free text search (milliseconds). */ final long DEFAULT_TIMEOUT = Long.MAX_VALUE; + + /** + * Magic predicate to specify that we want a range count done on the search. + * Bind the range count to the variable in the object position. Will + * attempt to do a fast range count on the index rather than materializing + * the hits into an array. This is only possible if matchExact == false + * and matchRegex == null. + */ + final URI RANGE_COUNT = new URIImpl(NAMESPACE + "rangeCount"); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
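Per the Javadoc added to `BDS.java` and the handling in `SearchServiceFactory`, the new magic predicate is used inside a free-text search group: the subject is the search variable and the object variable receives the range count instead of materialized hits, with the fast range-count path applying when there is a single search term, `matchExact` is false, and no match regex is given. The following query is a hypothetical sketch; the `bds:` prefix URI and the search string are assumptions, not taken from this commit.

```sparql
PREFIX bds: <http://www.bigdata.com/rdf/search#>

SELECT ?count
WHERE {
  ?lit bds:search "data*" .       # single prefix-match term: eligible for the fast range count
  ?lit bds:rangeCount ?count .    # binds the count; hits are not materialized
}
```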
From: <mar...@us...> - 2014-04-02 13:14:14
Revision: 8027 http://sourceforge.net/p/bigdata/code/8027 Author: martyncutcher Date: 2014-04-02 13:14:09 +0000 (Wed, 02 Apr 2014) Log Message: ----------- missed files from initial commit Added Paths: ----------- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-A.properties branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-B.properties branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-C.properties Added: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-A.properties =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-A.properties (rev 0) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-A.properties 2014-04-02 13:14:09 UTC (rev 8027) @@ -0,0 +1,25 @@ +log4j.rootCategory=WARN,haLog +log4j.logger.com.bigdata.ha=INFO +log4j.logger.com.bigdata.haLog=INFO + +#log4j.logger.com.bigdata.txLog=ALL +#log4j.logger.com.bigdata.rwstore=INFO +#log4j.logger.com.bigdata.journal=INFO +#log4j.logger.com.bigdata.journal.AbstractBufferStrategy=ALL +log4j.logger.com.bigdata.journal.jini.ha=ALL +log4j.logger.com.bigdata.journal.jini.ha.HAJournalServer=ALL +#log4j.logger.com.bigdata.service.jini.lookup=ALL +#log4j.logger.com.bigdata.quorum=INFO +log4j.logger.com.bigdata.quorum.zk=INFO +log4j.logger.com.bigdata.io.writecache=INFO +#log4j.logger.com.bigdata.zookeeper=INFO +#log4j.logger.com.bigdata.zookeeper.ZooHelper=ALL +log4j.logger.com.bigdata.rdf.internal.LexiconConfiguration=FATAL + +log4j.appender.haLog=org.apache.log4j.FileAppender +log4j.appender.haLog.Threshold=ALL +# Note: path is relative to the directory in which the service starts. 
+log4j.appender.haLog.File=halog-A.txt +log4j.appender.haLog.Append=true +log4j.appender.haLog.layout=org.apache.log4j.PatternLayout +log4j.appender.haLog.layout.ConversionPattern=%-5p: %d{HH:mm:ss,SSS} %r %X{hostname} %X{serviceUUID} %X{taskname} %X{timestamp} %X{resources} %t %l: %m%n \ No newline at end of file Added: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-B.properties =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-B.properties (rev 0) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-B.properties 2014-04-02 13:14:09 UTC (rev 8027) @@ -0,0 +1,25 @@ +log4j.rootCategory=WARN,haLog +log4j.logger.com.bigdata.ha=INFO +log4j.logger.com.bigdata.haLog=INFO + +#log4j.logger.com.bigdata.txLog=ALL +#log4j.logger.com.bigdata.rwstore=INFO +#log4j.logger.com.bigdata.journal=INFO +#log4j.logger.com.bigdata.journal.AbstractBufferStrategy=ALL +log4j.logger.com.bigdata.journal.jini.ha=ALL +log4j.logger.com.bigdata.journal.jini.ha.HAJournalServer=ALL +#log4j.logger.com.bigdata.service.jini.lookup=ALL +#log4j.logger.com.bigdata.quorum=INFO +log4j.logger.com.bigdata.quorum.zk=INFO +#log4j.logger.com.bigdata.io.writecache=INFO +#log4j.logger.com.bigdata.zookeeper=INFO +#log4j.logger.com.bigdata.zookeeper.ZooHelper=ALL +log4j.logger.com.bigdata.rdf.internal.LexiconConfiguration=FATAL + +log4j.appender.haLog=org.apache.log4j.FileAppender +log4j.appender.haLog.Threshold=ALL +# Note: path is relative to the directory in which the service starts. +log4j.appender.haLog.File=halog-B.txt +log4j.appender.haLog.Append=true +log4j.appender.haLog.layout=org.apache.log4j.PatternLayout +log4j.appender.haLog.layout.ConversionPattern=%-5p: %d{HH:mm:ss,SSS} %r %X{hostname} %X{serviceUUID} %X{taskname} %X{timestamp} %X{resources} %t %l: %m%n \ No newline at end of file Added: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-C.properties =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-C.properties (rev 0) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-C.properties 2014-04-02 13:14:09 UTC (rev 8027) @@ -0,0 +1,25 @@ +log4j.rootCategory=WARN,haLog +log4j.logger.com.bigdata.ha=INFO +log4j.logger.com.bigdata.haLog=INFO + +#log4j.logger.com.bigdata.txLog=ALL +#log4j.logger.com.bigdata.rwstore=INFO +#log4j.logger.com.bigdata.journal=INFO +#log4j.logger.com.bigdata.journal.AbstractBufferStrategy=ALL +log4j.logger.com.bigdata.journal.jini.ha=ALL +log4j.logger.com.bigdata.journal.jini.ha.HAJournalServer=ALL +#log4j.logger.com.bigdata.service.jini.lookup=ALL +#log4j.logger.com.bigdata.quorum=INFO +log4j.logger.com.bigdata.quorum.zk=INFO +#log4j.logger.com.bigdata.io.writecache=INFO +#log4j.logger.com.bigdata.zookeeper=INFO +#log4j.logger.com.bigdata.zookeeper.ZooHelper=ALL +log4j.logger.com.bigdata.rdf.internal.LexiconConfiguration=FATAL + +log4j.appender.haLog=org.apache.log4j.FileAppender +log4j.appender.haLog.Threshold=ALL +# Note: path is relative to the directory in which the service starts. 
+log4j.appender.haLog.File=halog-C.txt +log4j.appender.haLog.Append=true +log4j.appender.haLog.layout=org.apache.log4j.PatternLayout +log4j.appender.haLog.layout.ConversionPattern=%-5p: %d{HH:mm:ss,SSS} %r %X{hostname} %X{serviceUUID} %X{taskname} %X{timestamp} %X{resources} %t %l: %m%n \ No newline at end of file This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
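The three log4j templates above differ only in the per-service output file (halog-A.txt, halog-B.txt, halog-C.txt) and in which logger categories are left enabled. For orientation, here is a minimal sketch of loading such a template programmatically, assuming log4j 1.x on the classpath and the template copied into the service start directory; the class name is illustrative, and the test harness itself selects the template with -Dlog4j.configuration (see the HAJournal-D.config args later in this thread) rather than by calling the configurator directly.

import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

public class ConfigureHALogging {

    public static void main(final String[] args) {

        // Load the per-service template. Relative paths resolve against the
        // directory in which the service starts, matching the note in the
        // template itself.
        PropertyConfigurator.configure("log4j-template-A.properties");

        // Messages on the com.bigdata.ha categories now go to halog-A.txt.
        Logger.getLogger("com.bigdata.haLog").info("HA logging configured");

    }

}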
From: <mar...@us...> - 2014-04-02 13:13:00
Revision: 8026 http://sourceforge.net/p/bigdata/code/8026 Author: martyncutcher Date: 2014-04-02 13:12:56 +0000 (Wed, 02 Apr 2014) Log Message: ----------- Initial commit on creation of HA1_HA5 branch Modified Paths: -------------- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/AbstractJournal.java branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/java/com/bigdata/journal/jini/ha/SnapshotManager.java branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java branches/BIGDATA_MGC_HA1_HA5/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java Added Paths: ----------- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA5JournalServerTestCase.java branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-D.config branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-E.config branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1JournalServer.java branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy2.java branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA5JournalServer.java branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-D.properties branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-E.properties branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/zkClient1.config branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/zkClient5.config branches/BIGDATA_MGC_HA1_HA5/branch-notes.txt Removed Paths: ------------- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-A.properties branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-B.properties branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/log4j-template-C.properties Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/AbstractJournal.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2014-04-02 13:03:59 UTC (rev 8025) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata/src/java/com/bigdata/journal/AbstractJournal.java 2014-04-02 13:12:56 UTC (rev 8026) @@ -3428,8 +3428,8 @@ if (quorum == null) return; - if (!quorum.isHighlyAvailable()) - return; +// if (!quorum.isHighlyAvailable()) +// return; /** * CRITICAL SECTION. 
We need obtain a distributed consensus for the Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2014-04-02 13:03:59 UTC (rev 8025) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2014-04-02 13:12:56 UTC (rev 8026) @@ -4574,7 +4574,7 @@ // } - log.warn("Starting NSS"); + log.warn("Starting NSS from " + jettyXml); // Start the server. jettyServer.start(); @@ -4658,8 +4658,9 @@ if (tmp == null) throw new IllegalStateException("Server is not running"); - return tmp.getConnectors()[0].getLocalPort(); - + final int port = tmp.getConnectors()[0].getLocalPort(); + haLog.warn("Returning NSSPort: " + port); + return port; } /** Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/java/com/bigdata/journal/jini/ha/SnapshotManager.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/java/com/bigdata/journal/jini/ha/SnapshotManager.java 2014-04-02 13:03:59 UTC (rev 8025) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/java/com/bigdata/journal/jini/ha/SnapshotManager.java 2014-04-02 13:12:56 UTC (rev 8026) @@ -1096,6 +1096,8 @@ + haLogBytesOnDisk// + ", journalSize=" + journalSize// + + ", thresholdPercentLogSize=" + + thresholdPercentLogSize// + ", percentLogSize=" + actualPercentLogSize// + "%, takeSnapshot=" + takeSnapshot // Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java 2014-04-02 13:03:59 UTC (rev 8025) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java 2014-04-02 13:12:56 UTC (rev 8026) @@ -133,7 +133,7 @@ * Implementation listens for the death of the child process and can be used * to decide when the child process is no longer executing. */ - private static class ServiceListener implements IServiceListener { + public static class ServiceListener implements IServiceListener { private volatile HAGlue haGlue; private volatile ProcessHelper processHelper; @@ -218,8 +218,12 @@ * The {@link Remote} interfaces for these services (if started and * successfully discovered). */ - private HAGlue serverA = null, serverB = null, serverC = null; + protected HAGlue serverA = null; + protected HAGlue serverB = null; + + protected HAGlue serverC = null; + /** * {@link UUID}s for the {@link HAJournalServer}s. */ @@ -232,17 +236,20 @@ * @see <a href="http://trac.bigdata.com/ticket/730" > Allow configuration * of embedded NSS jetty server using jetty-web.xml </a> */ - private final int A_JETTY_PORT = 8090, B_JETTY_PORT = A_JETTY_PORT + 1, - C_JETTY_PORT = B_JETTY_PORT + 1; + protected final int A_JETTY_PORT = 8090; + protected final int B_JETTY_PORT = A_JETTY_PORT + 1; + protected final int C_JETTY_PORT = B_JETTY_PORT + 1; /** * These {@link IServiceListener}s are used to reliably detect that the * corresponding process starts and (most importantly) that it is really * dies once it has been shutdown or destroyed. 
*/ - private ServiceListener serviceListenerA = null, serviceListenerB = null; + protected ServiceListener serviceListenerA = null; - private ServiceListener serviceListenerC = null; + protected ServiceListener serviceListenerB = null; + + protected ServiceListener serviceListenerC = null; private LookupDiscoveryManager lookupDiscoveryManager = null; @@ -1143,14 +1150,14 @@ } - private void safeShutdown(final HAGlue haGlue, final File serviceDir, + void safeShutdown(final HAGlue haGlue, final File serviceDir, final ServiceListener serviceListener) { safeShutdown(haGlue, serviceDir, serviceListener, false/* now */); } - private void safeShutdown(final HAGlue haGlue, final File serviceDir, + protected void safeShutdown(final HAGlue haGlue, final File serviceDir, final ServiceListener serviceListener, final boolean now) { if (haGlue == null) @@ -1368,6 +1375,10 @@ } + protected String getZKConfigFile() { + return "zkClient.config"; + } + /** * Return Zookeeper quorum that can be used to reflect (or act on) the * distributed quorum state for the logical service. @@ -1382,7 +1393,7 @@ KeeperException, IOException { final Configuration config = ConfigurationProvider - .getInstance(new String[] { SRC_PATH + "zkClient.config" }); + .getInstance(new String[] { SRC_PATH + getZKConfigFile() }); zkClientConfig = new ZookeeperClientConfig(config); @@ -1551,7 +1562,7 @@ * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> */ - abstract private class StartServerTask implements Callable<HAGlue> { + public abstract class StartServerTask implements Callable<HAGlue> { private final String name; private final String configName; Added: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA5JournalServerTestCase.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA5JournalServerTestCase.java (rev 0) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA5JournalServerTestCase.java 2014-04-02 13:12:56 UTC (rev 8026) @@ -0,0 +1,466 @@ +package com.bigdata.journal.jini.ha; + +import java.io.File; +import java.io.IOException; +import java.rmi.Remote; +import java.security.DigestException; +import java.security.NoSuchAlgorithmException; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.bigdata.ha.HAGlue; +import com.bigdata.jini.start.IServiceListener; +import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.SafeShutdownATask; +import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.SafeShutdownBTask; +import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.SafeShutdownCTask; +import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.SafeShutdownTask; +import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.ServiceListener; +import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.StartATask; +import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.StartBTask; +import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.StartCTask; +import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.StartServerTask; +import com.bigdata.quorum.AsynchronousQuorumCloseException; + +public 
class AbstractHA5JournalServerTestCase extends + AbstractHA3JournalServerTestCase { + + /** + * The {@link Remote} interfaces for these services (if started and + * successfully discovered). + */ + protected HAGlue serverD = null; + + protected HAGlue serverE = null; + + /** + * {@link UUID}s for the {@link HAJournalServer}s. + */ + private UUID serverDId = UUID.randomUUID(); + private UUID serverEId = UUID.randomUUID(); + + /** + * The HTTP ports at which the services will respond. + * + * @see <a href="http://trac.bigdata.com/ticket/730" > Allow configuration + * of embedded NSS jetty server using jetty-web.xml </a> + */ + protected final int D_JETTY_PORT = C_JETTY_PORT + 1; + protected final int E_JETTY_PORT = D_JETTY_PORT + 1; + + protected String getZKConfigFile() { + return "zkClient5.config"; // 5 stage pipeline + } + + /** + * These {@link IServiceListener}s are used to reliably detect that the + * corresponding process starts and (most importantly) that it is really + * dies once it has been shutdown or destroyed. + */ + protected ServiceListener serviceListenerD = null, serviceListenerE = null; + + protected File getServiceDirD() { + return new File(getTestDir(), "D"); + } + + protected File getServiceDirE() { + return new File(getTestDir(), "E"); + } + + protected File getHAJournalFileD() { + return new File(getServiceDirD(), "bigdata-ha.jnl"); + } + + protected File getHAJournalFileE() { + return new File(getServiceDirE(), "bigdata-ha.jnl"); + } + + protected File getHALogDirD() { + return new File(getServiceDirD(), "HALog"); + } + + protected File getHALogDirE() { + return new File(getServiceDirE(), "HALog"); + } + + /** + * Start A then B then C. As each service starts, this method waits for that + * service to appear in the pipeline in the proper position. + * + * @return The ordered array of services <code>[A, B, C]</code> + */ + protected HAGlue[] startSequenceABCDE() throws Exception { + + startA(); + awaitPipeline(new HAGlue[] { serverA }); + + startB(); + awaitPipeline(new HAGlue[] { serverA, serverB }); + + startC(); + awaitPipeline(new HAGlue[] { serverA, serverB, serverC }); + + startD(); + awaitPipeline(new HAGlue[] { serverA, serverB, serverC, serverD }); + + startE(); + awaitPipeline(new HAGlue[] { serverA, serverB, serverC, serverD, serverE }); + + return new HAGlue[] { serverA, serverB, serverC, serverD, serverE }; + + } + + /** + * Helper class for simultaneous/seqeunced start of 3 HA services. + */ + protected class ABCDE { + + /** + * The services. + */ + final HAGlue serverA, serverB, serverC, serverD, serverE; + + /** + * Start of 3 HA services (this happens in the ctor). + * + * @param sequential + * True if the startup should be sequential or false + * if services should start concurrently. + * @throws Exception + */ + public ABCDE(final boolean sequential) + throws Exception { + + this(true/* sequential */, true/* newServiceStarts */); + + } + + /** + * Start of 3 HA services (this happens in the ctor). + * + * @param sequential + * True if the startup should be sequential or false if + * services should start concurrently. + * @param newServiceStarts + * When <code>true</code> the services are new, the database + * should be at <code>commitCounter:=0</code> and the + * constructor will check for the implicit create of the + * default KB. 
+ * @throws Exception + */ + public ABCDE(final boolean sequential, final boolean newServiceStarts) + throws Exception { + + if (sequential) { + + final HAGlue[] services = startSequenceABCDE(); + + serverA = services[0]; + + serverB = services[1]; + + serverC = services[2]; + + serverD = services[3]; + + serverE = services[4]; + + } else { + + final List<Callable<HAGlue>> tasks = new LinkedList<Callable<HAGlue>>(); + + tasks.add(new StartATask(false/* restart */)); + tasks.add(new StartBTask(false/* restart */)); + tasks.add(new StartCTask(false/* restart */)); + tasks.add(new StartDTask(false/* restart */)); + tasks.add(new StartETask(false/* restart */)); + + // Start all servers in parallel. Wait up to a timeout. + final List<Future<HAGlue>> futures = executorService.invokeAll( + tasks, 30/* timeout */, TimeUnit.SECONDS); + + serverA = futures.get(0).get(); + + serverB = futures.get(1).get(); + + serverC = futures.get(2).get(); + + serverD = futures.get(3).get(); + + serverE = futures.get(4).get(); + + } + + // wait for the quorum to fully meet. + awaitFullyMetQuorum(); + + if(newServiceStarts) { + // wait for the initial commit point (KB create). + awaitCommitCounter(1L, serverA, serverB, serverC, serverD, serverE); + } + + } + + public void shutdownAll() throws InterruptedException, + ExecutionException { + + shutdownAll(false/* now */); + + } + + public void shutdownAll(final boolean now) throws InterruptedException, + ExecutionException { + + final List<Callable<Void>> tasks = new LinkedList<Callable<Void>>(); + + tasks.add(new SafeShutdownATask()); + tasks.add(new SafeShutdownBTask()); + tasks.add(new SafeShutdownCTask()); + tasks.add(new SafeShutdownDTask()); + tasks.add(new SafeShutdownETask()); + + // Start all servers in parallel. Wait up to a timeout. 
+ final List<Future<Void>> futures = executorService.invokeAll( + tasks, 30/* timeout */, TimeUnit.SECONDS); + + futures.get(0).get(); + futures.get(1).get(); + futures.get(2).get(); + futures.get(3).get(); + futures.get(4).get(); + + } + + public void assertDigestsEqual() throws NoSuchAlgorithmException, DigestException, IOException { + assertDigestsEquals(new HAGlue[] { serverA, serverB, serverC, serverD, serverE }); + } + + } + + protected HAGlue startD() throws Exception { + + return new StartDTask(false/* restart */).call(); + + } + + protected HAGlue startE() throws Exception { + + return new StartETask(false/* restart */).call(); + + } + + protected HAGlue restartD() throws Exception { + + return new StartDTask(true/* restart */).call(); + + } + + protected HAGlue restartE() throws Exception { + + return new StartETask(true/* restart */).call(); + + } + + protected void shutdownD() throws IOException { + safeShutdown(serverD, getServiceDirD(), serviceListenerD, true); + + serverD = null; + serviceListenerD = null; + } + + protected void shutdownE() throws IOException { + safeShutdown(serverE, getServiceDirE(), serviceListenerE, true); + + serverE = null; + serviceListenerE = null; + } + + protected class StartDTask extends StartServerTask { + + public StartDTask(final boolean restart) { + + super("D", "HAJournal-D.config", serverDId, D_JETTY_PORT, + serviceListenerD = new ServiceListener(), restart); + + } + + @Override + public HAGlue call() throws Exception { + + if (restart) { + + safeShutdown(serverD, getServiceDirD(), serviceListenerD); + + serverD = null; + + } + + return serverD = start(); + + } + + } + + protected class StartETask extends StartServerTask { + + public StartETask(final boolean restart) { + + super("E", "HAJournal-E.config", serverEId, E_JETTY_PORT, + serviceListenerE = new ServiceListener(), restart); + + } + + @Override + public HAGlue call() throws Exception { + + if (restart) { + + safeShutdown(serverE, getServiceDirE(), serviceListenerE); + + serverE = null; + + } + + return serverE = start(); + + } + + } + + protected class SafeShutdownDTask extends SafeShutdownTask { + + public SafeShutdownDTask() { + this(false/* now */); + } + + public SafeShutdownDTask(final boolean now) { + super(serverD, getServiceDirC(), serviceListenerD, now); + } + + } + + protected class SafeShutdownETask extends SafeShutdownTask { + + public SafeShutdownETask() { + this(false/* now */); + } + + public SafeShutdownETask(final boolean now) { + super(serverE, getServiceDirC(), serviceListenerE, now); + } + + } + public AbstractHA5JournalServerTestCase() { + } + + public AbstractHA5JournalServerTestCase(final String name) { + super(name); + } + + protected void destroyAll() throws AsynchronousQuorumCloseException, + InterruptedException, TimeoutException { + /** + * The most reliable tear down is in reverse pipeline order. + * + * This may not be necessary long term but for now we want to avoid + * destroying the leader first since it can lead to problems as + * followers attempt to reform + */ + final HAGlue leader; + final File leaderServiceDir; + final ServiceListener leaderListener; + if (quorum.isQuorumMet()) { + final long token = quorum.awaitQuorum(awaitQuorumTimeout, + TimeUnit.MILLISECONDS); + /* + * Note: It is possible to resolve a proxy for a service that has + * been recently shutdown or destroyed. This is effectively a data + * race. 
+ */ + final HAGlue t = quorum.getClient().getLeader(token); + if (t.equals(serverA)) { + leader = t; + leaderServiceDir = getServiceDirA(); + leaderListener = serviceListenerA; + } else if (t.equals(serverB)) { + leader = t; + leaderServiceDir = getServiceDirB(); + leaderListener = serviceListenerB; + } else if (t.equals(serverC)) { + leader = t; + leaderServiceDir = getServiceDirC(); + leaderListener = serviceListenerC; + } else if (t.equals(serverD)) { + leader = t; + leaderServiceDir = getServiceDirD(); + leaderListener = serviceListenerD; + } else if (t.equals(serverE)) { + leader = t; + leaderServiceDir = getServiceDirE(); + leaderListener = serviceListenerE; + } else { + if (serverA == null && serverB == null && serverC == null && serverD == null && serverE == null) { + /* + * There are no services running and nothing to shutdown. We + * probably resolved a stale proxy to the leader above. + */ + return; + } + throw new IllegalStateException( + "Leader is none of A, B, or C: leader=" + t + ", A=" + + serverA + ", B=" + serverB + ", C=" + serverC); + } + } else { + leader = null; + leaderServiceDir = null; + leaderListener = null; + } + + if (leader == null || !leader.equals(serverA)) { + destroyA(); + } + + if (leader == null || !leader.equals(serverB)) { + destroyB(); + } + + if (leader == null || !leader.equals(serverC)) { + destroyC(); + } + + if (leader == null || !leader.equals(serverD)) { + destroyD(); + } + + if (leader == null || !leader.equals(serverE)) { + destroyE(); + } + + // Destroy leader last + if (leader != null) { + safeDestroy(leader, leaderServiceDir, leaderListener); + + serverA = serverB = serverC = serverD = serverE = null; + serviceListenerA = serviceListenerC = serviceListenerB = serviceListenerD = serviceListenerE = null; + } + + } + + protected void destroyD() { + safeDestroy(serverD, getServiceDirD(), serviceListenerD); + serverD = null; + serviceListenerD = null; + } + + protected void destroyE() { + safeDestroy(serverE, getServiceDirE(), serviceListenerE); + serverE = null; + serviceListenerE = null; + } + +} Modified: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java 2014-04-02 13:03:59 UTC (rev 8025) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java 2014-04-02 13:12:56 UTC (rev 8026) @@ -440,8 +440,17 @@ * Wait for the service to report that it is ready as a leader or * follower. */ - haGlue.awaitHAReady(awaitQuorumTimeout, TimeUnit.MILLISECONDS); + return awaitNSSAndHAReady(haGlue, awaitQuorumTimeout, TimeUnit.MILLISECONDS); + } + + protected HAStatusEnum awaitNSSAndHAReady(final HAGlue haGlue, long timout, TimeUnit unit) + throws Exception { /* + * Wait for the service to report that it is ready as a leader or + * follower. + */ + haGlue.awaitHAReady(timout, unit); + /* * Wait for the NSS to report the status of the service (this verifies * that the NSS interface is running). 
*/ @@ -525,7 +534,9 @@ protected String getNanoSparqlServerURL(final HAGlue haGlue) throws IOException { - return "http://localhost:" + haGlue.getNSSPort(); + final int port = haGlue.getNSSPort(); + + return "http://localhost:" + port; } @@ -541,7 +552,7 @@ final String sparqlEndpointURL = getNanoSparqlServerURL(haGlue) + "/sparql"; - + // Client for talking to the NSS. final HttpClient httpClient = new DefaultHttpClient(ccm); @@ -1248,4 +1259,35 @@ } + /** + * The effective name for this test as used to name the directories in which + * we store things. + * + * TODO If there are method name collisions across the different test + * classes then the test suite name can be added to this. Also, if there are + * file naming problems, then this value can be munged before it is + * returned. + */ + private final String effectiveTestFileName = getClass().getSimpleName() + + "." + getName(); + + /** + * The directory that is the parent of each {@link HAJournalServer}'s + * individual service directory. + */ + protected File getTestDir() { + return new File(TGT_PATH, getEffectiveTestFileName()); + } + + /** + * The effective name for this test as used to name the directories in which + * we store things. + */ + protected String getEffectiveTestFileName() { + + return effectiveTestFileName; + + } + + } Added: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-D.config =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-D.config (rev 0) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-D.config 2014-04-02 13:12:56 UTC (rev 8026) @@ -0,0 +1,288 @@ +import net.jini.jeri.BasicILFactory; +import net.jini.jeri.BasicJeriExporter; +import net.jini.jeri.tcp.TcpServerEndpoint; + +import net.jini.discovery.LookupDiscovery; +import net.jini.core.discovery.LookupLocator; +import net.jini.core.entry.Entry; +import net.jini.lookup.entry.Name; +import net.jini.lookup.entry.Comment; +import net.jini.lookup.entry.Address; +import net.jini.lookup.entry.Location; +import net.jini.lookup.entry.ServiceInfo; +import net.jini.core.lookup.ServiceTemplate; + +import java.io.File; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.UUID; + +import com.bigdata.util.NV; +import com.bigdata.util.config.NicUtil; +import com.bigdata.journal.Options; +import com.bigdata.journal.BufferMode; +import com.bigdata.journal.jini.ha.HAJournal; +import com.bigdata.jini.lookup.entry.*; +import com.bigdata.service.IBigdataClient; +import com.bigdata.service.AbstractTransactionService; +import com.bigdata.service.jini.*; +import com.bigdata.service.jini.lookup.DataServiceFilter; +import com.bigdata.service.jini.master.ServicesTemplate; +import com.bigdata.jini.start.config.*; +import com.bigdata.jini.util.ConfigMath; + +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; + +// imports for various options. 
+import com.bigdata.btree.IndexMetadata; +import com.bigdata.btree.keys.KeyBuilder; +import com.bigdata.rdf.sail.BigdataSail; +import com.bigdata.rdf.spo.SPORelation; +import com.bigdata.rdf.spo.SPOKeyOrder; +import com.bigdata.rdf.lexicon.LexiconRelation; +import com.bigdata.rdf.lexicon.LexiconKeyOrder; +import com.bigdata.rawstore.Bytes; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeUnit.*; + +/* + * This is a sample configuration file for a highly available Journal. A + * version of this file must be available to each HAJournalServer in the + * pipeline. + */ + +/* + * Globals. + */ +bigdata { + + private static fedname = "benchmark"; + + // NanoSparqlServer (http) port. + private static nssPort = ConfigMath.add(8090,3); + + // write replication pipeline port (listener). + private static haPort = ConfigMath.add(9090,3); + + // The #of services in the write pipeline. + private static replicationFactor = 5; + + // The logical service identifier shared by all members of the quorum. + private static logicalServiceId = System.getProperty("test.logicalServiceId","CI-HAJournal-1"); + + // The service directory. + // Note: Overridden by environment property when deployed. + private static serviceDir = new File(System.getProperty("test.serviceDir",ConfigMath.getAbsolutePath(new File(new File(fedname,logicalServiceId),"D")))); + //new File(new File(fedname,logicalServiceId),"D"); + + // journal data directory. + private static dataDir = serviceDir; + + // one federation, multicast discovery. + //static private groups = LookupDiscovery.ALL_GROUPS; + + // unicast discovery or multiple setups, MUST specify groups. + static private groups = new String[]{bigdata.fedname}; + + /** + * One or more unicast URIs of the form <code>jini://host/</code> + * or <code>jini://host:port/</code> (no default). + * + * This MAY be an empty array if you want to use multicast + * discovery <strong>and</strong> you have specified the groups as + * LookupDiscovery.ALL_GROUPS (a <code>null</code>). + */ + static private locators = new LookupLocator[] { + + // runs jini on the localhost using unicast locators. + new LookupLocator("jini://localhost/") + + }; + + /** + * A common point to set the Zookeeper client's requested + * sessionTimeout and the jini lease timeout. The default lease + * renewal period for jini is 5 minutes while for zookeeper it is + * more like 5 seconds. This puts the two systems onto a similar + * timeout period so that a disconnected client is more likely to + * be noticed in roughly the same period of time for either + * system. A value larger than the zookeeper default helps to + * prevent client disconnects under sustained heavy load. + * + * If you use a short lease timeout (LT 20s), then you need to override + * properties properties for the net.jini.lease.LeaseRenewalManager + * or it will run in a tight loop (it's default roundTripTime is 10s + * and it schedules lease renewals proactively.) + */ + + // jini + static private leaseTimeout = ConfigMath.s2ms(20); + + // zookeeper + static private sessionTimeout = (int)ConfigMath.s2ms(20); + + /* + * Configuration for default KB. + */ + + private static namespace = "kb"; + + private static kb = new NV[] { + + /* Setup for QUADS mode without the full text index. 
*/ + + new NV(BigdataSail.Options.TRUTH_MAINTENANCE, "false" ), + new NV(BigdataSail.Options.QUADS, "true"), + new NV(BigdataSail.Options.STATEMENT_IDENTIFIERS, "false"), + new NV(BigdataSail.Options.TEXT_INDEX, "false"), + new NV(BigdataSail.Options.AXIOMS_CLASS,"com.bigdata.rdf.axioms.NoAxioms"), + new NV(BigdataSail.Options.QUERY_TIME_EXPANDER, "false"), + + // Bump up the branching factor for the lexicon indices on the named kb. + // com.bigdata.namespace.kb.lex.com.bigdata.btree.BTree.branchingFactor=400 + new NV(com.bigdata.config.Configuration.getOverrideProperty + ( namespace + "." + LexiconRelation.NAME_LEXICON_RELATION, + IndexMetadata.Options.BTREE_BRANCHING_FACTOR + ), "400"), + + // Bump up the branching factor for the statement indices on the named kb. + // com.bigdata.namespace.kb.spo.com.bigdata.btree.BTree.branchingFactor=1024 + new NV(com.bigdata.config.Configuration.getOverrideProperty + ( namespace + "." + SPORelation.NAME_SPO_RELATION, + IndexMetadata.Options.BTREE_BRANCHING_FACTOR + ), "1024"), + }; + +} + +/* + * Zookeeper client configuration. + */ +org.apache.zookeeper.ZooKeeper { + + /* Root znode for the federation instance. */ + zroot = "/" + bigdata.fedname; + + /* A comma separated list of host:port pairs, where the port is + * the CLIENT port for the zookeeper server instance. + */ + // standalone. + servers = "localhost:2081"; + + /* Session timeout (optional). */ + sessionTimeout = bigdata.sessionTimeout; + + /* + * ACL for the zookeeper nodes created by the bigdata federation. + * + * Note: zookeeper ACLs are not transmitted over secure channels + * and are placed into plain text Configuration files by the + * ServicesManagerServer. + */ + acl = new ACL[] { + + new ACL(ZooDefs.Perms.ALL, new Id("world", "anyone")) + + }; +} + +/* + * You should not have to edit below this line. + */ + +/* + * Jini client configuration. + */ +com.bigdata.service.jini.JiniClient { + + groups = bigdata.groups; + + locators = bigdata.locators; + + entries = new Entry[] { + + // Optional metadata entries. + new Name("D"), + + // Note: Used to assign the ServiceID to the service. + new ServiceUUID(UUID.fromString(System.getProperty("test.serviceId"))) + + }; + +} + +net.jini.lookup.JoinManager { + + maxLeaseDuration = bigdata.leaseTimeout; + +} + +/* + * Server configuration options. + */ +com.bigdata.journal.jini.ha.HAJournalServer { + + args = new String[] { + "-showversion", + "-Djava.security.policy=policy.all", + "-Dlog4j.configuration=file:log4j-D.properties", + "-Djava.util.logging.config.file=logging-D.properties", + "-server", + "-Xmx1G", + "-ea", + "-Xdebug","-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=1053" + }; + + serviceDir = bigdata.serviceDir; + + // Default policy. + restorePolicy = new com.bigdata.journal.jini.ha.DefaultRestorePolicy(); + + // Suppress automatic snapshots. + snapshotPolicy = new com.bigdata.journal.jini.ha.NoSnapshotPolicy(); + + logicalServiceId = bigdata.logicalServiceId; + + writePipelineAddr = new InetSocketAddress("localhost",bigdata.haPort); + + /* + writePipelineAddr = new InetSocketAddress(// + InetAddress.getByName(// + NicUtil.getIpAddress("default.nic", "default", + false// loopbackOk + )), // + bigdata.haPort + ); + */ + + replicationFactor = bigdata.replicationFactor; + + // Use the overridden version of the HAJournal by default so we get the + // HAGlueTest API for every test. + HAJournalClass = "com.bigdata.journal.jini.ha.HAJournalTest"; + +} + +/* + * Journal configuration. 
+ */ +com.bigdata.journal.jini.ha.HAJournal { + + properties = (NV[]) ConfigMath.concat(new NV[] { + + new NV(Options.FILE, + ConfigMath.getAbsolutePath(new File(bigdata.dataDir,"bigdata-ha.jnl"))), + + new NV(Options.BUFFER_MODE,""+BufferMode.DiskRW), + + new NV(IndexMetadata.Options.WRITE_RETENTION_QUEUE_CAPACITY,"4000"), + + new NV(IndexMetadata.Options.BTREE_BRANCHING_FACTOR,"128"), + + new NV(AbstractTransactionService.Options.MIN_RELEASE_AGE,"1"), + + }, bigdata.kb); + +} Added: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-E.config =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-E.config (rev 0) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-E.config 2014-04-02 13:12:56 UTC (rev 8026) @@ -0,0 +1,288 @@ +import net.jini.jeri.BasicILFactory; +import net.jini.jeri.BasicJeriExporter; +import net.jini.jeri.tcp.TcpServerEndpoint; + +import net.jini.discovery.LookupDiscovery; +import net.jini.core.discovery.LookupLocator; +import net.jini.core.entry.Entry; +import net.jini.lookup.entry.Name; +import net.jini.lookup.entry.Comment; +import net.jini.lookup.entry.Address; +import net.jini.lookup.entry.Location; +import net.jini.lookup.entry.ServiceInfo; +import net.jini.core.lookup.ServiceTemplate; + +import java.io.File; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.UUID; + +import com.bigdata.util.NV; +import com.bigdata.util.config.NicUtil; +import com.bigdata.journal.Options; +import com.bigdata.journal.BufferMode; +import com.bigdata.journal.jini.ha.HAJournal; +import com.bigdata.jini.lookup.entry.*; +import com.bigdata.service.IBigdataClient; +import com.bigdata.service.AbstractTransactionService; +import com.bigdata.service.jini.*; +import com.bigdata.service.jini.lookup.DataServiceFilter; +import com.bigdata.service.jini.master.ServicesTemplate; +import com.bigdata.jini.start.config.*; +import com.bigdata.jini.util.ConfigMath; + +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; + +// imports for various options. +import com.bigdata.btree.IndexMetadata; +import com.bigdata.btree.keys.KeyBuilder; +import com.bigdata.rdf.sail.BigdataSail; +import com.bigdata.rdf.spo.SPORelation; +import com.bigdata.rdf.spo.SPOKeyOrder; +import com.bigdata.rdf.lexicon.LexiconRelation; +import com.bigdata.rdf.lexicon.LexiconKeyOrder; +import com.bigdata.rawstore.Bytes; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeUnit.*; + +/* + * This is a sample configuration file for a highly available Journal. A + * version of this file must be available to each HAJournalServer in the + * pipeline. + */ + +/* + * Globals. + */ +bigdata { + + private static fedname = "benchmark"; + + // NanoSparqlServer (http) port. + private static nssPort = ConfigMath.add(8090,4); + + // write replication pipeline port (listener). + private static haPort = ConfigMath.add(9090,4); + + // The #of services in the write pipeline. + private static replicationFactor = 5; + + // The logical service identifier shared by all members of the quorum. + private static logicalServiceId = System.getProperty("test.logicalServiceId","CI-HAJournal-1"); + + // The service directory. + // Note: Overridden by environment property when deployed. 
+ private static serviceDir = new File(System.getProperty("test.serviceDir",ConfigMath.getAbsolutePath(new File(new File(fedname,logicalServiceId),"E")))); + //new File(new File(fedname,logicalServiceId),"E"); + + // journal data directory. + private static dataDir = serviceDir; + + // one federation, multicast discovery. + //static private groups = LookupDiscovery.ALL_GROUPS; + + // unicast discovery or multiple setups, MUST specify groups. + static private groups = new String[]{bigdata.fedname}; + + /** + * One or more unicast URIs of the form <code>jini://host/</code> + * or <code>jini://host:port/</code> (no default). + * + * This MAY be an empty array if you want to use multicast + * discovery <strong>and</strong> you have specified the groups as + * LookupDiscovery.ALL_GROUPS (a <code>null</code>). + */ + static private locators = new LookupLocator[] { + + // runs jini on the localhost using unicast locators. + new LookupLocator("jini://localhost/") + + }; + + /** + * A common point to set the Zookeeper client's requested + * sessionTimeout and the jini lease timeout. The default lease + * renewal period for jini is 5 minutes while for zookeeper it is + * more like 5 seconds. This puts the two systems onto a similar + * timeout period so that a disconnected client is more likely to + * be noticed in roughly the same period of time for either + * system. A value larger than the zookeeper default helps to + * prevent client disconnects under sustained heavy load. + * + * If you use a short lease timeout (LT 20s), then you need to override + * properties properties for the net.jini.lease.LeaseRenewalManager + * or it will run in a tight loop (it's default roundTripTime is 10s + * and it schedules lease renewals proactively.) + */ + + // jini + static private leaseTimeout = ConfigMath.s2ms(20); + + // zookeeper + static private sessionTimeout = (int)ConfigMath.s2ms(20); + + /* + * Configuration for default KB. + */ + + private static namespace = "kb"; + + private static kb = new NV[] { + + /* Setup for QUADS mode without the full text index. */ + + new NV(BigdataSail.Options.TRUTH_MAINTENANCE, "false" ), + new NV(BigdataSail.Options.QUADS, "true"), + new NV(BigdataSail.Options.STATEMENT_IDENTIFIERS, "false"), + new NV(BigdataSail.Options.TEXT_INDEX, "false"), + new NV(BigdataSail.Options.AXIOMS_CLASS,"com.bigdata.rdf.axioms.NoAxioms"), + new NV(BigdataSail.Options.QUERY_TIME_EXPANDER, "false"), + + // Bump up the branching factor for the lexicon indices on the named kb. + // com.bigdata.namespace.kb.lex.com.bigdata.btree.BTree.branchingFactor=400 + new NV(com.bigdata.config.Configuration.getOverrideProperty + ( namespace + "." + LexiconRelation.NAME_LEXICON_RELATION, + IndexMetadata.Options.BTREE_BRANCHING_FACTOR + ), "400"), + + // Bump up the branching factor for the statement indices on the named kb. + // com.bigdata.namespace.kb.spo.com.bigdata.btree.BTree.branchingFactor=1024 + new NV(com.bigdata.config.Configuration.getOverrideProperty + ( namespace + "." + SPORelation.NAME_SPO_RELATION, + IndexMetadata.Options.BTREE_BRANCHING_FACTOR + ), "1024"), + }; + +} + +/* + * Zookeeper client configuration. + */ +org.apache.zookeeper.ZooKeeper { + + /* Root znode for the federation instance. */ + zroot = "/" + bigdata.fedname; + + /* A comma separated list of host:port pairs, where the port is + * the CLIENT port for the zookeeper server instance. + */ + // standalone. + servers = "localhost:2081"; + + /* Session timeout (optional). 
*/ + sessionTimeout = bigdata.sessionTimeout; + + /* + * ACL for the zookeeper nodes created by the bigdata federation. + * + * Note: zookeeper ACLs are not transmitted over secure channels + * and are placed into plain text Configuration files by the + * ServicesManagerServer. + */ + acl = new ACL[] { + + new ACL(ZooDefs.Perms.ALL, new Id("world", "anyone")) + + }; +} + +/* + * You should not have to edit below this line. + */ + +/* + * Jini client configuration. + */ +com.bigdata.service.jini.JiniClient { + + groups = bigdata.groups; + + locators = bigdata.locators; + + entries = new Entry[] { + + // Optional metadata entries. + new Name("E"), + + // Note: Used to assign the ServiceID to the service. + new ServiceUUID(UUID.fromString(System.getProperty("test.serviceId"))) + + }; + +} + +net.jini.lookup.JoinManager { + + maxLeaseDuration = bigdata.leaseTimeout; + +} + +/* + * Server configuration options. + */ +com.bigdata.journal.jini.ha.HAJournalServer { + + args = new String[] { + "-showversion", + "-Djava.security.policy=policy.all", + "-Dlog4j.configuration=file:log4j-E.properties", + "-Djava.util.logging.config.file=logging-E.properties", + "-server", + "-Xmx1G", + "-ea", + "-Xdebug","-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=1054" + }; + + serviceDir = bigdata.serviceDir; + + // Default policy. + restorePolicy = new com.bigdata.journal.jini.ha.DefaultRestorePolicy(); + + // Suppress automatic snapshots. + snapshotPolicy = new com.bigdata.journal.jini.ha.NoSnapshotPolicy(); + + logicalServiceId = bigdata.logicalServiceId; + + writePipelineAddr = new InetSocketAddress("localhost",bigdata.haPort); + + /* + writePipelineAddr = new InetSocketAddress(// + InetAddress.getByName(// + NicUtil.getIpAddress("default.nic", "default", + false// loopbackOk + )), // + bigdata.haPort + ); + */ + + replicationFactor = bigdata.replicationFactor; + + // Use the overridden version of the HAJournal by default so we get the + // HAGlueTest API for every test. + HAJournalClass = "com.bigdata.journal.jini.ha.HAJournalTest"; + +} + +/* + * Journal configuration. 
+ */ +com.bigdata.journal.jini.ha.HAJournal { + + properties = (NV[]) ConfigMath.concat(new NV[] { + + new NV(Options.FILE, + ConfigMath.getAbsolutePath(new File(bigdata.dataDir,"bigdata-ha.jnl"))), + + new NV(Options.BUFFER_MODE,""+BufferMode.DiskRW), + + new NV(IndexMetadata.Options.WRITE_RETENTION_QUEUE_CAPACITY,"4000"), + + new NV(IndexMetadata.Options.BTREE_BRANCHING_FACTOR,"128"), + + new NV(AbstractTransactionService.Options.MIN_RELEASE_AGE,"1"), + + }, bigdata.kb); + +} Added: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1JournalServer.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1JournalServer.java (rev 0) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1JournalServer.java 2014-04-02 13:12:56 UTC (rev 8026) @@ -0,0 +1,114 @@ +package com.bigdata.journal.jini.ha; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.bigdata.ha.HAGlue; +import com.bigdata.ha.HAStatusEnum; + +import net.jini.config.Configuration; + + +public class TestHA1JournalServer extends AbstractHA3JournalServerTestCase { + + /** + * {@inheritDoc} + * <p> + * Note: This overrides some {@link Configuration} values for the + * {@link HAJournalServer} in order to establish conditions suitable for + * testing the {@link ISnapshotPolicy} and {@link IRestorePolicy}. + */ + @Override + protected String[] getOverrides() { + + return new String[]{ +// "com.bigdata.journal.HAJournal.properties=" +TestHA3JournalServer.getTestHAJournalProperties(com.bigdata.journal.HAJournal.properties), + "com.bigdata.journal.jini.ha.HAJournalServer.restorePolicy=new com.bigdata.journal.jini.ha.DefaultRestorePolicy(0L,1,0)", + "com.bigdata.journal.jini.ha.HAJournalServer.snapshotPolicy=new com.bigdata.journal.jini.ha.NoSnapshotPolicy()", +// "com.bigdata.journal.jini.ha.HAJournalServer.HAJournalClass=\""+HAJournalTest.class.getName()+"\"", + "com.bigdata.journal.jini.ha.HAJournalServer.onlineDisasterRecovery=true", + "com.bigdata.journal.jini.ha.HAJournalServer.replicationFactor=1", + }; + + } + + protected String getZKConfigFile() { + return "zkClient1.config"; // 1 stage pipeline + } + + public TestHA1JournalServer() { + } + + public TestHA1JournalServer(String name) { + super(name); + } + + public void testStartA() throws Exception { + doStartA(); + } + + protected void doStartA() throws Exception { + + try { + quorum.awaitQuorum(awaitQuorumTimeout, + TimeUnit.MILLISECONDS); + + fail("HA1 requires quorum of 1!"); + } catch (TimeoutException te) { + // expected + } + + // Start 1 service. + final HAGlue serverA = startA(); + + // this should succeed + final long token = quorum.awaitQuorum(awaitQuorumTimeout, + TimeUnit.MILLISECONDS); + + assertEquals(token, awaitFullyMetQuorum()); + + final HAGlue leader = quorum.getClient().getLeader(token); + + assertEquals(serverA, leader); + } + + public void testSimpleTransaction() throws Exception { + doStartA(); + + serverA.awaitHAReady(2, TimeUnit.SECONDS); + + /* + * Awaiting HAReady is not sufficient since the service may still + * writing the initial transaction. + * + * So it seems that the problem is not so much with HA1 as rather the + * status of a new journal being ready too soon to process an NSS + * request + */ + + awaitCommitCounter(1, new HAGlue[] { serverA}); + + // Thread.sleep(100); + + // serverA. 
+ + log.warn("Calling SimpleTransaction"); + simpleTransaction(); + + awaitCommitCounter(2, new HAGlue[] { serverA}); + } + + public void testMultiTransaction() throws Exception { + doStartA(); + + awaitCommitCounter(1, new HAGlue[] { serverA}); + // Thread.sleep(1000); + + final int NTRANS = 10; + for (int t = 0; t < NTRANS; t++) { + simpleTransaction(); + } + + awaitCommitCounter(NTRANS+1, new HAGlue[] { serverA}); +} +} Added: branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java =================================================================== --- branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java (rev 0) +++ branches/BIGDATA_MGC_HA1_HA5/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA1SnapshotPolicy.java 2014-04-02 13:12:56 UTC (rev 8026) @@ -0,0 +1,530 @@ +package com.bigdata.journal.jini.ha; + +import java.util.UUID; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.bigdata.ha.HAGlue; +import com.bigdata.ha.HAStatusEnum; +import com.bigdata.ha.msg.HARootBlockRequest; +import com.bigdata.ha.msg.HASnapshotRequest; +import com.bigdata.ha.msg.IHASnapshotResponse; +import com.bigdata.journal.IRootBlockView; +import com.bigdata.journal.Journal; +import com.bigdata.journal.jini.ha.AbstractHA3JournalServerTestCase.LargeLoadTask; +import com.bigdata.rdf.sail.webapp.client.RemoteRepository; + +import net.jini.config.Configuration; + +public class TestHA1SnapshotPolicy extends AbstractHA3BackupTestCase { + + public TestHA1SnapshotPolicy() { + } + + public TestHA1SnapshotPolicy(String name) { + super(name); + } + + protected String getZKConfigFile() { + return "zkClient1.config"; // 1 stage pipeline + } + + /** + * {@inheritDoc} + * <p> + * Note: This overrides some {@link Configuration} values for the + * {@link HAJournalServer} in order to establish conditions suitable for + * testing the {@link ISnapshotPolicy} and {@link IRestorePolicy}. + */ + @Override + protected String[] getOverrides() { + + /* + * We need to set the time at which the DefaultSnapshotPolicy runs to + * some point in the Future in order to avoid test failures due to + * violated assumptions when the policy runs up self-triggering (based + * on the specified run time) during a CI run. + */ + final String neverRun = getNeverRunSnapshotTime(); + + /* + * For HA1, must have onlineDisasterRecovery to ensure logs are maintained + */ + return new String[]{ + "com.bigdata.journal.jini.ha.HAJournalServer.restorePolicy=new com.bigdata.journal.jini.ha.DefaultRestorePolicy(0L,1,0)", + "com.bigdata.journal.jini.ha.HAJournalServer.snapshotPolicy=new com.bigdata.journal.jini.ha.DefaultSnapshotPolicy("+neverRun+",0)", + // "com.bigdata.journal.jini.ha.HAJournalServer.snapshotPolicy=new com.bigdata.journal.jini.ha.NoSnapshotPolicy()", + "com.bigdata.journal.jini.ha.HAJournalServer.onlineDisasterRecovery=true", + "com.bigdata.journal.jini.ha.HAJournalServer.replicationFactor=1", + }; + + } + + /** + * Start a service. The quorum meets. Take a snapshot. Verify that the + * snapshot appears within a reasonable period of time and that it is for + * <code>commitCounter:=1</code> (just the KB create). Verify that the + * digest of the snapshot agrees with the digest of the journal. + */ + public void testA_snapshot() throws Exception { + + // Start 1 service. 
+ final HAGlue serverA = startA(); + + // Wait for a quorum meet. + final long token = quorum.awaitQuorum(awaitQuorumTimeout, + TimeUnit.MILLISECONDS); + + // Await initial commit point (KB create). + awaitCommitCounter(1L, serverA); + + final HAGlue leader = quorum.getClient().getLeader(token); + assertEquals(serverA, leader); // A is the leader. + { + + // Verify quorum is still valid. + quorum.assertQuorum(token); + + // Verify quorum is at the expected commit point. + assertEquals( + 1L, + leader.getRootBlock( + new HARootBlockRequest(null/* storeUUID */)) + .getRootBlock().getCommitCounter()); + + // Snapshot directory is empty. + assertEquals(0, recursiveCount(getSnapshotDirA(),SnapshotManager.SNAPSHOT_FILTER)); + + final Future<IHASnapshotResponse> ft = leader + .takeSnapshot(new HASnapshotRequest(0/* percentLogSize */)); + + // wait for the snapshot. + try { + ft.get(5, TimeUnit.SECONDS); + } catch (TimeoutException ex) { + ft.cancel(true/* mayInterruptIfRunning */); + throw ex; + } + + final IRootBlockView snapshotRB = ft.get().getRootBlock(); + + final long commitCounter = 1L; + + // Verify snapshot is for the expected commit point. + assertEquals(commitCounter, snapshotRB.getCommitCounter()); + + // Snapshot directory contains the desired filename. + assertExpectedSnapshots(getSnapshotDirA(), + new long[] { commitCounter }); + + // Verify digest of snapshot agrees with digest of journal. + assertSnapshotDigestEquals(leader, commitCounter); + + } + + } + /** + * Start service. The quorum meets. Take a snapshot. Verify that the + * snapshot appears within a resonable period of time and that it is for + * <code>commitCounter:=1</code> (just the KB create). Request a second + * snapshot for the same commit point and verify that a <code>null</code> is + * returned since we already have a snapshot for that commit point. + */ + public void testA_snapshot_await_snapshot_null() throws Exception { + + // Start 2 services. + final HAGlue serverA = startA(); + + // Wait for a quorum meet. + final long token = quorum.awaitQuorum(awaitQuorumTimeout, + TimeUnit.MILLISECONDS); + + // Await initial commit point (KB create). + awaitCommitCounter(1L, serverA); + + final HAGlue leader = quorum.getClient().getLeader(token); + + { + + // Verify quorum is still valid. + quorum.assertQuorum(token); + + // Verify quorum is at the expected commit point. + assertEquals( + 1L, + leader.getRootBlock( + new HARootBlockRequest(null/* storeUUID */)) + .getRootBlock().getCommitCounter()); + + // Snapshot directory is empty. + assertEquals(0, recursiveCount(getSnapshotDirA(),SnapshotManager.SNAPSHOT_FILTER)); + + final Future<IHASnapshotResponse> ft = leader + .takeSnapshot(new HASnapshotRequest(0/* percentLogSize */)); + + // wait for the snapshot. + try { + ft.get(5, TimeUnit.SECONDS); + } catch (TimeoutException ex) { + ft.cancel(true/* mayInterruptIfRunning */); + throw ex; + } + + final IRootBlockView snapshotRB = ft.get().getRootBlock(); + + final long commitCounter = 1L; + + // Verify snapshot is for the expected commit point. + assertEquals(commitCounter, snapshotRB.getCommitCounter()); + + // Snapshot directory contains the expected snapshot(s). + assertExpectedSnapshots(getSnapshotDirA(), + new long[] { commitCounter }); + + // Verify digest of snapshot agrees with digest of journal. + assertSnapshotDigestEquals(leader, commitCounter); + + } + + /* + * Verify 2nd request returns null since snapshot exists for that + * commit point. + */ + { + + // Verify quorum is still at the expected commit point. 
+ assertEquals( + 1L, + leader.getRootBlock( + new HARootBlockRequest(null/* storeUUID */)) + .getRootBlock().getCommitCounter()); + + // request another snapshot. + final Future<IHASnapshotResponse> ft = leader + .takeSnapshot(new HASnapshotRequest(0/* percentLogSize */)); + + if (ft != null) { + + ft.cancel(true/* mayInteruptIfRunning */); + + fail("Expecting null since snapshot exists for current commit point."); + + } + + } + + } + + /** + * Test ability to request a snapshot using an HTTP GET + * <code>.../status?snapshot</code>. + * + * TODO Variant where the percentLogSize parameter is also expressed and + * verify that the semantics of that argument are obeyed. Use this to verify + * that the server will not take snapshot if size on disk of HALog files + * since the last snapshot is LT some percentage. + */ + public void testA_snapshot_HTTP_GET() throws Exception { + + // Start 2 services. + final HAGlue serverA = startA(); + + // Wait for a quorum meet. + final long token = quorum.awaitQuorum(awaitQuorumTimeout, + TimeUnit.MILLISECONDS); + + // Await initial commit point (KB create). + awaitCommitCounter(1L, serverA); + + final HAGlue leader = quorum.getClient().getLeader(token); + + { + + // Verify quorum is still valid. + quorum.assertQuorum(token); + + // Verify quorum is at the expected commit point. + assertEquals( + 1L, + leader.getRootBlock( + new HARootBlockRequest(null/* storeUUID */)) + .getRootBlock().getCommitCounter()); + + // Snapshot directory is empty. + assertEquals(0, recursiveCount(getSnapshotDirA(),SnapshotManager.SNAPSHOT_FILTER)); + + doSnapshotRequest(leader); + + /* + * Get the Future. Should still be there, but if not then will be + * null (it which case the snapshot is already done). + */ + f... [truncated message content] |
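The (truncated) TestHA1SnapshotPolicy above also verifies that a snapshot can be requested with an HTTP GET against .../status?snapshot. Here is a minimal sketch of issuing that request by hand, assuming a single HA1 service whose NanoSparqlServer listens on the test default port 8090 (A_JETTY_PORT); the launcher class is illustrative and not part of the commit.

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class RequestSnapshotByHttp {

    public static void main(final String[] args) throws Exception {

        // The status page accepts the snapshot parameter as exercised by
        // testA_snapshot_HTTP_GET; 8090 is the A_JETTY_PORT test default.
        final URL url = new URL("http://localhost:8090/status?snapshot");

        final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");

        try (final InputStream in = conn.getInputStream()) {
            System.out.println("snapshot requested, HTTP " + conn.getResponseCode());
        } finally {
            conn.disconnect();
        }

    }

}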
From: <mar...@us...> - 2014-04-02 13:04:02
Revision: 8025 http://sourceforge.net/p/bigdata/code/8025 Author: martyncutcher Date: 2014-04-02 13:03:59 +0000 (Wed, 02 Apr 2014) Log Message: ----------- Initial branch creation for HA1 and HA5 Added Paths: ----------- branches/BIGDATA_MGC_HA1_HA5/
From: <tho...@us...> - 2014-04-01 20:16:47
Revision: 8024 http://sourceforge.net/p/bigdata/code/8024 Author: thompsonbry Date: 2014-04-01 20:16:43 +0000 (Tue, 01 Apr 2014) Log Message: ----------- Adding a stress test from a customer for #871. We have not been able to make this test fail yet. Added Paths: ----------- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/test/com/bigdata/rdf/sail/StressTest_ClosedByInterrupt_RW.java Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/test/com/bigdata/rdf/sail/StressTest_ClosedByInterrupt_RW.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/test/com/bigdata/rdf/sail/StressTest_ClosedByInterrupt_RW.java (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/test/com/bigdata/rdf/sail/StressTest_ClosedByInterrupt_RW.java 2014-04-01 20:16:43 UTC (rev 8024) @@ -0,0 +1,324 @@ +package com.bigdata.rdf.sail; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; + +import junit.framework.TestCase; + +import org.apache.log4j.Logger; +import org.openrdf.model.Literal; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.model.ValueFactory; +import org.openrdf.query.Binding; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.QueryLanguage; +import org.openrdf.query.TupleQuery; +import org.openrdf.query.TupleQueryResult; +import org.openrdf.repository.RepositoryConnection; +import org.openrdf.repository.RepositoryException; +import org.openrdf.repository.RepositoryResult; + +public class StressTest_ClosedByInterrupt_RW extends TestCase { + + private static final Logger log = Logger + .getLogger(StressTest_ClosedByInterrupt_RW.class); + + public StressTest_ClosedByInterrupt_RW() { + super(); + } + + public StressTest_ClosedByInterrupt_RW(String name) { + super(name); + } + + private static final int NUM_INSERT_DELETE_LOOPS = 10; + private static final int NUM_INSERTS_PER_LOOP = 200000; + private static final int NUM_DELETES_PER_LOOP = 23000; + private static final long MILLIS_BETWEEN_INSERTS = -1; + private static final long MILLIS_BETWEEN_DELETES = -1; + private static final int NUM_STATEMENTS_PER_INSERT = 50; + + private static final int NUM_SELECTS = 5000; + private static final int NUM_STATEMENTS_PER_SELECT = 23000; + private static final long MILLIS_BETWEEN_QUERY_BURSTS = 1000; + + private static boolean HALT_ON_ERROR = true; + + private volatile boolean stopRequested = false; + + private void snooze(final long millis) throws InterruptedException { + if (millis > 0) { + Thread.sleep(millis); + } + } + +// @Test + public void test() throws RepositoryException, InterruptedException { + final File jnlFile = new File("interrupted.jnl"); + + if (jnlFile.exists()) { + jnlFile.delete(); + } + + final Properties props = new Properties(); + props.setProperty("com.bigdata.rdf.sail.namespace", "emc.srm.topology.kb"); + props.setProperty("com.bigdata.journal.AbstractJournal.bufferMode", "DiskRW"); + props.setProperty("com.bigdata.btree.writeRetentionQueue.capacity", "4000"); + props.setProperty("com.bigdata.btree.BTree.branchingFactor", "128"); + props.setProperty("com.bigdata.service.AbstractTransactionService.minReleaseAge", "1"); + props.setProperty("com.bigdata.rdf.store.AbstractTripleStore.textIndex", "false"); + props.setProperty( + "com.bigdata.rdf.rules.InferenceEngine.forwardChainOwlTransitiveProperty", 
"false"); + props.setProperty("com.bigdata.rdf.rules.InferenceEngine.forwardChainOwlSameAsClosure", + "false"); + props.setProperty("com.bigdata.rdf.rules.InferenceEngine.forwardChainOwlSameAsProperties", + "false"); + props.setProperty("com.bigdata.rdf.rules.InferenceEngine.forwardChainOwlInverseOf", "false"); + props.setProperty("com.bigdata.rdf.rules.InferenceEngine.forwardChainOwlEquivalentClass", + "false"); + props.setProperty( + "com.bigdata.rdf.rules.InferenceEngine.forwardChainOwlEquivalentProperty", "false"); + props.setProperty("com.bigdata.rdf.rules.InferenceEngine.forwardChainOwlHasValue", "false"); + props.setProperty("com.bigdata.rdf.rules.InferenceEngine.forwardChainRdfTypeRdfsResource", + "false"); + props.setProperty("com.bigdata.rdf.store.AbstractTripleStore.axiomsClass", + "com.bigdata.rdf.axioms.NoAxioms"); + props.setProperty("com.bigdata.rdf.sail.truthMaintenance", "false"); + props.setProperty("com.bigdata.rdf.store.AbstractTripleStore.justify", "false"); + props.setProperty("com.bigdata.rdf.store.AbstractTripleStore.statementIdentifiers", "false"); + props.setProperty("com.bigdata.rdf.store.AbstractTripleStore.quadsMode", "true"); + props.setProperty("com.bigdata.journal.AbstractJournal.maximumExtent", "209715200"); + props.setProperty("com.bigdata.service.IBigdataClient.collectPlatformStatistics", "false"); + props.setProperty("com.bigdata.service.IBigdataClient.httpdPort", "-1"); + props.setProperty("com.bigdata.rdf.sail.bufferCapacity", "100000"); + props.setProperty("com.bigdata.rdf.store.AbstractTripleStore.bloomFilter", "false"); + + props.setProperty(BigdataSail.Options.CREATE_TEMP_FILE, Boolean.FALSE.toString()); + props.setProperty(BigdataSail.Options.FILE, jnlFile.toString()); + + final BigdataSail sail = new BigdataSail(props); + final BigdataSailRepository repo = new BigdataSailRepository(sail); + repo.initialize(); + + final InsertDeleteRunner mapper = new InsertDeleteRunner(repo); + final ReadOnlyRunner mdp = new ReadOnlyRunner(repo); + + final Thread mapperThread = new Thread(mapper); + final Thread mdpThread = new Thread(mdp); + + mapperThread.start(); + mdpThread.start(); + + mapperThread.join(); + System.out.println("Mapper is done"); + + stopRequested = true; + mdpThread.join(); + System.out.println("MDP is done"); + + repo.shutDown(); + System.out.println("Repository has shut down"); + + } + + private class InsertDeleteRunner implements Runnable { + + private final BigdataSailRepository repo; + + public InsertDeleteRunner(final BigdataSailRepository repo) { + this.repo = repo; + } + + @Override + public void run() { + + for (int loop = 0; loop < NUM_INSERT_DELETE_LOOPS; ++loop) { + + System.out.println("[Read/Write] enter loop " + loop); + RepositoryConnection conn = null; + + try { + System.out.println("[Read/Write] inserting ..."); + conn = repo.getConnection(); + conn.setAutoCommit(false); + + for (int index = 0; index < NUM_INSERTS_PER_LOOP; ++index) { + doInsert(conn, loop, index); + snooze(MILLIS_BETWEEN_INSERTS); + } + + conn.commit(); + conn.close(); + conn = null; + + } catch (Throwable t) { + printError("Read/Write threw on insert in loop " + loop, t); + } finally { + closeNoException(conn); + } + + try { + System.out.println("[Read/Write] deleting ..."); + conn = repo.getConnection(); + conn.setAutoCommit(false); + + for (int index = 0; index < NUM_DELETES_PER_LOOP; ++index) { + doDelete(conn, loop, index); + snooze(MILLIS_BETWEEN_DELETES); + } + + conn.commit(); + conn.close(); + conn = null; + + } catch (Throwable t) { + 
printError("Read/Write threw on delete in loop " + loop, t); + } finally { + closeNoException(conn); + } + System.out.println("[Read/Write] leave loop " + loop); + } + } + + private void doInsert(final RepositoryConnection conn, final int loop, final int index) + throws RepositoryException { + final ValueFactory vf = conn.getValueFactory(); + final URI c = vf.createURI("context:loop:" + loop + ":item:" + index); + final URI s = vf.createURI("subject:loop:" + loop + ":item:" + index); + for (int x = 0; x < NUM_STATEMENTS_PER_INSERT; ++x) { + final URI p = vf.createURI("predicate:" + x); + final Literal o = vf.createLiteral("SomeValue"); + conn.add(s, p, o, c); + } + } + + private void doDelete(final RepositoryConnection conn, final int loop, final int index) + throws RepositoryException { + final ValueFactory vf = conn.getValueFactory(); + final URI context = vf.createURI("context:loop:" + loop + ":item:" + index); + final Collection<Statement> statements = getStatementsForContext(conn, context); + for (Statement statement : statements) { + conn.remove(statement, context); + } + } + + private Collection<Statement> getStatementsForContext(final RepositoryConnection conn, + final URI context) throws RepositoryException { + RepositoryResult<Statement> res = null; + final Collection<Statement> statements = new ArrayList<Statement>(); + try { + res = conn.getStatements(null, null, null, false, context); + while (res.hasNext()) { + statements.add(res.next()); + } + } finally { + res.close(); + } + return statements; + } + } + + private class ReadOnlyRunner implements Runnable { + + private final BigdataSailRepository repo; + + public ReadOnlyRunner(final BigdataSailRepository repo) { + this.repo = repo; + } + + @Override + public void run() { + + RepositoryConnection conn = null; + TupleQueryResult result = null; + int loop = 0; + + while (stopRequested == false) { + + try { + + System.out.println("[Read ] snooze"); + snooze(MILLIS_BETWEEN_QUERY_BURSTS); + System.out.println("[Read ] enter loop " + loop); + + for (int invocation = 0; invocation < NUM_SELECTS; ++invocation) { + + conn = repo.getReadOnlyConnection(); + conn.setAutoCommit(false); + + final String sparql = "SELECT ?s WHERE { ?s ?p ?o } LIMIT " + + NUM_STATEMENTS_PER_SELECT; + final TupleQuery query = conn.prepareTupleQuery(QueryLanguage.SPARQL, sparql); + result = query.evaluate(); + + final List<String> duds = new ArrayList<String>(); + + while (result.hasNext()) { + final BindingSet bindingSet = result.next(); + for (final Iterator<Binding> i = bindingSet.iterator(); i.hasNext();) { + final Binding b = i.next(); + if (b.getValue() != null) { + duds.add(b.getValue().stringValue()); + } + } + } + + result.close(); + result = null; + + conn.close(); + conn = null; + + } + + } catch (Throwable t) { + printError("Read Only threw in loop " + loop, t); + } finally { + closeNoException(result); + closeNoException(conn); + } + + System.out.println("[Read ] leave loop " + loop); + ++loop; + + } + } + + } + + private void closeNoException(RepositoryConnection conn) { + if (conn != null) { + try { + conn.close(); + } catch (RepositoryException e) { + log.error("closeNoException(conn)", e); + } + } + } + + private void closeNoException(TupleQueryResult result) { + if (result != null) { + try { + result.close(); + } catch (QueryEvaluationException e) { + log.error("closeNoException(result)", e); + } + } + } + + private void printError(final String message, final Throwable cause) { + log.error(message, cause); +// Exception e = new 
Exception(message, cause); +// e.printStackTrace(); + if (HALT_ON_ERROR) { + System.exit(123); + } + } + +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2014-04-01 20:14:28
|
Revision: 8023 http://sourceforge.net/p/bigdata/code/8023 Author: thompsonbry Date: 2014-04-01 20:14:25 +0000 (Tue, 01 Apr 2014) Log Message: ----------- Partial fix for #871. This addresses the most likely code path in which we would fail to clear the interrupt status of a thread. However, the PipelineJoin runs on a worker thread in the Journal's executor service. That executor service should clear the interrupt status of the worker thread before assigning another task. Therefore, this does not provide an explanation for how an interrupt would escape the scope of the invocation context in which the interrupt occurred. Thus, it does not provide an explanation for the behavior reported on that ticket. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2014-04-01 20:12:42 UTC (rev 8022) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/join/PipelineJoin.java 2014-04-01 20:14:25 UTC (rev 8023) @@ -1781,7 +1781,7 @@ if (bindex++ % 50 == 0) { // Periodically check for an interrupt. - if (Thread.currentThread().isInterrupted()) + if (Thread.interrupted()) throw new InterruptedException(); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
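The distinction that r8023 relies on is easy to miss: Thread.currentThread().isInterrupted() only observes the interrupt status, while Thread.interrupted() observes it and clears it, so the flag cannot leak into the next task scheduled on the same pooled worker thread. The following standalone sketch (not bigdata code, just an illustration of the JDK semantics) shows the periodic-check idiom used in PipelineJoin together with the test-and-clear behaviour:

public class InterruptCheckSketch {

    // Mirrors the idiom in PipelineJoin: check for an interrupt every 50 units of work,
    // using Thread.interrupted() so the flag is cleared when the exception is raised.
    static void processChunk(final int units) throws InterruptedException {
        for (int i = 0; i < units; i++) {
            if (i % 50 == 0) {
                // Tests AND clears the interrupt status of the current thread.
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
            // ... one unit of work ...
        }
    }

    public static void main(final String[] args) throws Exception {
        Thread.currentThread().interrupt(); // simulate a cancelled query
        System.out.println(Thread.currentThread().isInterrupted()); // true  - status only observed
        System.out.println(Thread.interrupted());                   // true  - status observed and cleared
        System.out.println(Thread.currentThread().isInterrupted()); // false - nothing leaks into the next task
        processChunk(100);                                          // runs to completion, flag is clear
    }
}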
From: <tho...@us...> - 2014-04-01 20:12:45
|
Revision: 8022 http://sourceforge.net/p/bigdata/code/8022 Author: thompsonbry Date: 2014-04-01 20:12:42 +0000 (Tue, 01 Apr 2014) Log Message: ----------- javadoc fix related to #871. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2014-03-26 17:36:17 UTC (rev 8021) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/engine/QueryEngine.java 2014-04-01 20:12:42 UTC (rev 8022) @@ -610,7 +610,7 @@ * <p> * If the deadline has expired, {@link IRunningQuery#cancel(boolean)} will * be invoked. In order for a compute bound operator to terminate in a - * timely fashion, it MUST periodically test {@link Thread#isInterrupted()}. + * timely fashion, it MUST periodically test {@link Thread#interrupted()}. * <p> * Note: The deadline of a query may be set at most once. Thus, a query * which is entered into the {@link #deadlineQueue} may not have its This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2014-03-26 17:36:19
|
Revision: 8021 http://sourceforge.net/p/bigdata/code/8021 Author: thompsonbry Date: 2014-03-26 17:36:17 +0000 (Wed, 26 Mar 2014) Log Message: ----------- Removed the older servlet-api jar (v2.5). Removed Paths: ------------- branches/RDR/bigdata/lib/jetty/servlet-api-2.5.jar Deleted: branches/RDR/bigdata/lib/jetty/servlet-api-2.5.jar =================================================================== (Binary files differ) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2014-03-26 17:35:06
|
Revision: 8020 http://sourceforge.net/p/bigdata/code/8020 Author: thompsonbry Date: 2014-03-26 17:35:00 +0000 (Wed, 26 Mar 2014) Log Message: ----------- Added test suite for #868 (COUNT DISTINCT does not return any solutions if there are no solutions in the data that flow into that operator) Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestAggregationQuery.java Added Paths: ----------- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/count_distinct_emptyResult.rq branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/count_distinct_emptyResult.srx branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/count_distinct_emptyResult.trig branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/count_emptyResult.rq branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/count_emptyResult.srx branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/count_emptyResult.trig Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestAggregationQuery.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestAggregationQuery.java 2014-03-26 17:26:14 UTC (rev 8019) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestAggregationQuery.java 2014-03-26 17:35:00 UTC (rev 8020) @@ -138,5 +138,49 @@ ).runTest(); } + + /** + * Query correctly returns one row having a value of ZERO (0) for the count + * since there are no solutions in the data that match the query. + * + * <pre> + * select (count(?s) as ?count) { + * ?s ?p "abcedefg" . + * } + * </pre> + * + * @see <a href="http://trac.bigdata.com/ticket/868"> COUNT(DISTINCT) + * returns no rows rather than ZERO. </a> + */ + public void test_count_emptyResult() throws Exception { + + new TestHelper("count_emptyResult", // testURI, + "count_emptyResult.rq",// queryFileURL + "count_emptyResult.trig",// dataFileURL + "count_emptyResult.srx"// resultFileURL + ).runTest(); + } + /** + * Variation of the query above using COUNT(DISTINCT) should also return one + * solution having a binding of ZERO (0) for the count. + * + * <pre> + * select (count(distinct ?snippet) as ?count) { + * ?snippet ?p "abcedefg" . + * } + * </pre> + * + * @see <a href="http://trac.bigdata.com/ticket/868"> COUNT(DISTINCT) + * returns no rows rather than ZERO. </a> + */ + public void test_count_distinct_emptyResult()throws Exception { + + new TestHelper("count_distinct_emptyResult", // testURI, + "count_distinct_emptyResult.rq",// queryFileURL + "count_distinct_emptyResult.trig",// dataFileURL + "count_distinct_emptyResult.srx"// resultFileURL + ).runTest(); + } + } Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/count_distinct_emptyResult.rq =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/count_distinct_emptyResult.rq (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/count_distinct_emptyResult.rq 2014-03-26 17:35:00 UTC (rev 8020) @@ -0,0 +1,3 @@ +select (count(distinct ?s) as ?count) { + ?s ?p "abcedefg" . 
+} Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/count_distinct_emptyResult.srx =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/count_distinct_emptyResult.srx (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/count_distinct_emptyResult.srx 2014-03-26 17:35:00 UTC (rev 8020) @@ -0,0 +1,16 @@ +<?xml version="1.0"?> +<sparql + xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" + xmlns:xs="http://www.w3.org/2001/XMLSchema#" + xmlns="http://www.w3.org/2005/sparql-results#" > + <head> + <variable name="count"/> + </head> + <results> + <result> + <binding name="count"> + <literal datatype="http://www.w3.org/2001/XMLSchema#integer">0</literal> + </binding> + </result> + </results> +</sparql> Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/count_distinct_emptyResult.trig =================================================================== Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/count_emptyResult.rq =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/count_emptyResult.rq (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/count_emptyResult.rq 2014-03-26 17:35:00 UTC (rev 8020) @@ -0,0 +1,3 @@ +select (count(?s) as ?count) { + ?s ?p "abcedefg" . +} Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/count_emptyResult.srx =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/count_emptyResult.srx (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/count_emptyResult.srx 2014-03-26 17:35:00 UTC (rev 8020) @@ -0,0 +1,16 @@ +<?xml version="1.0"?> +<sparql + xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" + xmlns:xs="http://www.w3.org/2001/XMLSchema#" + xmlns="http://www.w3.org/2005/sparql-results#" > + <head> + <variable name="count"/> + </head> + <results> + <result> + <binding name="count"> + <literal datatype="http://www.w3.org/2001/XMLSchema#integer">0</literal> + </binding> + </result> + </results> +</sparql> Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/count_emptyResult.trig =================================================================== This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
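The fixture above encodes the expected behaviour for #868: an aggregate over a group that matches nothing must still produce one solution with the count bound to ZERO (0). A minimal standalone check of the same behaviour through the Sesame repository API might look like the sketch below; the repository setup (a throw-away journal via CREATE_TEMP_FILE) is an assumption for illustration only, and the real coverage lives in TestAggregationQuery.

import java.util.Properties;

import org.openrdf.query.BindingSet;
import org.openrdf.query.QueryLanguage;
import org.openrdf.query.TupleQueryResult;
import org.openrdf.repository.RepositoryConnection;

import com.bigdata.rdf.sail.BigdataSail;
import com.bigdata.rdf.sail.BigdataSailRepository;

public class CountEmptyResultCheck {

    public static void main(final String[] args) throws Exception {

        final Properties props = new Properties();
        // Assumption: back the sail with a temporary journal file.
        props.setProperty(BigdataSail.Options.CREATE_TEMP_FILE, "true");

        final BigdataSail sail = new BigdataSail(props);
        final BigdataSailRepository repo = new BigdataSailRepository(sail);
        repo.initialize();

        final RepositoryConnection conn = repo.getReadOnlyConnection();
        try {
            final TupleQueryResult res = conn.prepareTupleQuery(QueryLanguage.SPARQL,
                    "select (count(?s) as ?count) { ?s ?p \"abcedefg\" . }").evaluate();
            try {
                // Per #868 there must be exactly one solution, binding ?count to 0.
                final BindingSet bs = res.next();
                System.out.println("count = " + bs.getValue("count").stringValue());
            } finally {
                res.close();
            }
        } finally {
            conn.close();
            repo.shutDown();
        }
    }
}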
From: <tho...@us...> - 2014-03-26 17:26:18
|
Revision: 8019 http://sourceforge.net/p/bigdata/code/8019 Author: thompsonbry Date: 2014-03-26 17:26:14 +0000 (Wed, 26 Mar 2014) Log Message: ----------- minor changes (javadoc edit, private method, final attribute). Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/solutions/MemoryGroupByOp.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/solutions/MemoryGroupByOp.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/solutions/MemoryGroupByOp.java 2014-03-26 16:13:38 UTC (rev 8018) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/solutions/MemoryGroupByOp.java 2014-03-26 17:26:14 UTC (rev 8019) @@ -515,7 +515,7 @@ * for the group was dropped (type error or violated HAVING * constraint). */ - public IBindingSet aggregate(final Iterable<IBindingSet> solutions) { + private IBindingSet aggregate(final Iterable<IBindingSet> solutions) { if (!solutions.iterator().hasNext()) { // Drop empty group. @@ -704,7 +704,7 @@ * will be bound. * @param selectDependency * When <code>true</code>, some aggregates bind variables which - * are relied on both other aggregates. In this case, this method + * are relied on by other aggregates. In this case, this method * must ensure that those bindings become visible. * @param aggregates * The binding set on which the results are being bound (by the @@ -782,7 +782,8 @@ expr.reset(); for (IBindingSet bset : solutions) { - Object constants[]=new Object[expr.arity()]; + + final Object constants[] = new Object[expr.arity()]; for (int i=0;i<expr.arity();i++){ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2014-03-26 16:13:41
|
Revision: 8018 http://sourceforge.net/p/bigdata/code/8018 Author: thompsonbry Date: 2014-03-26 16:13:38 +0000 (Wed, 26 Mar 2014) Log Message: ----------- Removed reference to the HALoadBalancerServlet that was breaking the tomcat WAR deployment. Modified Paths: -------------- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java Modified: branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java =================================================================== --- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java 2014-03-26 15:47:14 UTC (rev 8017) +++ branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java 2014-03-26 16:13:38 UTC (rev 8018) @@ -43,7 +43,6 @@ import com.bigdata.journal.AbstractJournal; import com.bigdata.journal.IIndexManager; import com.bigdata.quorum.AbstractQuorum; -import com.bigdata.rdf.sail.webapp.HALoadBalancerServlet.InitParams; import com.bigdata.rdf.sail.webapp.client.IMimeTypes; /** @@ -76,9 +75,12 @@ /** * The {@link ServletContext} attribute whose value is the prefix for the * {@link HALoadBalancerServlet} iff it is running. + * <p> + * Note: Do NOT reference the <code>HALoadBalancerServlet</code> here. It + * will drag in the jetty dependencies and that breaks the tomcat WAR + * deployment. */ - static final String ATTRIBUTE_LBS_PREFIX = HALoadBalancerServlet.class - .getName() + "." + InitParams.PREFIX; + static final String ATTRIBUTE_LBS_PREFIX = "com.bigdata.rdf.sail.webapp.HALoadBalancerServlet.prefix"; // /** // * The {@link ServletContext} attribute whose value is the This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
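The underlying problem in r8018 is worth spelling out: a constant built from HALoadBalancerServlet.class forces that servlet class, and through it the jetty classes it imports, to be resolvable wherever BigdataServlet is loaded, which is exactly what a jetty-free tomcat WAR cannot satisfy. The sketch below is a generic illustration of that failure mode with made-up class names, not bigdata code:

public class CoreServlet {

    // Broken variant: initializing this constant loads OptionalServlet, and loading
    // OptionalServlet requires its (optional) dependencies on the classpath, so a
    // container without them fails with NoClassDefFoundError before CoreServlet is usable.
    //
    // static final String ATTRIBUTE_PREFIX = OptionalServlet.class.getName() + ".prefix";

    // Fixed variant: a plain string literal carries the same value without ever
    // touching the optional class.
    static final String ATTRIBUTE_PREFIX = "com.example.webapp.OptionalServlet.prefix";
}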
From: <tho...@us...> - 2014-03-26 15:47:17
|
Revision: 8017 http://sourceforge.net/p/bigdata/code/8017 Author: thompsonbry Date: 2014-03-26 15:47:14 +0000 (Wed, 26 Mar 2014) Log Message: ----------- Removed jetty import that was causing the war to fail. Modified Paths: -------------- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFServletContextListener.java Modified: branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFServletContextListener.java =================================================================== --- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFServletContextListener.java 2014-03-26 01:30:59 UTC (rev 8016) +++ branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFServletContextListener.java 2014-03-26 15:47:14 UTC (rev 8017) @@ -44,7 +44,6 @@ import javax.servlet.ServletContextListener; import org.apache.log4j.Logger; -import org.eclipse.jetty.webapp.WebAppContext; import com.bigdata.Banner; import com.bigdata.bop.engine.QueryEngine; @@ -103,10 +102,10 @@ private boolean closeIndexManager; /** - * The name of the {@link WebAppContext} attribute under which we store any - * overrides for the init parameters of the {@link WebAppContext}. Note that - * it is NOT possible to actual modify the init parameters specified in the - * <code>web.xml</code> file. Therefore, we attach the overrides as an + * The name of the {@link ServletContext} attribute under which we store + * any overrides for the init parameters of the {@link ServletContext}. Note + * that it is NOT possible to actual modify the init parameters specified in + * the <code>web.xml</code> file. Therefore, we attach the overrides as an * attribute and then consult them from within * {@link BigdataRDFServletContextListener#contextInitialized(javax.servlet.ServletContextEvent)} * . This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2014-03-26 01:31:04
|
Revision: 8016 http://sourceforge.net/p/bigdata/code/8016 Author: thompsonbry Date: 2014-03-26 01:30:59 +0000 (Wed, 26 Mar 2014) Log Message: ----------- Bug fix for the concurrent create/drop and list of namespaces. See #867 (NSS concurrency problem with list namespaces and create namespace) Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFContext.java branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFServletContextListener.java branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/MultiTenancyServlet.java branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java Added Paths: ----------- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHANamespace.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java 2014-03-24 15:41:50 UTC (rev 8015) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java 2014-03-26 01:30:59 UTC (rev 8016) @@ -82,6 +82,7 @@ import com.bigdata.rdf.sail.webapp.client.DefaultClientConnectionManagerFactory; import com.bigdata.rdf.sail.webapp.client.HttpException; import com.bigdata.rdf.sail.webapp.client.RemoteRepository; +import com.bigdata.rdf.sail.webapp.client.RemoteRepositoryManager; import com.bigdata.util.InnerCause; import com.bigdata.util.concurrent.DaemonThreadFactory; @@ -551,6 +552,21 @@ } + protected RemoteRepositoryManager getRemoteRepositoryManager(final HAGlue haGlue) + throws IOException { + + final String endpointURL = getNanoSparqlServerURL(haGlue); + + // Client for talking to the NSS. + final HttpClient httpClient = new DefaultHttpClient(ccm); + + final RemoteRepositoryManager repo = new RemoteRepositoryManager(endpointURL, + httpClient, executorService); + + return repo; + + } + /** * Counts the #of results in a SPARQL result set. * Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHANamespace.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHANamespace.java (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHANamespace.java 2014-03-26 01:30:59 UTC (rev 8016) @@ -0,0 +1,211 @@ +/** + +Copyright (C) SYSTAP, LLC 2006-2014. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +package com.bigdata.journal.jini.ha; + +import java.util.Properties; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.openrdf.model.Statement; +import org.openrdf.query.GraphQueryResult; + +import com.bigdata.ha.HAGlue; +import com.bigdata.ha.HAStatusEnum; +import com.bigdata.rdf.sail.BigdataSail; +import com.bigdata.rdf.sail.webapp.client.RemoteRepositoryManager; + +/** + * Test case for concurrent list namespace and create namespace operations. + * <p> + * Note: The underlying issue is NOT HA specific. This test SHOULD be ported + * to the standard NSS test suite. + * + * @see <a href="http://trac.bigdata.com/ticket/867"> NSS concurrency + * problem with list namespaces and create namespace </a> + */ +public class TestHANamespace extends AbstractHA3JournalServerTestCase { + + public TestHANamespace() { + } + + public TestHANamespace(String name) { + super(name); + } + + /** + * Test case for concurrent list namespace and create namespace operations. + * <p> + * Note: The underlying issue is NOT HA specific. This test SHOULD be ported + * to the standard NSS test suite. + * + * @see <a href="http://trac.bigdata.com/ticket/867"> NSS concurrency + * problem with list namespaces and create namespace </a> + */ + public void test_ticket_867() throws Throwable { + + /* + * Controls the #of create/drop namespace operations. This many permits + * are obtained, and a permit is released each time we do a create + * namespace or drop namespace operation. + */ + final int NPERMITS = 50; + + /* + * Controls the #of queries that are executed in the main thread + * concurrent with those create/drop namespace operations. + */ + final int NQUERIES = 10; + + final String NAMESPACE_PREFIX = getName() + "-"; + + final ABC abc = new ABC(false/* simultaneous */); + + // Await quorum meet. + final long token = quorum.awaitQuorum(awaitQuorumTimeout, + TimeUnit.MILLISECONDS); + + // Figure out which service is the leader. + final HAGlue leader = quorum.getClient().getLeader(token); + + // Wait until up and running as the leader. + awaitHAStatus(leader, HAStatusEnum.Leader); + + final RemoteRepositoryManager repositoryManager = getRemoteRepositoryManager(leader); + + final Semaphore awaitDone = new Semaphore(0); + + final AtomicReference<Exception> failure = new AtomicReference<Exception>(null); + + try { + + final Thread getNamespacesThread = new Thread(new Runnable() { + + @Override + public void run() { + + try { + + /* + * Create-delete namespaces with incrementing number in + * name. + */ + int n = 0; + while (true) { + + final String namespace = NAMESPACE_PREFIX + n; + + final Properties props = new Properties(); + + props.put(BigdataSail.Options.NAMESPACE, + namespace); + + if (log.isInfoEnabled()) + log.info("Creating namespace " + namespace); + + repositoryManager + .createRepository(namespace, props); + + awaitDone.release(); // release a permit. + + if (n % 2 == 0) { + + if (log.isInfoEnabled()) + log.info("Removing namespace " + namespace); + + repositoryManager.deleteRepository(namespace); + + } + + n++; + + } + + } catch (Exception e) { + failure.set(e); + } finally { + // release all permits. + awaitDone.release(NPERMITS); + } + + } + + }); + + // Start running the create/drop namespace thread. 
+ getNamespacesThread.start(); + + try { + /* + * Run list namespace requests concurrent with the create/drop + * namespace requests. + * + * FIXME Martyn: The list namespace requests should be running + * fully asynchronously with respect to the create/drop + * namespace requests, not getting a new set of permits and then + * just running the list namespace once for those NPERMITS + * create/drop requests. The way this is setup is missing too + * many opportunities for a concurrency issue with only one list + * namespace request per 50 create/drop requests. + */ + for (int n = 0; n < NQUERIES; n++) { + awaitDone.acquire(NPERMITS); + + if (failure.get() != null) + fail("Thread failure", failure.get()); + + if (log.isInfoEnabled()) + log.info("Get namespace list..."); + + try { + + final GraphQueryResult gqres = repositoryManager + .getRepositoryDescriptions(); + int count = 0; + while (gqres.hasNext()) { + final Statement st = gqres.next(); + if (log.isInfoEnabled()) + log.info("Statement: " + st); + count++; + } + log.warn("Processed " + count + " statements"); + assertTrue(count > 0); + } catch (Exception e) { + fail("Unable to retrieve namespaces", e); + + } + } + } finally { + getNamespacesThread.interrupt(); + } + + } finally { + + // repositoryManager.shutdown(); + + } + + } +} Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFContext.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFContext.java 2014-03-24 15:41:50 UTC (rev 8015) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFContext.java 2014-03-26 01:30:59 UTC (rev 8016) @@ -72,19 +72,14 @@ import org.openrdf.rio.RDFWriterRegistry; import org.openrdf.sail.SailException; -import com.bigdata.bop.BufferAnnotations; -import com.bigdata.bop.IPredicate; import com.bigdata.bop.engine.IRunningQuery; import com.bigdata.bop.engine.QueryEngine; -import com.bigdata.bop.join.PipelineJoin; -import com.bigdata.btree.IndexMetadata; import com.bigdata.counters.CAT; import com.bigdata.io.NullOutputStream; -import com.bigdata.journal.IBufferStrategy; import com.bigdata.journal.IIndexManager; +import com.bigdata.journal.ITransactionService; import com.bigdata.journal.ITx; import com.bigdata.journal.Journal; -import com.bigdata.journal.RWStrategy; import com.bigdata.journal.TimestampUtility; import com.bigdata.rdf.changesets.IChangeLog; import com.bigdata.rdf.changesets.IChangeRecord; @@ -106,9 +101,7 @@ import com.bigdata.rdf.sparql.ast.QueryType; import com.bigdata.rdf.sparql.ast.Update; import com.bigdata.rdf.store.AbstractTripleStore; -import com.bigdata.relation.AbstractResource; import com.bigdata.relation.RelationSchema; -import com.bigdata.rwstore.RWStore; import com.bigdata.sparse.ITPS; import com.bigdata.sparse.SparseRowStore; import com.bigdata.util.concurrent.DaemonThreadFactory; @@ -2208,273 +2201,163 @@ } /** - * Return various interesting metadata about the KB state. + * Return a list of the namespaces for the {@link AbstractTripleStore}s + * registered against the bigdata instance. * - * @todo The range counts can take some time if the cluster is heavily - * loaded since they must query each shard for the primary statement - * index and the TERM2ID index. 
+ * @see <a href="http://trac.bigdata.com/ticket/867"> NSS concurrency + * problem with list namespaces and create namespace </a> */ - protected StringBuilder getKBInfo(final String namespace, - final long timestamp) { + /*package*/ List<String> getNamespaces(final long timestamp) { + + final long tx = newTx(timestamp); + + try { + + return getNamespaces(timestamp, tx); + + } finally { + + abortTx(tx); + + } - final StringBuilder sb = new StringBuilder(); + } - BigdataSailRepositoryConnection conn = null; + private List<String> getNamespaces(long timestamp, final long tx) { - try { + if (timestamp == ITx.READ_COMMITTED) { - conn = getQueryConnection(namespace, timestamp); - - final AbstractTripleStore tripleStore = conn.getTripleStore(); + // Use the last commit point. + timestamp = getIndexManager().getLastCommitTime(); - sb.append("class\t = " + tripleStore.getClass().getName() + "\n"); + } - sb - .append("indexManager\t = " - + tripleStore.getIndexManager().getClass() - .getName() + "\n"); + // the triple store namespaces. + final List<String> namespaces = new LinkedList<String>(); - sb.append("namespace\t = " + tripleStore.getNamespace() + "\n"); + if (log.isInfoEnabled()) + log.info("getNamespaces for " + timestamp); - sb.append("timestamp\t = " - + TimestampUtility.toString(tripleStore.getTimestamp()) - + "\n"); + final SparseRowStore grs = getIndexManager().getGlobalRowStore( + timestamp); - sb.append("statementCount\t = " + tripleStore.getStatementCount() - + "\n"); + if (grs == null) { - sb.append("termCount\t = " + tripleStore.getTermCount() + "\n"); + log.warn("No GRS @ timestamp=" + + TimestampUtility.toString(timestamp)); - sb.append("uriCount\t = " + tripleStore.getURICount() + "\n"); + // Empty. + return namespaces; - sb.append("literalCount\t = " + tripleStore.getLiteralCount() + "\n"); + } - /* - * Note: The blank node count is only available when using the told - * bnodes mode. - */ - sb - .append("bnodeCount\t = " - + (tripleStore.getLexiconRelation() - .isStoreBlankNodes() ? "" - + tripleStore.getBNodeCount() : "N/A") - + "\n"); + // scan the relation schema in the global row store. + @SuppressWarnings("unchecked") + final Iterator<ITPS> itr = (Iterator<ITPS>) grs + .rangeIterator(RelationSchema.INSTANCE); - sb.append(IndexMetadata.Options.BTREE_BRANCHING_FACTOR - + "=" - + tripleStore.getSPORelation().getPrimaryIndex() - .getIndexMetadata().getBranchingFactor() + "\n"); + while (itr.hasNext()) { - sb.append(IndexMetadata.Options.WRITE_RETENTION_QUEUE_CAPACITY - + "=" - + tripleStore.getSPORelation().getPrimaryIndex() - .getIndexMetadata() - .getWriteRetentionQueueCapacity() + "\n"); + // A timestamped property value set is a logical row with + // timestamped property values. + final ITPS tps = itr.next(); - sb.append("-- All properties.--\n"); - - // get the triple store's properties from the global row store. - final Map<String, Object> properties = getIndexManager() - .getGlobalRowStore(timestamp).read(RelationSchema.INSTANCE, - namespace); + // If you want to see what is in the TPS, uncomment this. + // System.err.println(tps.toString()); - // write them out, - for (String key : properties.keySet()) { - sb.append(key + "=" + properties.get(key)+"\n"); - } + // The namespace is the primary key of the logical row for the + // relation schema. + final String namespace = (String) tps.getPrimaryKey(); - /* - * And show some properties which can be inherited from - * AbstractResource. 
These have been mainly phased out in favor of - * BOP annotations, but there are a few places where they are still - * in use. - */ - - sb.append("-- Interesting AbstractResource effective properties --\n"); - - sb.append(AbstractResource.Options.CHUNK_CAPACITY + "=" - + tripleStore.getChunkCapacity() + "\n"); + // Get the name of the implementation class + // (AbstractTripleStore, SPORelation, LexiconRelation, etc.) + final String className = (String) tps.get(RelationSchema.CLASS) + .getValue(); - sb.append(AbstractResource.Options.CHUNK_OF_CHUNKS_CAPACITY + "=" - + tripleStore.getChunkOfChunksCapacity() + "\n"); + if (className == null) { + // Skip deleted triple store entry. + continue; + } - sb.append(AbstractResource.Options.CHUNK_TIMEOUT + "=" - + tripleStore.getChunkTimeout() + "\n"); + try { + final Class<?> cls = Class.forName(className); + if (AbstractTripleStore.class.isAssignableFrom(cls)) { + // this is a triple store (vs something else). + namespaces.add(namespace); + } + } catch (ClassNotFoundException e) { + log.error(e, e); + } - sb.append(AbstractResource.Options.FULLY_BUFFERED_READ_THRESHOLD + "=" - + tripleStore.getFullyBufferedReadThreshold() + "\n"); + } - sb.append(AbstractResource.Options.MAX_PARALLEL_SUBQUERIES + "=" - + tripleStore.getMaxParallelSubqueries() + "\n"); +// if (log.isInfoEnabled()) +// log.info("getNamespaces returning " + namespaces.size()); - /* - * And show some interesting effective properties for the KB, SPO - * relation, and lexicon relation. - */ - sb.append("-- Interesting KB effective properties --\n"); - - sb - .append(AbstractTripleStore.Options.TERM_CACHE_CAPACITY - + "=" - + tripleStore - .getLexiconRelation() - .getProperties() - .getProperty( - AbstractTripleStore.Options.TERM_CACHE_CAPACITY, - AbstractTripleStore.Options.DEFAULT_TERM_CACHE_CAPACITY) + "\n"); + return namespaces; - /* - * And show several interesting properties with their effective - * defaults. - */ + } + + /** + * Obtain a new transaction to protect operations against the specified view + * of the database. + * + * @param timestamp + * The timestamp for the desired view. + * + * @return The transaction identifier -or- <code>timestamp</code> if the + * {@link IIndexManager} is not a {@link Journal}. + * + * @see ITransactionService#newTx(long) + * + * @see <a href="http://trac.bigdata.com/ticket/867"> NSS concurrency + * problem with list namespaces and create namespace </a> + */ + public long newTx(final long timestamp) { - sb.append("-- Interesting Effective BOP Annotations --\n"); + long tx = timestamp; // use dirty reads unless Journal. 
- sb.append(BufferAnnotations.CHUNK_CAPACITY - + "=" - + tripleStore.getProperties().getProperty( - BufferAnnotations.CHUNK_CAPACITY, - "" + BufferAnnotations.DEFAULT_CHUNK_CAPACITY) - + "\n"); + if (getIndexManager() instanceof Journal) { + final ITransactionService txs = ((Journal) getIndexManager()) + .getLocalTransactionManager().getTransactionService(); - sb - .append(BufferAnnotations.CHUNK_OF_CHUNKS_CAPACITY - + "=" - + tripleStore - .getProperties() - .getProperty( - BufferAnnotations.CHUNK_OF_CHUNKS_CAPACITY, - "" - + BufferAnnotations.DEFAULT_CHUNK_OF_CHUNKS_CAPACITY) - + "\n"); - - sb.append(BufferAnnotations.CHUNK_TIMEOUT - + "=" - + tripleStore.getProperties().getProperty( - BufferAnnotations.CHUNK_TIMEOUT, - "" + BufferAnnotations.DEFAULT_CHUNK_TIMEOUT) - + "\n"); - - sb.append(PipelineJoin.Annotations.MAX_PARALLEL_CHUNKS - + "=" - + tripleStore.getProperties().getProperty( - PipelineJoin.Annotations.MAX_PARALLEL_CHUNKS, - "" + PipelineJoin.Annotations.DEFAULT_MAX_PARALLEL_CHUNKS) + "\n"); - - sb - .append(IPredicate.Annotations.FULLY_BUFFERED_READ_THRESHOLD - + "=" - + tripleStore - .getProperties() - .getProperty( - IPredicate.Annotations.FULLY_BUFFERED_READ_THRESHOLD, - "" - + IPredicate.Annotations.DEFAULT_FULLY_BUFFERED_READ_THRESHOLD) - + "\n"); - - // sb.append(tripleStore.predicateUsage()); - - if (tripleStore.getIndexManager() instanceof Journal) { - - final Journal journal = (Journal) tripleStore.getIndexManager(); - - final IBufferStrategy strategy = journal.getBufferStrategy(); - - if (strategy instanceof RWStrategy) { - - final RWStore store = ((RWStrategy) strategy).getStore(); - - store.showAllocators(sb); - - } - + try { + tx = txs.newTx(timestamp); + } catch (IOException e) { + // Note: Local operation. Will not throw IOException. + throw new RuntimeException(e); } - } catch (Throwable t) { - - log.warn(t.getMessage(), t); - - } finally { - - if(conn != null) { - try { - conn.close(); - } catch (RepositoryException e) { - log.error(e, e); - } - - } - } - return sb; - + return tx; } - /** - * Return a list of the namespaces for the {@link AbstractTripleStore}s - * registered against the bigdata instance. - */ - /*package*/ List<String> getNamespaces(final long timestamp) { - - // the triple store namespaces. - final List<String> namespaces = new LinkedList<String>(); + /** + * Abort a transaction obtained by {@link #newTx(long)}. + * + * @param tx + * The transaction identifier. + */ + public void abortTx(final long tx) { + if (getIndexManager() instanceof Journal) { +// if (!TimestampUtility.isReadWriteTx(tx)) { +// // Not a transaction. +// throw new IllegalStateException(); +// } - final SparseRowStore grs = getIndexManager().getGlobalRowStore( - timestamp); + final ITransactionService txs = ((Journal) getIndexManager()) + .getLocalTransactionManager().getTransactionService(); - if (grs == null) { + try { + txs.abort(tx); + } catch (IOException e) { + // Note: Local operation. Will not throw IOException. + throw new RuntimeException(e); + } - log.warn("No GRS @ timestamp=" - + TimestampUtility.toString(timestamp)); + } - // Empty. - return namespaces; - - } - - // scan the relation schema in the global row store. - @SuppressWarnings("unchecked") - final Iterator<ITPS> itr = (Iterator<ITPS>) grs - .rangeIterator(RelationSchema.INSTANCE); - - while (itr.hasNext()) { - - // A timestamped property value set is a logical row with - // timestamped property values. - final ITPS tps = itr.next(); - - // If you want to see what is in the TPS, uncomment this. 
-// System.err.println(tps.toString()); - - // The namespace is the primary key of the logical row for the - // relation schema. - final String namespace = (String) tps.getPrimaryKey(); - - // Get the name of the implementation class - // (AbstractTripleStore, SPORelation, LexiconRelation, etc.) - final String className = (String) tps.get(RelationSchema.CLASS) - .getValue(); - - if (className == null) { - // Skip deleted triple store entry. - continue; - } - - try { - final Class<?> cls = Class.forName(className); - if (AbstractTripleStore.class.isAssignableFrom(cls)) { - // this is a triple store (vs something else). - namespaces.add(namespace); - } - } catch (ClassNotFoundException e) { - log.error(e,e); - } - - } - - return namespaces; - - } - + } + } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFServletContextListener.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFServletContextListener.java 2014-03-24 15:41:50 UTC (rev 8015) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFServletContextListener.java 2014-03-26 01:30:59 UTC (rev 8016) @@ -343,17 +343,6 @@ // context.setAttribute(BigdataServlet.ATTRIBUTE_SPARQL_CACHE, // new SparqlCache(new MemoryManager(DirectBufferPool.INSTANCE))); - if (log.isInfoEnabled()) { - /* - * Log some information about the default kb (#of statements, etc). - */ - final long effectiveTimestamp = config.timestamp == ITx.READ_COMMITTED ? indexManager - .getLastCommitTime() : config.timestamp; - log.info("\n" - + rdfContext - .getKBInfo(config.namespace, effectiveTimestamp)); - } - { final boolean forceOverflow = Boolean.valueOf(context Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/MultiTenancyServlet.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/MultiTenancyServlet.java 2014-03-24 15:41:50 UTC (rev 8015) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/MultiTenancyServlet.java 2014-03-26 01:30:59 UTC (rev 8016) @@ -459,30 +459,47 @@ * @throws IOException */ private void doShowProperties(final HttpServletRequest req, - final HttpServletResponse resp) throws IOException { + final HttpServletResponse resp) throws IOException { - final String namespace = getNamespace(req); + final String namespace = getNamespace(req); - final long timestamp = getTimestamp(req); + long timestamp = getTimestamp(req); - final AbstractTripleStore tripleStore = getBigdataRDFContext() - .getTripleStore(namespace, timestamp); + if (timestamp == ITx.READ_COMMITTED) { - if (tripleStore == null) { - /* - * There is no such triple/quad store instance. - */ - buildResponse(resp, HTTP_NOTFOUND, MIME_TEXT_PLAIN); - return; - } + // Use the last commit point. + timestamp = getIndexManager().getLastCommitTime(); - final Properties properties = PropertyUtil.flatCopy(tripleStore - .getProperties()); + } - sendProperties(req, resp, properties); - - } + final long tx = getBigdataRDFContext().newTx(timestamp); + + try { + + final AbstractTripleStore tripleStore = getBigdataRDFContext() + .getTripleStore(namespace, timestamp); + if (tripleStore == null) { + /* + * There is no such triple/quad store instance. 
+ */ + buildResponse(resp, HTTP_NOTFOUND, MIME_TEXT_PLAIN); + return; + } + + final Properties properties = PropertyUtil.flatCopy(tripleStore + .getProperties()); + + sendProperties(req, resp, properties); + + } finally { + + getBigdataRDFContext().abortTx(tx); + + } + + } + /** * Generate a VoID Description for the known namespaces. */ @@ -498,51 +515,66 @@ } - /* - * The set of registered namespaces for KBs. + /** + * Protect the entire operation with a transaction, including the + * describe of each namespace that we discover. + * + * @see <a href="http://trac.bigdata.com/ticket/867"> NSS concurrency + * problem with list namespaces and create namespace </a> */ - final List<String> namespaces = getBigdataRDFContext() - .getNamespaces(timestamp); + final long tx = getBigdataRDFContext().newTx(timestamp); + + try { + /* + * The set of registered namespaces for KBs. + */ + final List<String> namespaces = getBigdataRDFContext() + .getNamespaces(timestamp); - final Graph g = new GraphImpl(); + final Graph g = new GraphImpl(); - for(String namespace : namespaces) { - - // Get a view onto that KB instance for that timestamp. - final AbstractTripleStore tripleStore = getBigdataRDFContext() - .getTripleStore(namespace, timestamp); + for (String namespace : namespaces) { - if (tripleStore == null) { + // Get a view onto that KB instance for that timestamp. + final AbstractTripleStore tripleStore = getBigdataRDFContext() + .getTripleStore(namespace, timestamp); - /* - * There is no such triple/quad store instance (could be a - * concurrent delete of the namespace). - */ - - continue; - - } + if (tripleStore == null) { - final BNode aDataSet = g.getValueFactory().createBNode(); - - /* - * Figure out the service end point. - * - * Note: This is just the requestURL as reported. This makes is - * possible to support virtual hosting and similar http proxy - * patterns since the SPARQL end point is just the URL at which the - * service is responding. - */ - final String serviceURI = req.getRequestURL().toString(); - - final VoID v = new VoID(g, tripleStore, serviceURI, aDataSet); + /* + * There is no such triple/quad store instance (could be a + * concurrent delete of the namespace). + */ - v.describeDataSet(false/* describeStatistics */, - getBigdataRDFContext().getConfig().describeEachNamedGraph); + continue; + } + + final BNode aDataSet = g.getValueFactory().createBNode(); + + /* + * Figure out the service end point. + * + * Note: This is just the requestURL as reported. This makes is + * possible to support virtual hosting and similar http proxy + * patterns since the SPARQL end point is just the URL at which + * the service is responding. 
+ */ + final String serviceURI = req.getRequestURL().toString(); + + final VoID v = new VoID(g, tripleStore, serviceURI, aDataSet); + + v.describeDataSet( + false/* describeStatistics */, + getBigdataRDFContext().getConfig().describeEachNamedGraph); + + } + + sendGraph(req, resp, g); + + } finally { + getBigdataRDFContext().abortTx(tx); } - - sendGraph(req, resp, g); } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java 2014-03-24 15:41:50 UTC (rev 8015) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/StatusServlet.java 2014-03-26 01:30:59 UTC (rev 8016) @@ -58,7 +58,6 @@ import com.bigdata.journal.AbstractJournal; import com.bigdata.journal.DumpJournal; import com.bigdata.journal.IIndexManager; -import com.bigdata.journal.ITx; import com.bigdata.journal.Journal; import com.bigdata.rdf.sail.sparql.ast.SimpleNode; import com.bigdata.rdf.sail.webapp.BigdataRDFContext.AbstractQueryTask; @@ -97,12 +96,6 @@ .getLogger(StatusServlet.class); /** - * The name of a request parameter used to request metadata about the - * default namespace. - */ - private static final String SHOW_KB_INFO = "showKBInfo"; - - /** * The name of a request parameter used to request a list of the namespaces * which could be served. */ @@ -415,9 +408,6 @@ maxBopLength = 0; } - // Information about the KB (stats, properties). - final boolean showKBInfo = req.getParameter(SHOW_KB_INFO) != null; - // bigdata namespaces known to the index manager. final boolean showNamespaces = req.getParameter(SHOW_NAMESPACES) != null; @@ -542,19 +532,10 @@ } - if (showNamespaces) { - - long timestamp = getTimestamp(req); - - if (timestamp == ITx.READ_COMMITTED) { - - // Use the last commit point. - timestamp = getIndexManager().getLastCommitTime(); - - } - + if (showNamespaces) { + final List<String> namespaces = getBigdataRDFContext() - .getNamespaces(timestamp); + .getNamespaces(getTimestamp(req)); current.node("h3", "Namespaces: "); @@ -564,16 +545,8 @@ } - } + } - if (showKBInfo) { - - // General information on the connected kb. - current.node("pre", getBigdataRDFContext().getKBInfo( - getNamespace(req), getTimestamp(req)).toString()); - - } - /* * Performance counters for the QueryEngine. */ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2014-03-24 15:41:55
|
Revision: 8015 http://sourceforge.net/p/bigdata/code/8015 Author: thompsonbry Date: 2014-03-24 15:41:50 +0000 (Mon, 24 Mar 2014) Log Message: ----------- Bug fix to the GRS to protect the index with an UnisolatedReadWriteIndex along all code paths. This is only a partial fix for the reported problem. I am committing this change to CI for feedback before committing through the NSS specific modifications. See #867 (NSS concurrency problem with list namespaces and create namespace) Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/sparse/GlobalRowStoreHelper.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/sparse/GlobalRowStoreHelper.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/sparse/GlobalRowStoreHelper.java 2014-03-24 01:34:33 UTC (rev 8014) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/sparse/GlobalRowStoreHelper.java 2014-03-24 15:41:50 UTC (rev 8015) @@ -74,9 +74,17 @@ if (globalRowStore == null) { - IIndex ndx = indexManager.getIndex(GLOBAL_ROW_STORE_INDEX, - ITx.UNISOLATED); - + /** + * The GRS view needs to be protected by an + * UnisolatedReadWriteIndex. + * + * @see <a href="http://trac.bigdata.com/ticket/867"> NSS + * concurrency problem with list namespaces and create + * namespace </a> + */ + IIndex ndx = AbstractRelation.getIndex(indexManager, + GLOBAL_ROW_STORE_INDEX, ITx.UNISOLATED); + if (ndx == null) { if (log.isInfoEnabled()) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
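The essence of r8015 is that every code path which touches the unisolated global row store index has to go through the same UnisolatedReadWriteIndex wrapper; a single unguarded path is enough to let a concurrent reader observe a B+Tree that a writer is in the middle of mutating. The class below is a generic read/write-lock sketch of that discipline, not the bigdata UnisolatedReadWriteIndex itself:

import java.util.TreeMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class GuardedIndex<K extends Comparable<K>, V> {

    // Stand-in for the unisolated index; TreeMap is not thread safe, just like the B+Tree.
    private final TreeMap<K, V> index = new TreeMap<K, V>();

    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    public V read(final K key) {
        lock.readLock().lock(); // concurrent readers may share the lock
        try {
            return index.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    public void write(final K key, final V value) {
        lock.writeLock().lock(); // writers are exclusive with readers and other writers
        try {
            index.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }
}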
From: <mrp...@us...> - 2014-03-24 01:34:36
|
Revision: 8014 http://sourceforge.net/p/bigdata/code/8014 Author: mrpersonick Date: 2014-03-24 01:34:33 +0000 (Mon, 24 Mar 2014) Log Message: ----------- added support for EmptyAccessPaths Modified Paths: -------------- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataTriplePattern.java branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataTriplePatternMaterializer.java Modified: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataTriplePattern.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataTriplePattern.java 2014-03-23 22:52:45 UTC (rev 8013) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataTriplePattern.java 2014-03-24 01:34:33 UTC (rev 8014) @@ -24,9 +24,9 @@ */ package com.bigdata.rdf.store; -import com.bigdata.rdf.model.BigdataResource; -import com.bigdata.rdf.model.BigdataURI; -import com.bigdata.rdf.model.BigdataValue; +import org.openrdf.model.Resource; +import org.openrdf.model.URI; +import org.openrdf.model.Value; /** * A simple class that represents a triple (or quad) pattern. @@ -38,21 +38,21 @@ // private static final long serialVersionUID = 1L; - private final BigdataResource s; - private final BigdataURI p; - private final BigdataValue o; - private final BigdataResource c; + private final Resource s; + private final URI p; + private final Value o; + private final Resource c; - public BigdataTriplePattern(final BigdataResource subject, - final BigdataURI predicate, final BigdataValue object) { + public BigdataTriplePattern(final Resource subject, + final URI predicate, final Value object) { - this(subject, predicate, object, (BigdataResource) null); + this(subject, predicate, object, (Resource) null); } - public BigdataTriplePattern(final BigdataResource subject, - final BigdataURI predicate, final BigdataValue object, - final BigdataResource context) { + public BigdataTriplePattern(final Resource subject, + final URI predicate, final Value object, + final Resource context) { this.s = subject; @@ -64,28 +64,71 @@ } - final public BigdataResource getSubject() { + final public Resource getSubject() { return s; } - final public BigdataURI getPredicate() { + final public URI getPredicate() { return p; } - final public BigdataValue getObject() { + final public Value getObject() { return o; } - final public BigdataResource getContext() { + final public Resource getContext() { return c; } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((c == null) ? 0 : c.hashCode()); + result = prime * result + ((o == null) ? 0 : o.hashCode()); + result = prime * result + ((p == null) ? 0 : p.hashCode()); + result = prime * result + ((s == null) ? 
0 : s.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + BigdataTriplePattern other = (BigdataTriplePattern) obj; + if (c == null) { + if (other.c != null) + return false; + } else if (!c.equals(other.c)) + return false; + if (o == null) { + if (other.o != null) + return false; + } else if (!o.equals(other.o)) + return false; + if (p == null) { + if (other.p != null) + return false; + } else if (!p.equals(other.p)) + return false; + if (s == null) { + if (other.s != null) + return false; + } else if (!s.equals(other.s)) + return false; + return true; + } } Modified: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataTriplePatternMaterializer.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataTriplePatternMaterializer.java 2014-03-23 22:52:45 UTC (rev 8013) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataTriplePatternMaterializer.java 2014-03-24 01:34:33 UTC (rev 8014) @@ -24,7 +24,6 @@ */ package com.bigdata.rdf.store; -import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; @@ -42,6 +41,7 @@ import com.bigdata.rdf.spo.ISPO; import com.bigdata.rdf.spo.SPOAccessPath; import com.bigdata.relation.accesspath.BlockingBuffer; +import com.bigdata.relation.accesspath.IAccessPath; import com.bigdata.striterator.AbstractChunkedResolverator; import com.bigdata.striterator.IChunkedOrderedIterator; import com.bigdata.util.concurrent.LatchedExecutor; @@ -241,8 +241,9 @@ * database.computeClosureForStatementIdentifiers( * database.getAccessPath(s, p, o, c).iterator()); */ - final SPOAccessPath ap = (SPOAccessPath) state.getAccessPath(stmt.getSubject(), - stmt.getPredicate(), stmt.getObject(), stmt.getContext()); + final IAccessPath<ISPO> ap = (IAccessPath<ISPO>) state.getAccessPath( + stmt.getSubject(), stmt.getPredicate(), + stmt.getObject(), stmt.getContext()); // if(ap.isFullyBoundForKey()) { // /* This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mrp...@us...> - 2014-03-23 22:52:48
Revision: 8013 http://sourceforge.net/p/bigdata/code/8013 Author: mrpersonick Date: 2014-03-23 22:52:45 +0000 (Sun, 23 Mar 2014) Log Message: ----------- expose a bulk remove(ISPO) method on the sail Modified Paths: -------------- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/changesets/StatementWriter.java branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java Modified: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/changesets/StatementWriter.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/changesets/StatementWriter.java 2014-03-23 22:45:13 UTC (rev 8012) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/changesets/StatementWriter.java 2014-03-23 22:52:45 UTC (rev 8013) @@ -130,7 +130,7 @@ // // final BigdataStatement[] stmts = // new BigdataStatement[database.getChunkCapacity()]; - final SPO[] stmts = new SPO[database.getChunkCapacity()]; + final ISPO[] stmts = new ISPO[database.getChunkCapacity()]; int i = 0; while ((i = nextChunk(itr, stmts)) > 0) { Modified: branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java =================================================================== --- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 2014-03-23 22:45:13 UTC (rev 8012) +++ branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 2014-03-23 22:52:45 UTC (rev 8013) @@ -2790,7 +2790,7 @@ /** * Note: The CONTEXT is ignored when in statementIdentifier mode! */ - public synchronized int removeStatements(final ISPO[] stmts, final int numStmts) throws SailException { + public synchronized int removeStatements(final ISPO[] stmts) throws SailException { assertWritableConn(); @@ -2828,8 +2828,7 @@ * visits only the explicit statements. */ final IChunkedOrderedIterator<ISPO> itr = - new ChunkedArrayIterator<ISPO>(numStmts, stmts, - null/* keyOrder */); + new ChunkedArrayIterator<ISPO>(stmts); // The tempStore absorbing retractions. final AbstractTripleStore tempStore = getRetractionBuffer() @@ -2854,18 +2853,17 @@ if (changeLog == null) { - n = database.removeStatements(stmts, numStmts); + n = database.removeStatements(stmts, stmts.length); } else { -// final IChunkedOrderedIterator<ISPO> itr = -// database.computeClosureForStatementIdentifiers( -// new ChunkedArrayIterator<ISPO>(numStmts, stmts, -// null/* keyOrder */)); + final IChunkedOrderedIterator<ISPO> itr = + database.computeClosureForStatementIdentifiers( + new ChunkedArrayIterator<ISPO>(stmts)); // no need to compute closure for sids since we just did it - n = StatementWriter.removeStatements(database, stmts, numStmts, - database.getStatementIdentifiers()/* computeClosureForStatementIdentifiers */, + n = StatementWriter.removeStatements(database, itr, + false/* computeClosureForStatementIdentifiers */, changeLog); // final IAccessPath<ISPO> ap = This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
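A minimal usage sketch for the bulk retraction API above (not part of the commit). It assumes the method lives on the writable SAIL connection, as the surrounding diff context suggests, and that the caller has already resolved the statements to an ISPO[]:

    import org.openrdf.sail.SailException;

    import com.bigdata.rdf.sail.BigdataSail.BigdataSailConnection;
    import com.bigdata.rdf.spo.ISPO;

    public class BulkRetractSketch {

        /**
         * Retract a batch of previously resolved statements in a single call.
         * As of r8013 the bulk API takes just the ISPO[]; the separate count
         * argument from r8012 is gone.
         */
        public static int bulkRetract(final BigdataSailConnection conn,
                final ISPO[] stmts) throws SailException {

            return conn.removeStatements(stmts);

        }

    }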
From: <mrp...@us...> - 2014-03-23 22:45:17
Revision: 8012 http://sourceforge.net/p/bigdata/code/8012 Author: mrpersonick Date: 2014-03-23 22:45:13 +0000 (Sun, 23 Mar 2014) Log Message: ----------- expose a bulk remove(ISPO) method on the sail Modified Paths: -------------- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/changesets/StatementWriter.java branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/impl/uri/IPAddrIV.java branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java Modified: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/changesets/StatementWriter.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/changesets/StatementWriter.java 2014-03-23 17:46:59 UTC (rev 8011) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/changesets/StatementWriter.java 2014-03-23 22:45:13 UTC (rev 8012) @@ -144,7 +144,7 @@ } - private static long removeStatements(final AbstractTripleStore database, + public static long removeStatements(final AbstractTripleStore database, final ISPO[] stmts, final int numStmts, final boolean computeClosureForStatementIdentifiers, Modified: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/impl/uri/IPAddrIV.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/impl/uri/IPAddrIV.java 2014-03-23 17:46:59 UTC (rev 8011) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/impl/uri/IPAddrIV.java 2014-03-23 22:45:13 UTC (rev 8012) @@ -80,11 +80,6 @@ */ private transient String hostAddress; -// /** -// * The IPv4 prefix byte. -// */ -// private transient byte prefix; - /** * The cached byte[] key for the encoding of this IV. */ @@ -128,8 +123,6 @@ this.value = value; -// this.prefix = prefix; - } /* @@ -180,8 +173,6 @@ this.value = Inet4Address.textToAddr(s); -// this.prefix = suffix != null ? Byte.valueOf(suffix) : (byte) 33; - } else { throw new IllegalArgumentException("not an IP: " + hostAddress); Modified: branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java =================================================================== --- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 2014-03-23 17:46:59 UTC (rev 8011) +++ branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 2014-03-23 22:45:13 UTC (rev 8012) @@ -23,7 +23,7 @@ /* Portions of this code are: -Copyright Aduna (http://www.aduna-software.com/) � 2001-2007 +Copyright Aduna (http://www.aduna-software.com/) 2001-2007 All rights reserved. @@ -146,6 +146,7 @@ import com.bigdata.relation.accesspath.IElementFilter; import com.bigdata.service.AbstractFederation; import com.bigdata.service.IBigdataFederation; +import com.bigdata.striterator.ChunkedArrayIterator; import com.bigdata.striterator.CloseableIteratorWrapper; import com.bigdata.striterator.IChunkedIterator; import com.bigdata.striterator.IChunkedOrderedIterator; @@ -2786,6 +2787,125 @@ } + /** + * Note: The CONTEXT is ignored when in statementIdentifier mode! + */ + public synchronized int removeStatements(final ISPO[] stmts, final int numStmts) throws SailException { + + assertWritableConn(); + + flushStatementBuffers(true/* flushAssertBuffer */, false/* flushRetractBuffer */); + + if (m_listeners != null) { + + /* + * FIXME to support the SailConnectionListener we need to + * pre-materialize the explicit statements that are to be + * deleted and then notify the listener for each such explicit + * statement. 
Since that is a lot of work, make sure that we do + * not generate notices unless there are registered listeners! + */ + + throw new UnsupportedOperationException(); + + } + + // #of explicit statements removed. + long n = 0; + + if (getTruthMaintenance()) { + + /* + * Since we are doing truth maintenance we need to copy the + * matching "explicit" statements into a temporary store rather + * than deleting them directly. This uses the internal API to + * copy the statements to the temporary store without + * materializing them as Sesame Statement objects. + */ + + /* + * Obtain a chunked iterator using the triple pattern that + * visits only the explicit statements. + */ + final IChunkedOrderedIterator<ISPO> itr = + new ChunkedArrayIterator<ISPO>(numStmts, stmts, + null/* keyOrder */); + + // The tempStore absorbing retractions. + final AbstractTripleStore tempStore = getRetractionBuffer() + .getStatementStore(); + + // Copy explicit statements to tempStore. + n = tempStore.addStatements(tempStore, true/* copyOnly */, + itr, null/* filter */); + + /* + * Nothing more happens until the commit or incremental write + * flushes the retraction buffer and runs TM. + */ + + } else { + + /* + * Since we are not doing truth maintenance, just remove the + * statements from the database (synchronous, batch api, not + * buffered). + */ + + if (changeLog == null) { + + n = database.removeStatements(stmts, numStmts); + + } else { + +// final IChunkedOrderedIterator<ISPO> itr = +// database.computeClosureForStatementIdentifiers( +// new ChunkedArrayIterator<ISPO>(numStmts, stmts, +// null/* keyOrder */)); + + // no need to compute closure for sids since we just did it + n = StatementWriter.removeStatements(database, stmts, numStmts, + database.getStatementIdentifiers()/* computeClosureForStatementIdentifiers */, + changeLog); + +// final IAccessPath<ISPO> ap = +// database.getAccessPath(s, p, o, c); +// +// final IChunkedOrderedIterator<ISPO> itr = ap.iterator(); +// +// if (itr.hasNext()) { +// +// final BigdataStatementIteratorImpl itr2 = +// new BigdataStatementIteratorImpl(database, bnodes2, itr) +// .start(database.getExecutorService()); +// +// final BigdataStatement[] stmts = +// new BigdataStatement[database.getChunkCapacity()]; +// +// int i = 0; +// while (i < stmts.length && itr2.hasNext()) { +// stmts[i++] = itr2.next(); +// if (i == stmts.length) { +// // process stmts[] +// n += removeAndNotify(stmts, i); +// i = 0; +// } +// } +// if (i > 0) { +// n += removeAndNotify(stmts, i); +// } +// +// } + + } + + } + + // avoid overflow. + return (int) Math.min(Integer.MAX_VALUE, n); + + } + // private long removeAndNotify(final BigdataStatement[] stmts, final int numStmts) { // // final SPO[] tmp = new SPO[numStmts]; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2014-03-23 17:47:02
Revision: 8011 http://sourceforge.net/p/bigdata/code/8011 Author: thompsonbry Date: 2014-03-23 17:46:59 +0000 (Sun, 23 Mar 2014) Log Message: ----------- Added AbstractTripleStore::getStatements(IChunkedOrderedIterator<BigdataTriplePattern> triplePatterns) returning BigdataStatementIterator as partial fix to #866 (parallel streaming resolution of triple patterns to ground statements). This commit includes unit tests of the new feature in TestTripleStore. Hand off to MikeP for the rest of that ticket. Modified Paths: -------------- branches/RDR/bigdata/src/java/com/bigdata/striterator/AbstractChunkedResolverator.java branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java branches/RDR/bigdata-rdf/src/test/com/bigdata/rdf/store/TestTripleStore.java Added Paths: ----------- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataTriplePattern.java branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataTriplePatternMaterializer.java Modified: branches/RDR/bigdata/src/java/com/bigdata/striterator/AbstractChunkedResolverator.java =================================================================== --- branches/RDR/bigdata/src/java/com/bigdata/striterator/AbstractChunkedResolverator.java 2014-03-23 16:10:08 UTC (rev 8010) +++ branches/RDR/bigdata/src/java/com/bigdata/striterator/AbstractChunkedResolverator.java 2014-03-23 17:46:59 UTC (rev 8011) @@ -260,7 +260,15 @@ throw new RuntimeException(t); } - assert converted.length == chunk.length; + /** + * Note: This is no longer true. Some conversions can now + * expand or reduce the size of the chunk. + * + * @see <a href="http://trac.bigdata.com/ticket/866" > + * Efficient batch remove of a collection of triple + * patterns </a> + */ +// assert converted.length == chunk.length; // Note: Throws BufferClosedException if closed. buffer.add(converted); Modified: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java 2014-03-23 16:10:08 UTC (rev 8010) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/AbstractTripleStore.java 2014-03-23 17:46:59 UTC (rev 8011) @@ -165,6 +165,7 @@ import com.bigdata.sparse.GlobalRowStoreUtil; import com.bigdata.striterator.ChunkedArrayIterator; import com.bigdata.striterator.ChunkedConvertingIterator; +import com.bigdata.striterator.ChunkedWrappedIterator; import com.bigdata.striterator.DelegateChunkedIterator; import com.bigdata.striterator.EmptyChunkedIterator; import com.bigdata.striterator.IChunkedIterator; @@ -2706,7 +2707,35 @@ return asStatementIterator(getAccessPath(s, p, o, c).iterator()); } + + /** + * Efficient batched, streaming resolution of triple patterns to statements + * spanned by those triple patterns that are present in the data. + * <p> + * Note: If the input contains triple patterns that have a high cardinality + * in the data, then a large number of statements may be returned. + * + * @param triplePatterns + * A collection of triple patterns or fully bound statements. If + * this collection contains triple patterns that have a high + * cardinality in the data, then a large number of statements may + * be returned. + * + * @return An iterator from which the materialized statements spanned by + * those triple patterns may be read. 
+ * + * @see <a href="http://trac.bigdata.com/ticket/866" > Efficient batch + * remove of a collection of triple patterns </a> + */ + public BigdataStatementIterator getStatements( + final IChunkedOrderedIterator<BigdataTriplePattern> triplePatterns) { + return asStatementIterator(new ChunkedWrappedIterator<ISPO>( + new BigdataTriplePatternMaterializer(this, triplePatterns) + .start(getExecutorService()))); + + } + final public BigdataValue asValue(final Value value) { return getValueFactory().asValue(value); Added: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataTriplePattern.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataTriplePattern.java (rev 0) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataTriplePattern.java 2014-03-23 17:46:59 UTC (rev 8011) @@ -0,0 +1,91 @@ +/* + +Copyright (C) SYSTAP, LLC 2006-2014. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +*/ +package com.bigdata.rdf.store; + +import com.bigdata.rdf.model.BigdataResource; +import com.bigdata.rdf.model.BigdataURI; +import com.bigdata.rdf.model.BigdataValue; + +/** + * A simple class that represents a triple (or quad) pattern. + * + * @see <a href="http://trac.bigdata.com/ticket/866" > Efficient batch remove of + * a collection of triple patterns </a> + */ +public class BigdataTriplePattern { + +// private static final long serialVersionUID = 1L; + + private final BigdataResource s; + private final BigdataURI p; + private final BigdataValue o; + private final BigdataResource c; + + public BigdataTriplePattern(final BigdataResource subject, + final BigdataURI predicate, final BigdataValue object) { + + this(subject, predicate, object, (BigdataResource) null); + + } + + public BigdataTriplePattern(final BigdataResource subject, + final BigdataURI predicate, final BigdataValue object, + final BigdataResource context) { + + this.s = subject; + + this.p = predicate; + + this.o = object; + + this.c = context; + + } + + final public BigdataResource getSubject() { + + return s; + + } + + final public BigdataURI getPredicate() { + + return p; + + } + + final public BigdataValue getObject() { + + return o; + + } + + final public BigdataResource getContext() { + + return c; + + } + +} Added: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataTriplePatternMaterializer.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataTriplePatternMaterializer.java (rev 0) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/store/BigdataTriplePatternMaterializer.java 2014-03-23 17:46:59 UTC (rev 8011) @@ -0,0 +1,289 @@ +/* + +Copyright (C) SYSTAP, LLC 2006-2014. All rights reserved. 
+ +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +*/ +package com.bigdata.rdf.store; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.system.SystemUtil; + +import com.bigdata.rdf.spo.ISPO; +import com.bigdata.rdf.spo.SPOAccessPath; +import com.bigdata.relation.accesspath.BlockingBuffer; +import com.bigdata.striterator.AbstractChunkedResolverator; +import com.bigdata.striterator.IChunkedOrderedIterator; +import com.bigdata.util.concurrent.LatchedExecutor; + +import cutthecrap.utils.striterators.ICloseableIterator; + +/** + * Efficient batched, streaming resolution of triple patterns to statements + * spanned by those triple patterns that are present in the data. + * <p> + * Note: If the input contains triple patterns that have a high cardinality + * in the data, then a large number of statements may be returned. + * + * @param triplePatterns + * A collection of triple patterns or fully bound statements. If + * this collection contains triple patterns that have a high + * cardinality in the data, then a large number of statements may + * be returned. + * + * @return An iterator from which the materialized statements spanned by + * those triple patterns may be read. + * + * @see <a href="http://trac.bigdata.com/ticket/866" > Efficient batch + * remove of a collection of triple patterns </a> + */ +public class BigdataTriplePatternMaterializer + extends + AbstractChunkedResolverator<BigdataTriplePattern, ISPO, AbstractTripleStore> + implements ICloseableIterator<ISPO> { +// implements IChunkedOrderedIterator<ISPO> { + + private final int nthreads; + + public BigdataTriplePatternMaterializer(final AbstractTripleStore db, + final IChunkedOrderedIterator<BigdataTriplePattern> src) { + this(db, src, 4/* nthreads */); + } + + public BigdataTriplePatternMaterializer(final AbstractTripleStore db, + final IChunkedOrderedIterator<BigdataTriplePattern> src, + final int nthreads) { + + super(db, src, new BlockingBuffer<ISPO[]>( + db.getChunkOfChunksCapacity(), + db.getChunkCapacity(), + db.getChunkTimeout(), + TimeUnit.MILLISECONDS)); + + if (nthreads < 0) + throw new IllegalArgumentException(); + + // At least 1 thread. At most ncpus*2. 
+ this.nthreads = Math.max( + Math.min(nthreads, SystemUtil.numProcessors() * 2), 1); + + } + + @Override + public BigdataTriplePatternMaterializer start(final ExecutorService service) { + + helperService.set(new LatchedExecutor(service, nthreads)); + + super.start(service); + + return this; + + } + private final AtomicReference<LatchedExecutor> helperService = new AtomicReference<LatchedExecutor>(); + + @Override + protected ISPO[] resolveChunk(final BigdataTriplePattern[] chunk) { + + final LatchedExecutor helperService = this.helperService.get(); + + if (helperService == null) + throw new IllegalStateException(); + + /* + * The output will be at most sizeof(chunk) arrays. Each array will have + * one or more statements. Any triple patterns that have no intersection + * in the data will be dropped and will not put anything into this + * output queue. + */ + final BlockingQueue<ISPO[]> out = new ArrayBlockingQueue<ISPO[]>( + chunk.length); + + final List<FutureTask<Long>> tasks = new LinkedList<FutureTask<Long>>(); + + try { + + final CountDownLatch latch = new CountDownLatch(chunk.length); + + /* + * Create FutureTasks for each subquery. The futures are not + * submitted to the Executor yet. That happens in call(). By + * deferring the evaluation until call() we gain the ability to + * cancel all subqueries if any subquery fails. + */ + for (BigdataTriplePattern stmt : chunk) { + + /* + * Task runs subquery and cancels all subqueries in [tasks] if + * it fails. + */ + final FutureTask<Long> ft = new FutureTask<Long>( + new ResolveTriplePatternTask(stmt, out)) { + /* + * Hook future to count down the latch when the task is + * done. + */ + public void run() { + try { + super.run(); + } finally { + latch.countDown(); + } + } + }; + + tasks.add(ft); + + } + + /* + * Run triple pattern resolution with limited parallelism. + */ + for (FutureTask<Long> ft : tasks) { + helperService.execute(ft); + } + + /* + * Wait for all tasks to complete. + */ + latch.await(); + + /* + * Check futures, counting the #of solutions. + */ + long nfound = 0L; + for (FutureTask<Long> ft : tasks) { + nfound += ft.get(); + if (nfound > Integer.MAX_VALUE) + throw new UnsupportedOperationException(); + } + + /* + * Convert into a single ISPO[] chunk. + */ + final ISPO[] dest = new ISPO[(int) nfound]; + int destPos = 0; + ISPO[] src = null; + while ((src = out.poll()) != null) { + System.arraycopy(src/* src */, 0/* srcPos */, dest, destPos, + src.length); + destPos += src.length; + } + + return dest; + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } finally { + + // Cancel any tasks which are still running. + for (FutureTask<Long> ft : tasks) + ft.cancel(true/* mayInterruptIfRunning */); + + } + + } + + /** + * Resolve a triple pattern to the statements that it spans in the data. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + private class ResolveTriplePatternTask implements Callable<Long> { + + private final BigdataTriplePattern stmt; + private final BlockingQueue<ISPO[]> out; + + public ResolveTriplePatternTask(final BigdataTriplePattern stmt, + final BlockingQueue<ISPO[]> out) { + this.stmt = stmt; + this.out = out; + } + + @Override + public Long call() throws Exception { + /* + * TODO What about closure over the SIDs? 
+ * + * final IChunkedOrderedIterator<ISPO> itr = + * database.computeClosureForStatementIdentifiers( + * database.getAccessPath(s, p, o, c).iterator()); + */ + final SPOAccessPath ap = (SPOAccessPath) state.getAccessPath(stmt.getSubject(), + stmt.getPredicate(), stmt.getObject(), stmt.getContext()); + +// if(ap.isFullyBoundForKey()) { +// /* +// * Optimize when triple pattern is a fully bound statement. +// * In this case, the output is either that statement (with IVs +// * resolved) or the triple pattern is dropped. +// */ +// final IChunkedOrderedIterator<ISPO> itr = ap.iterator(); +// try { +// if (!itr.hasNext()) +// return 0L; +// final ISPO spo = itr.next(); +// out.add(new ISPO[]{spo}); +// return 1L; +// } finally { +// itr.close(); +// } +// } else { + long n = 0L; + final IChunkedOrderedIterator<ISPO> itr = ap.iterator(); + try { + while (itr.hasNext()) { + final ISPO[] a = itr.nextChunk(); +// if (true) { +// // verify no null array elements. +// for (int i = 0; i < a.length; i++) { +// if (a[i] == null) +// throw new AssertionError(Arrays.toString(a)); +// } +// } + out.add(a); + n += a.length; + } + return n; + } finally { + itr.close(); + } +// } + + } + + } + +} Modified: branches/RDR/bigdata-rdf/src/test/com/bigdata/rdf/store/TestTripleStore.java =================================================================== --- branches/RDR/bigdata-rdf/src/test/com/bigdata/rdf/store/TestTripleStore.java 2014-03-23 16:10:08 UTC (rev 8010) +++ branches/RDR/bigdata-rdf/src/test/com/bigdata/rdf/store/TestTripleStore.java 2014-03-23 17:46:59 UTC (rev 8011) @@ -46,11 +46,14 @@ import org.openrdf.model.vocabulary.RDF; import org.openrdf.model.vocabulary.RDFS; import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.parser.sparql.FOAF; import com.bigdata.rdf.axioms.NoAxioms; import com.bigdata.rdf.internal.IV; import com.bigdata.rdf.lexicon.DumpLexicon; +import com.bigdata.rdf.lexicon.Id2TermWriteProc; import com.bigdata.rdf.lexicon.LexiconRelation; +import com.bigdata.rdf.lexicon.Term2IdWriteProc; import com.bigdata.rdf.model.BigdataBNode; import com.bigdata.rdf.model.BigdataLiteral; import com.bigdata.rdf.model.BigdataURI; @@ -1453,6 +1456,137 @@ } + /** + * Unit test of the batched parallel resolution of triple patterns. + * + * @see <a href="http://trac.bigdata.com/ticket/866" > Efficient batch + * remove of a collection of triple patterns </a> + */ + public void test_getStatementsUsingTriplePatterns() { + + final Properties properties = super.getProperties(); + + // override the default axiom model. + properties.setProperty( + com.bigdata.rdf.store.AbstractTripleStore.Options.AXIOMS_CLASS, + NoAxioms.class.getName()); + + final AbstractTripleStore store = getStore(properties); + + try { + + // verify nothing in the store. 
+ assertSameIterator(new Statement[] {}, store.getAccessPath(NULL, + NULL, NULL).iterator()); + + final BigdataValueFactory f = store.getValueFactory(); + + final BigdataURI A = f.createURI("http://www.bigdata.com/A"); + final BigdataURI B = f.createURI("http://www.bigdata.com/B"); + final BigdataURI C = f.createURI("http://www.bigdata.com/C"); + final BigdataURI Person = f.asValue(FOAF.PERSON); + final BigdataURI rdfType = f.asValue(RDF.TYPE); + final BigdataURI foafKnows = f.asValue(FOAF.KNOWS); + + { + + final IStatementBuffer<Statement> buffer = new StatementBuffer<Statement>( + store, 100); + + buffer.add(A, rdfType, Person); + buffer.add(B, rdfType, Person); + buffer.add(C, rdfType, Person); + + buffer.add(A, foafKnows, B); + buffer.add(B, foafKnows, A); + buffer.add(B, foafKnows, C); + buffer.add(C, foafKnows, B); + + buffer.flush(); + + } + + // Empty input. + { + final BigdataTriplePattern[] triplePatterns = new BigdataTriplePattern[] {}; + assertSameStatements( + new Statement[] {// + },// + store.getStatements(new ChunkedArrayIterator<BigdataTriplePattern>( + triplePatterns))); + } + + // Single pattern matching one statement. + { + final BigdataTriplePattern[] triplePatterns = new BigdataTriplePattern[] {// + new BigdataTriplePattern(A, rdfType, null),// + }; + assertSameStatements( + new Statement[] {// + new StatementImpl(A, rdfType, Person),// + },// + store.getStatements(new ChunkedArrayIterator<BigdataTriplePattern>( + triplePatterns))); + } + + // Single pattern matching three statements. + { + final BigdataTriplePattern[] triplePatterns = new BigdataTriplePattern[] {// + new BigdataTriplePattern(null, rdfType, Person),// + }; + assertSameStatements( + new Statement[] {// + new StatementImpl(A, rdfType, Person),// + new StatementImpl(B, rdfType, Person),// + new StatementImpl(C, rdfType, Person),// + },// + store.getStatements(new ChunkedArrayIterator<BigdataTriplePattern>( + triplePatterns))); + } + + // Two patterns matching various statements. + { + final BigdataTriplePattern[] triplePatterns = new BigdataTriplePattern[] {// + new BigdataTriplePattern(A, foafKnows, null),// + new BigdataTriplePattern(null, rdfType, Person),// + }; + assertSameIteratorAnyOrder( + new Statement[] {// + new StatementImpl(A, foafKnows, B),// + new StatementImpl(A, rdfType, Person),// + new StatementImpl(B, rdfType, Person),// + new StatementImpl(C, rdfType, Person),// + },// + store.getStatements(new ChunkedArrayIterator<BigdataTriplePattern>( + triplePatterns))); + } + + // Three patterns, two of which match various statements. + { + final BigdataTriplePattern[] triplePatterns = new BigdataTriplePattern[] {// + new BigdataTriplePattern(A, foafKnows, null),// + new BigdataTriplePattern(null, rdfType, Person),// + new BigdataTriplePattern(rdfType, foafKnows, null),// no match + }; + assertSameIteratorAnyOrder( + new Statement[] {// + new StatementImpl(A, foafKnows, B),// + new StatementImpl(A, rdfType, Person),// + new StatementImpl(B, rdfType, Person),// + new StatementImpl(C, rdfType, Person),// + },// + store.getStatements(new ChunkedArrayIterator<BigdataTriplePattern>( + triplePatterns))); + } + + } finally { + + store.__tearDownUnitTest(); + + } + + } + private String getVeryLargeURI() { final int len = 1024000; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
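A minimal usage sketch for the new batched resolution API in r8011, patterned on the unit test above (not part of the commit; the URIs and the store handle are illustrative):

    import org.openrdf.model.vocabulary.RDF;

    import com.bigdata.rdf.model.BigdataURI;
    import com.bigdata.rdf.model.BigdataValueFactory;
    import com.bigdata.rdf.store.AbstractTripleStore;
    import com.bigdata.rdf.store.BigdataStatementIterator;
    import com.bigdata.rdf.store.BigdataTriplePattern;
    import com.bigdata.striterator.ChunkedArrayIterator;

    public class BatchResolveSketch {

        /**
         * Resolve a small collection of triple patterns against the store in
         * one batched, parallel pass (ticket #866) and print the matches.
         */
        public static void printMatches(final AbstractTripleStore store) {

            final BigdataValueFactory f = store.getValueFactory();

            final BigdataURI alice = f.createURI("http://example.org/alice");
            final BigdataURI rdfType = f.asValue(RDF.TYPE);

            final BigdataTriplePattern[] patterns = new BigdataTriplePattern[] {
                    new BigdataTriplePattern(alice, rdfType, null), // one subject
                    new BigdataTriplePattern(null, rdfType, null),  // all rdf:type statements
            };

            final BigdataStatementIterator itr = store.getStatements(
                    new ChunkedArrayIterator<BigdataTriplePattern>(patterns));

            try {
                while (itr.hasNext()) {
                    System.out.println(itr.next());
                }
            } finally {
                itr.close();
            }

        }

    }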
From: <mrp...@us...> - 2014-03-23 16:10:12
Revision: 8010 http://sourceforge.net/p/bigdata/code/8010 Author: mrpersonick Date: 2014-03-23 16:10:08 +0000 (Sun, 23 Mar 2014) Log Message: ----------- trying again with the IPv4 prefix support Modified Paths: -------------- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/Inet4Address.java Modified: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/Inet4Address.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/Inet4Address.java 2014-03-23 15:13:12 UTC (rev 8009) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/Inet4Address.java 2014-03-23 16:10:08 UTC (rev 8010) @@ -1,7 +1,12 @@ package com.bigdata.rdf.internal; import java.math.BigInteger; +import java.util.Arrays; +import org.apache.log4j.Logger; + +import com.bigdata.btree.BytesUtil.UnsignedByteArrayComparator; + /** * This class represents an Internet Protocol version 4 (IPv4) address. * Defined by <a href="http://www.ietf.org/rfc/rfc790.txt"> @@ -59,21 +64,22 @@ public final class Inet4Address { - - final long address; +// private static final Logger log = Logger.getLogger(Inet4Address.class); + + final byte[] address; public Inet4Address(final byte addr[]) { - if (addr == null || addr.length != 5) - throw new IllegalArgumentException(); - - long address = addr[4] & 0xFFl; - address |= ((addr[3] << 8) & 0xFF00l); - address |= ((addr[2] << 16) & 0xFF0000l); - address |= ((addr[1] << 24) & 0xFF000000l); - address |= ((addr[0] << 32) & 0xFF00000000l); +// if (addr == null || addr.length != 5) +// throw new IllegalArgumentException(); +// +// long address = addr[4] & 0xFFl; +// address |= ((addr[3] << 8) & 0xFF00l); +// address |= ((addr[2] << 16) & 0xFF0000l); +// address |= ((addr[1] << 24) & 0xFF000000l); +// address |= (((long)(addr[0] << 32)) & 0xFF00000000l); - this.address = address; + this.address = addr; } @@ -86,15 +92,17 @@ */ public byte[] getBytes() { - byte[] addr = new byte[5]; - addr[0] = (byte) ((address >>> 32) & 0xFF); - addr[1] = (byte) ((address >>> 24) & 0xFF); - addr[2] = (byte) ((address >>> 16) & 0xFF); - addr[3] = (byte) ((address >>> 8) & 0xFF); - addr[4] = (byte) (address & 0xFF); +// byte[] addr = new byte[5]; +// addr[0] = (byte) ((address >>> 32) & 0xFF); +// addr[1] = (byte) ((address >>> 24) & 0xFF); +// addr[2] = (byte) ((address >>> 16) & 0xFF); +// addr[3] = (byte) ((address >>> 8) & 0xFF); +// addr[4] = (byte) (address & 0xFF); +// +// return addr; + + return address; - return addr; - } /** @@ -107,36 +115,27 @@ return numericToTextFormat(getBytes()); } - /** - * Returns a hashcode for this IP address. - * - * @return a hash code value for this IP address. - */ - public int hashCode() { - return BigInteger.valueOf(address).hashCode(); - } + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + Arrays.hashCode(address); + return result; + } - /** - * Compares this object against the specified object. - * The result is <code>true</code> if and only if the argument is - * not <code>null</code> and it represents the same IP address as - * this object. - * <p> - * Two instances of <code>InetAddress</code> represent the same IP - * address if the length of the byte arrays returned by - * <code>getAddress</code> is the same for both, and each of the - * array components is the same for the byte arrays. - * - * @param obj the object to compare against. 
- * @return <code>true</code> if the objects are the same; - * <code>false</code> otherwise. - * @see java.net.InetAddress#getAddress() - */ - public boolean equals(Object obj) { - return (obj != null) && (obj instanceof Inet4Address) && - (((Inet4Address)obj).address == address); - } + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Inet4Address other = (Inet4Address) obj; + return UnsignedByteArrayComparator.INSTANCE.compare(address, other.address) == 0; + } + // Utilities /* * Converts IPv4 binary address into a string suitable for presentation. This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
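A minimal sketch of the r8010 representation (not part of the commit; the addresses are illustrative): the address is held as five raw bytes, four octets plus a trailing prefix-length byte, and equality is now defined over those bytes.

    import com.bigdata.rdf.internal.Inet4Address;

    public class Inet4AddressSketch {

        public static void main(final String[] args) {

            // Five parts: four octets plus the prefix length.
            final Inet4Address a = Inet4Address.textToAddr("192", "168", "0", "1", "24");

            // The raw 5-byte form doubles as the unsigned key material.
            final byte[] key = a.getBytes();
            System.out.println(key.length);    // 5

            // numericToTextFormat() restores the "/24" suffix (netmask byte < 32).
            System.out.println(a);             // 192.168.0.1/24

            // equals() compares the raw bytes via UnsignedByteArrayComparator (r8010).
            System.out.println(a.equals(
                    Inet4Address.textToAddr("192", "168", "0", "1", "24"))); // true

        }

    }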
From: <mrp...@us...> - 2014-03-23 15:13:16
Revision: 8009 http://sourceforge.net/p/bigdata/code/8009 Author: mrpersonick Date: 2014-03-23 15:13:12 +0000 (Sun, 23 Mar 2014) Log Message: ----------- trying again with the IPv4 prefix support Modified Paths: -------------- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/IVUtility.java branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/impl/uri/IPAddrIV.java Added Paths: ----------- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/Inet4Address.java Modified: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/IVUtility.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/IVUtility.java 2014-03-23 03:28:20 UTC (rev 8008) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/IVUtility.java 2014-03-23 15:13:12 UTC (rev 8009) @@ -544,14 +544,10 @@ * as an IPAddrIV. We need to fix the Extension mechanism for URIs. * Extension is already used above. */ - try { - final byte[] addr = new byte[4]; - System.arraycopy(key, o, addr, 0, 4); - final InetAddress ip = InetAddress.getByAddress(addr); - return new IPAddrIV(ip);//, key[o+4]); - } catch (UnknownHostException ex) { - throw new RuntimeException(ex); - } + final byte[] addr = new byte[5]; + System.arraycopy(key, o, addr, 0, 5); + final Inet4Address ip = new Inet4Address(addr); + return new IPAddrIV(ip); } case XSDByte: { final byte x = key[o];//KeyBuilder.decodeByte(key[o]); Added: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/Inet4Address.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/Inet4Address.java (rev 0) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/Inet4Address.java 2014-03-23 15:13:12 UTC (rev 8009) @@ -0,0 +1,259 @@ +package com.bigdata.rdf.internal; + +import java.math.BigInteger; + +/** + * This class represents an Internet Protocol version 4 (IPv4) address. + * Defined by <a href="http://www.ietf.org/rfc/rfc790.txt"> + * <i>RFC 790: Assigned Numbers</i></a>, + * <a href="http://www.ietf.org/rfc/rfc1918.txt"> + * <i>RFC 1918: Address Allocation for Private Internets</i></a>, + * and <a href="http://www.ietf.org/rfc/rfc2365.txt"><i>RFC 2365: + * Administratively Scoped IP Multicast</i></a> + * + * <h4> <A NAME="format">Textual representation of IP addresses</a> </h4> + * + * Textual representation of IPv4 address used as input to methods + * takes one of the following forms: + * + * <blockquote><table cellpadding=0 cellspacing=0 summary="layout"> + * <tr><td><tt>d.d.d.d</tt></td></tr> + * <tr><td><tt>d.d.d</tt></td></tr> + * <tr><td><tt>d.d</tt></td></tr> + * <tr><td><tt>d</tt></td></tr> + * </table></blockquote> + * + * <p> When four parts are specified, each is interpreted as a byte of + * data and assigned, from left to right, to the four bytes of an IPv4 + * address. + + * <p> When a three part address is specified, the last part is + * interpreted as a 16-bit quantity and placed in the right most two + * bytes of the network address. This makes the three part address + * format convenient for specifying Class B net- work addresses as + * 128.net.host. + * + * <p> When a two part address is supplied, the last part is + * interpreted as a 24-bit quantity and placed in the right most three + * bytes of the network address. This makes the two part address + * format convenient for specifying Class A network addresses as + * net.host. 
+ * + * <p> When only one part is given, the value is stored directly in + * the network address without any byte rearrangement. + * + * <p> For methods that return a textual representation as output + * value, the first form, i.e. a dotted-quad string, is used. + * + * <h4> The Scope of a Multicast Address </h4> + * + * Historically the IPv4 TTL field in the IP header has doubled as a + * multicast scope field: a TTL of 0 means node-local, 1 means + * link-local, up through 32 means site-local, up through 64 means + * region-local, up through 128 means continent-local, and up through + * 255 are global. However, the administrative scoping is preferred. + * Please refer to <a href="http://www.ietf.org/rfc/rfc2365.txt"> + * <i>RFC 2365: Administratively Scoped IP Multicast</i></a> + * @since 1.4 + */ + +public final class Inet4Address { + + + final long address; + + public Inet4Address(final byte addr[]) { + + if (addr == null || addr.length != 5) + throw new IllegalArgumentException(); + + long address = addr[4] & 0xFFl; + address |= ((addr[3] << 8) & 0xFF00l); + address |= ((addr[2] << 16) & 0xFF0000l); + address |= ((addr[1] << 24) & 0xFF000000l); + address |= ((addr[0] << 32) & 0xFF00000000l); + + this.address = address; + + } + + /** + * Returns the raw IP address of this <code>InetAddress</code> + * object. The result is in network byte order: the highest order + * byte of the address is in <code>getAddress()[0]</code>. + * + * @return the raw IP address of this object. + */ + public byte[] getBytes() { + + byte[] addr = new byte[5]; + addr[0] = (byte) ((address >>> 32) & 0xFF); + addr[1] = (byte) ((address >>> 24) & 0xFF); + addr[2] = (byte) ((address >>> 16) & 0xFF); + addr[3] = (byte) ((address >>> 8) & 0xFF); + addr[4] = (byte) (address & 0xFF); + + return addr; + + } + + /** + * Returns the IP address string in textual presentation form. + * + * @return the raw IP address in a string format. + * @since JDK1.0.2 + */ + public String toString() { + return numericToTextFormat(getBytes()); + } + + /** + * Returns a hashcode for this IP address. + * + * @return a hash code value for this IP address. + */ + public int hashCode() { + return BigInteger.valueOf(address).hashCode(); + } + + /** + * Compares this object against the specified object. + * The result is <code>true</code> if and only if the argument is + * not <code>null</code> and it represents the same IP address as + * this object. + * <p> + * Two instances of <code>InetAddress</code> represent the same IP + * address if the length of the byte arrays returned by + * <code>getAddress</code> is the same for both, and each of the + * array components is the same for the byte arrays. + * + * @param obj the object to compare against. + * @return <code>true</code> if the objects are the same; + * <code>false</code> otherwise. + * @see java.net.InetAddress#getAddress() + */ + public boolean equals(Object obj) { + return (obj != null) && (obj instanceof Inet4Address) && + (((Inet4Address)obj).address == address); + } + + // Utilities + /* + * Converts IPv4 binary address into a string suitable for presentation. + * + * @param src a byte array representing an IPv4 numeric address + * @return a String representing the IPv4 address in + * textual representation format + * @since 1.4 + */ + + static String numericToTextFormat(byte[] src) + { + final int netmask = src[4] & 0xff; + + return (src[0] & 0xff) + "." + (src[1] & 0xff) + "." + + (src[2] & 0xff) + "." + (src[3] & 0xff) + + (netmask < 32 ? 
"/" + netmask : ""); + } + + public static Inet4Address textToAddr(String... s) { + + byte[] res = new byte[5]; + + long val; + try { + switch (s.length) { + case 1: + /* + * When only one part is given, the value is stored directly in + * the network address without any byte rearrangement. + */ + + val = Long.parseLong(s[0]); + if (val < 0 || val > 0xffffffffL) + return null; + res[0] = (byte) ((val >> 24) & 0xff); + res[1] = (byte) (((val & 0xffffff) >> 16) & 0xff); + res[2] = (byte) (((val & 0xffff) >> 8) & 0xff); + res[3] = (byte) (val & 0xff); + res[4] = (byte) (33 & 0xff); + break; + case 2: + /* + * When a two part address is supplied, the last part is + * interpreted as a 24-bit quantity and placed in the right most + * three bytes of the network address. This makes the two part + * address format convenient for specifying Class A network + * addresses as net.host. + */ + + val = Integer.parseInt(s[0]); + if (val < 0 || val > 0xff) + return null; + res[0] = (byte) (val & 0xff); + val = Integer.parseInt(s[1]); + if (val < 0 || val > 0xffffff) + return null; + res[1] = (byte) ((val >> 16) & 0xff); + res[2] = (byte) (((val & 0xffff) >> 8) & 0xff); + res[3] = (byte) (val & 0xff); + res[4] = (byte) (33 & 0xff); + break; + case 3: + /* + * When a three part address is specified, the last part is + * interpreted as a 16-bit quantity and placed in the right most + * two bytes of the network address. This makes the three part + * address format convenient for specifying Class B net- work + * addresses as 128.net.host. + */ + for (int i = 0; i < 2; i++) { + val = Integer.parseInt(s[i]); + if (val < 0 || val > 0xff) + return null; + res[i] = (byte) (val & 0xff); + } + val = Integer.parseInt(s[2]); + if (val < 0 || val > 0xffff) + return null; + res[2] = (byte) ((val >> 8) & 0xff); + res[3] = (byte) (val & 0xff); + res[4] = (byte) (33 & 0xff); + break; + case 4: + /* + * When four parts are specified, each is interpreted as a byte + * of data and assigned, from left to right, to the four bytes + * of an IPv4 address. + */ + for (int i = 0; i < 4; i++) { + val = Integer.parseInt(s[i]); + if (val < 0 || val > 0xff) + return null; + res[i] = (byte) (val & 0xff); + } + res[4] = (byte) (33 & 0xff); + break; + case 5: + /* + * When five parts are specified, each is interpreted as a byte + * of data and assigned, from left to right, to the four bytes + * of an IPv4 address plus the one byte for the netmask. 
+ */ + for (int i = 0; i < 5; i++) { + val = Integer.parseInt(s[i]); + if (val < 0 || val > 0xff) + return null; + res[i] = (byte) (val & 0xff); + } + break; + default: + return null; + } + } catch (NumberFormatException e) { + return null; + } + return new Inet4Address(res); + } + +} Property changes on: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/Inet4Address.java ___________________________________________________________________ Added: svn:mime-type ## -0,0 +1 ## +text/plain \ No newline at end of property Modified: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/impl/uri/IPAddrIV.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/impl/uri/IPAddrIV.java 2014-03-23 03:28:20 UTC (rev 8008) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/impl/uri/IPAddrIV.java 2014-03-23 15:13:12 UTC (rev 8009) @@ -29,7 +29,6 @@ import java.io.ObjectOutput; import java.io.ObjectStreamException; import java.io.Serializable; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -39,19 +38,14 @@ import com.bigdata.btree.BytesUtil.UnsignedByteArrayComparator; import com.bigdata.btree.keys.IKeyBuilder; -import com.bigdata.btree.keys.KeyBuilder; import com.bigdata.io.LongPacker; import com.bigdata.rdf.internal.DTE; -import com.bigdata.rdf.internal.ILexiconConfiguration; import com.bigdata.rdf.internal.IV; -import com.bigdata.rdf.internal.IVUtility; +import com.bigdata.rdf.internal.Inet4Address; import com.bigdata.rdf.internal.VTE; import com.bigdata.rdf.internal.impl.AbstractInlineIV; import com.bigdata.rdf.lexicon.LexiconRelation; -import com.bigdata.rdf.model.BigdataBNode; import com.bigdata.rdf.model.BigdataURI; -import com.bigdata.rdf.model.BigdataValueFactory; -import com.bigdata.rdf.spo.SPOKeyOrder; /** * Internal value representing an inline IP address. Uses the InetAddress @@ -62,7 +56,7 @@ * <p> * {@inheritDoc} */ -public class IPAddrIV<V extends BigdataURI> extends AbstractInlineIV<V, InetAddress> +public class IPAddrIV<V extends BigdataURI> extends AbstractInlineIV<V, Inet4Address> implements Serializable, URI { /** @@ -79,7 +73,7 @@ /** * The inline IP address. */ - private final InetAddress value; + private final Inet4Address value; /** * The cached string representation of this IP. @@ -101,7 +95,7 @@ */ private transient V uri; - public IV<V, InetAddress> clone(final boolean clearCache) { + public IV<V, Inet4Address> clone(final boolean clearCache) { final IPAddrIV<V> tmp = new IPAddrIV<V>(value);//, prefix); @@ -124,7 +118,7 @@ /** * Ctor with internal value specified. */ - public IPAddrIV(final InetAddress value) {//, final byte prefix) { + public IPAddrIV(final Inet4Address value) {//, final byte prefix) { /* * TODO Using XSDBoolean so that we can know how to decode this thing @@ -167,12 +161,25 @@ // log.debug(ip); - this.value = InetAddress.getByName(ip); + final String suffix = matcher.group(4); -// final String suffix = matcher.group(4); - // log.debug(suffix); + + final String[] s; + if (suffix != null) { + + s = new String[5]; + System.arraycopy(ip.split("\\.", -1), 0, s, 0, 4); + s[4] = suffix; + + } else { + + s = ip.split("\\.", -1); + + } + this.value = Inet4Address.textToAddr(s); + // this.prefix = suffix != null ? Byte.valueOf(suffix) : (byte) 33; } else { @@ -188,7 +195,7 @@ /** * Returns the inline value. 
*/ - public InetAddress getInlineValue() throws UnsupportedOperationException { + public Inet4Address getInlineValue() throws UnsupportedOperationException { return value; } @@ -246,13 +253,7 @@ @Override public String getLocalName() { if (hostAddress == null) { - -// if (prefix < 33) { -// hostAddress = value.getHostAddress() + "/" + prefix; -// } else { - hostAddress = value.getHostAddress(); -// } - + hostAddress = value.toString(); } return hostAddress; } @@ -264,9 +265,8 @@ if (this == o) return true; if (o instanceof IPAddrIV) { - final InetAddress value2 = ((IPAddrIV<?>) o).value; -// final byte prefix2 = ((IPAddrIV<?>) o).prefix; - return value.equals(value2);// && prefix == prefix2; + final Inet4Address value2 = ((IPAddrIV<?>) o).value; + return value.equals(value2); } return false; } @@ -303,17 +303,7 @@ private byte[] key() { if (key == null) { - -// final IKeyBuilder kb = KeyBuilder.newInstance(); -// -// kb.append(value.getAddress()); -// -// kb.append(prefix); -// -// key = kb.getKey(); - - key = value.getAddress(); - + key = value.getBytes(); } return key; @@ -358,19 +348,7 @@ } private Object readResolve() throws ObjectStreamException { - - try { - - final InetAddress value = InetAddress.getByAddress(key); - - return new IPAddrIV(value); - - } catch (UnknownHostException ex) { - - throw new RuntimeException(ex); - - } - + return new Inet4Address(key); } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
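A minimal sketch of the parse path introduced in r8009 (not part of the commit): an "address/prefix" string is split into up to five parts and handed to Inet4Address.textToAddr(), with the fifth byte defaulting to the sentinel value 33 when no prefix is given. The split below is a simplified stand-in for the regex used by the IPAddrIV constructor.

    import com.bigdata.rdf.internal.Inet4Address;

    public class IpParseSketch {

        public static Inet4Address parse(final String hostAddress) {

            final String[] ipAndPrefix = hostAddress.split("/", -1);
            final String[] octets = ipAndPrefix[0].split("\\.", -1);

            final String[] parts;
            if (ipAndPrefix.length == 2) {
                // Four octets plus an explicit prefix length.
                parts = new String[5];
                System.arraycopy(octets, 0, parts, 0, 4);
                parts[4] = ipAndPrefix[1];
            } else {
                // Four octets only: textToAddr() writes 33 into the fifth byte.
                parts = octets;
            }

            return Inet4Address.textToAddr(parts);

        }

        public static void main(final String[] args) {

            System.out.println(parse("10.0.0.0/8")); // 10.0.0.0/8
            System.out.println(parse("10.0.0.1"));   // 10.0.0.1

        }

    }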
From: <mrp...@us...> - 2014-03-23 03:28:26
Revision: 8008 http://sourceforge.net/p/bigdata/code/8008 Author: mrpersonick Date: 2014-03-23 03:28:20 +0000 (Sun, 23 Mar 2014) Log Message: ----------- rolling back support for IPv4 prefixes Modified Paths: -------------- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/IVUtility.java Modified: branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/IVUtility.java =================================================================== --- branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/IVUtility.java 2014-03-23 03:26:21 UTC (rev 8007) +++ branches/RDR/bigdata-rdf/src/java/com/bigdata/rdf/internal/IVUtility.java 2014-03-23 03:28:20 UTC (rev 8008) @@ -548,7 +548,7 @@ final byte[] addr = new byte[4]; System.arraycopy(key, o, addr, 0, 4); final InetAddress ip = InetAddress.getByAddress(addr); - return new IPAddrIV(ip, key[o+4]); + return new IPAddrIV(ip);//, key[o+4]); } catch (UnknownHostException ex) { throw new RuntimeException(ex); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |